# Source code for faust.transport.base

"""Base message transport implementation.

The Transport is responsible for:

- Holding a reference to the app that created it.
- Creating new consumers/producers.

To see a reference transport implementation go to:
faust/transport/drivers/aiokafka.py
"""
import asyncio
import typing
from typing import (
    Any,
    ClassVar,
    Type,
    Union,
)

from mode.services import ServiceT
from yarl import URL

from faust.types import AppT
from faust.types.transports import (
    ConductorT,
    ConsumerCallback,
    ConsumerT,
    ProducerT,
    TransportT,
)

from .conductor import Conductor
from .consumer import Consumer, Fetcher
from .producer import Producer

# Import the concrete ``App`` only while a static type checker is running;
# at runtime an empty placeholder class with the same name is defined instead.
# NOTE(review): presumably this avoids an import cycle with ``faust.app`` --
# confirm before changing.
if typing.TYPE_CHECKING:  # pragma: no cover
    from faust.app import App
else:
    class App: ...  # noqa

# Public names exported by this module on star-import.
__all__ = ['Conductor', 'Consumer', 'Fetcher', 'Producer', 'Transport']


class Transport(TransportT):
    """Message transport implementation.

    Stores the broker URL, the owning app and the event loop, and acts
    as a factory for the consumer, producer and conductor classes that
    are configured as class attributes.
    """

    #: Consumer subclass used for this transport.
    Consumer: ClassVar[Type[ConsumerT]] = Consumer

    #: Producer subclass used for this transport.
    Producer: ClassVar[Type[ProducerT]] = Producer

    #: Conductor class instantiated by :meth:`create_conductor`.
    Conductor: ClassVar[Type[ConductorT]] = Conductor

    #: Service that fetches messages from the broker.
    Fetcher: ClassVar[Type[ServiceT]] = Fetcher

    driver_version: str

    def __init__(self, url: Union[str, URL], app: AppT,
                 loop: asyncio.AbstractEventLoop = None) -> None:
        """Remember the URL, app and event loop for this transport.

        Arguments:
            url: Transport URL; always stored as a :class:`yarl.URL`.
            app: The app this transport belongs to.
            loop: Event loop to use.  When omitted (or falsy) the loop
                returned by :func:`asyncio.get_event_loop` is used.
        """
        self.url = URL(url)
        self.app = app
        if loop:
            self.loop = loop
        else:
            self.loop = asyncio.get_event_loop()

    def create_consumer(self, callback: ConsumerCallback,
                        **kwargs: Any) -> ConsumerT:
        """Return a new ``self.Consumer`` bound to this transport.

        The consumer receives this transport as its first argument,
        together with ``callback``, this transport's event loop and any
        extra keyword arguments.
        """
        consumer_cls = self.Consumer
        return consumer_cls(
            self,
            callback=callback,
            loop=self.loop,
            **kwargs,
        )

    def create_producer(self, **kwargs: Any) -> ProducerT:
        """Return a new ``self.Producer`` bound to this transport.

        Extra keyword arguments are forwarded unchanged.
        """
        producer_cls = self.Producer
        return producer_cls(self, **kwargs)

    def create_conductor(self, **kwargs: Any) -> ConductorT:
        """Return a new ``self.Conductor`` for this transport's app.

        The conductor receives the app and event loop stored on this
        transport, plus any extra keyword arguments.
        """
        conductor_cls = self.Conductor
        return conductor_cls(app=self.app, loop=self.loop, **kwargs)