Source code for faust.transport.producer

"""Producer.

The Producer is responsible for:

   - Holds reference to the transport that created it
   - ... and the app via ``self.transport.app``.
   - Sending messages.
"""
import asyncio
from typing import Any, Awaitable, Mapping, Optional
from mode import Seconds, Service
from faust.types.tuples import RecordMetadata, TP
from faust.types.transports import ProducerT, TransportT

__all__ = ['Producer']


[docs]class Producer(Service, ProducerT): """Base Producer.""" def __init__(self, transport: TransportT, loop: asyncio.AbstractEventLoop = None, **kwargs: Any) -> None: self.transport = transport conf = self.transport.app.conf self.linger_ms = conf.producer_linger_ms self.max_batch_size = conf.producer_max_batch_size self.acks = conf.producer_acks self.max_request_size = conf.producer_max_request_size self.compression_type = conf.producer_compression_type super().__init__(loop=loop or self.transport.loop, **kwargs)
[docs] async def send(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int]) -> Awaitable[RecordMetadata]: raise NotImplementedError()
[docs] async def send_and_wait(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int]) -> RecordMetadata: raise NotImplementedError()
[docs] async def create_topic(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Seconds = 1000.0, retention: Seconds = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) -> None: raise NotImplementedError()
[docs] def key_partition(self, topic: str, key: bytes) -> TP: raise NotImplementedError()