Source code for faust.transport.producer

"""Producer.

The Producer is responsible for:

   - Holds reference to the transport that created it
   - ... and the app via ``self.transport.app``.
   - Sending messages.
"""
import asyncio
from typing import Any, Awaitable, Mapping, Optional
from mode import Seconds, Service
from faust.types import AppT, HeadersArg
from faust.types.tuples import RecordMetadata, TP
from faust.types.transports import ProducerT, TransportT

__all__ = ['Producer']


[docs]class Producer(Service, ProducerT): """Base Producer.""" app: AppT _api_version: str def __init__(self, transport: TransportT, loop: asyncio.AbstractEventLoop = None, **kwargs: Any) -> None: self.transport = transport self.app = self.transport.app conf = self.transport.app.conf self.client_id = conf.broker_client_id self.linger_ms = conf.producer_linger_ms self.max_batch_size = conf.producer_max_batch_size self.acks = conf.producer_acks self.max_request_size = conf.producer_max_request_size self.compression_type = conf.producer_compression_type self.request_timeout = conf.producer_request_timeout self.ssl_context = conf.ssl_context self.credentials = conf.broker_credentials self.partitioner = conf.producer_partitioner api_version = self._api_version = conf.producer_api_version assert api_version is not None super().__init__(loop=loop or self.transport.loop, **kwargs)
[docs] async def send(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[HeadersArg], *, transactional_id: str = None) -> Awaitable[RecordMetadata]: raise NotImplementedError()
[docs] async def send_and_wait(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Optional[HeadersArg], *, transactional_id: str = None) -> RecordMetadata: raise NotImplementedError()
[docs] async def flush(self) -> None: ...
[docs] async def create_topic(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Seconds = 1000.0, retention: Seconds = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) -> None: raise NotImplementedError()
[docs] def key_partition(self, topic: str, key: bytes) -> TP: raise NotImplementedError()
[docs] async def begin_transaction(self, transactional_id: str) -> None: raise NotImplementedError()
[docs] async def commit_transaction(self, transactional_id: str) -> None: raise NotImplementedError()
[docs] async def abort_transaction(self, transactional_id: str) -> None: raise NotImplementedError()
[docs] async def stop_transaction(self, transactional_id: str) -> None: raise NotImplementedError()
[docs] async def maybe_begin_transaction(self, transactional_id: str) -> None: raise NotImplementedError()
[docs] async def commit_transactions( self, tid_to_offset_map: Mapping[str, Mapping[TP, int]], group_id: str, start_new_transaction: bool = True) -> None: raise NotImplementedError()
[docs] def supports_headers(self) -> bool: return False