Source code for faust.transport.base
"""Base message transport implementation.
The Transport is responsible for:
- Holding a reference to the app that created it.
- Creating new consumers/producers.
To see a reference transport implementation go to:
:file:`faust/transport/drivers/aiokafka.py`
"""
import asyncio
from typing import Any, ClassVar, List, Type
from mode.services import ServiceT
from yarl import URL
from faust.types import AppT
from faust.types.transports import (
ConductorT,
ConsumerCallback,
ConsumerT,
ProducerT,
TransactionManagerT,
TransportT,
)
from .conductor import Conductor
from .consumer import Consumer, Fetcher, TransactionManager
from .producer import Producer
# Names exported by ``from faust.transport.base import *``.
__all__ = [
    'Conductor',
    'Consumer',
    'Fetcher',
    'Producer',
    'Transport',
]
class Transport(TransportT):
    """Message transport implementation.

    Keeps a reference to the app, the broker URL list and the event
    loop, and acts as a factory for the consumer, producer, transaction
    manager and conductor classes configured as class attributes below.
    Subclasses can swap any of those class attributes to change what
    the ``create_*`` methods instantiate.
    """

    #: Consumer class instantiated by :meth:`create_consumer`.
    Consumer: ClassVar[Type[ConsumerT]]
    Consumer = Consumer

    #: Producer class instantiated by :meth:`create_producer`.
    Producer: ClassVar[Type[ProducerT]]
    Producer = Producer

    #: Transaction manager class instantiated by
    #: :meth:`create_transaction_manager`.
    TransactionManager: ClassVar[Type[TransactionManagerT]]
    TransactionManager = TransactionManager

    #: Conductor class instantiated by :meth:`create_conductor`.
    Conductor: ClassVar[Type[ConductorT]]
    Conductor = Conductor

    #: Service that fetches messages from the broker.
    Fetcher: ClassVar[Type[ServiceT]] = Fetcher

    #: Declared without a value here; presumably assigned by concrete
    #: driver transports -- verify against the driver implementations.
    driver_version: str

    def __init__(self,
                 url: List[URL],
                 app: AppT,
                 loop: asyncio.AbstractEventLoop = None) -> None:
        """Store the broker URLs, the owning app and the event loop.

        Arguments:
            url: List of URLs for this transport.
            app: The app that created this transport.
            loop: Event loop to use.  When omitted (or otherwise falsy)
                the loop returned by :func:`asyncio.get_event_loop`
                is used instead.
        """
        self.url = url
        self.app = app
        # Same truthiness test as ``loop or asyncio.get_event_loop()``:
        # the fallback is only evaluated when no usable loop was given.
        self.loop = loop if loop else asyncio.get_event_loop()

    def create_consumer(self, callback: ConsumerCallback,
                        **kwargs: Any) -> ConsumerT:
        """Create new consumer.

        The consumer is constructed with this transport, the given
        ``callback`` and this transport's event loop; any additional
        keyword arguments are forwarded unchanged.
        """
        consumer_cls = self.Consumer
        return consumer_cls(
            self,
            callback=callback,
            loop=self.loop,
            **kwargs,
        )

    def create_producer(self, **kwargs: Any) -> ProducerT:
        """Create new producer.

        The producer is constructed with this transport; keyword
        arguments are forwarded unchanged.
        """
        producer_cls = self.Producer
        return producer_cls(self, **kwargs)

    def create_transaction_manager(self,
                                   consumer: ConsumerT,
                                   producer: ProducerT,
                                   **kwargs: Any) -> TransactionManagerT:
        """Create new transaction manager.

        The manager is constructed with this transport plus the given
        ``consumer`` and ``producer``; any additional keyword arguments
        are forwarded unchanged.
        """
        manager_cls = self.TransactionManager
        return manager_cls(
            self,
            consumer=consumer,
            producer=producer,
            **kwargs,
        )

    def create_conductor(self, **kwargs: Any) -> ConductorT:
        """Create new consumer conductor.

        Unlike the other factories, the conductor receives the app and
        the event loop as keyword arguments rather than the transport
        itself; any additional keyword arguments are forwarded unchanged.
        """
        conductor_cls = self.Conductor
        return conductor_cls(app=self.app, loop=self.loop, **kwargs)