faust.topics

Topic - Named channel using Kafka.

class faust.topics.Topic(app: faust.types.app.AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern[~AnyStr]] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, is_iterator: bool = False, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: mode.utils.queues.ThrowableQueue = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, allow_empty: bool = False, loop: asyncio.events.AbstractEventLoop = None) → None

Define new topic description.

Parameters
  • app (AppT[]) – App instance used to create this topic description.

  • topics (Optional[Sequence[str]]) – List of topic names.

  • partitions (Optional[int]) – Number of partitions for these topics. Topics are created with this partition count when they are declared. Note: If a message is produced before the topic is declared and automatic topic creation (auto.create.topics.enable) is enabled on the Kafka broker, the number of partitions is determined by the broker configuration instead.

  • retention (Union[timedelta, float, str, None]) – Number of seconds (as float/timedelta) to keep messages in the topic before they can be expired by the server.

  • pattern (Union[str, Pattern[AnyStr], None]) – Regular expression evaluated to decide what topics to subscribe to. You cannot specify both topics and a pattern.

  • key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – How to deserialize keys for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect”.

  • value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – How to deserialize values for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect”.

  • active_partitions (Optional[Set[TP]]) – Set of faust.types.tuples.TP that this topic should be restricted to.

Raises

TypeError – if both topics and pattern are provided.
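
The rule that topics and pattern are mutually exclusive can be illustrated without a running Kafka broker. The following is a minimal sketch in plain Python, not Faust's implementation: TopicSpec and its subscribes_to method are hypothetical names, used only to show the documented TypeError and how a regular expression might select topic names.

```python
import re
from typing import Optional, Pattern, Sequence, Union


class TopicSpec:
    """Hypothetical stand-in for a topic description (not faust.topics.Topic)."""

    def __init__(
        self,
        *,
        topics: Optional[Sequence[str]] = None,
        pattern: Union[str, Pattern[str], None] = None,
    ) -> None:
        # Mirror the documented rule: topics and pattern are mutually exclusive.
        if topics and pattern:
            raise TypeError("Cannot specify both topics and pattern")
        self.topics = list(topics or [])
        # Accept either a regex string or a pre-compiled pattern.
        self.pattern = re.compile(pattern) if isinstance(pattern, str) else pattern

    def subscribes_to(self, name: str) -> bool:
        """Return True if this description covers the given topic name."""
        if self.pattern is not None:
            return self.pattern.match(name) is not None
        return name in self.topics


# One description lists names explicitly, the other selects them by regex.
by_name = TopicSpec(topics=["orders", "payments"])
by_pattern = TopicSpec(pattern=r"^logs\..*")
```

Here by_name covers only the two listed names, while by_pattern covers any name beginning with "logs."; passing both arguments at once raises TypeError.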

pattern
Return type

Optional[Pattern[AnyStr]]

derive(**kwargs) → faust.types.channels.ChannelT

Create new Topic derived from this topic.

Configuration is copied from this topic, and any parameter can be overridden by passing it as a keyword argument.

See also

derive_topic(): for a list of supported keyword arguments.

Return type

ChannelT[]
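
The copy-then-override behaviour described for derive() can be sketched with a small immutable configuration object. This is an illustration under stated assumptions, not Faust code: TopicConfig and its fields are hypothetical, and dataclasses.replace stands in for whatever copying Faust performs internally.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class TopicConfig:
    """Hypothetical, simplified topic configuration (not a Faust class)."""

    topics: Tuple[str, ...]
    partitions: Optional[int] = None
    retention: Optional[float] = None

    def derive(self, **overrides) -> "TopicConfig":
        # Copy every field from this instance, then apply keyword overrides.
        return replace(self, **overrides)


base = TopicConfig(topics=("orders",), partitions=8, retention=3600.0)
# Only topics and retention change; partitions is inherited from base.
derived = base.derive(topics=("orders-retry",), retention=60.0)
```

The original object is left untouched, so a base description can be reused to derive several variants.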

derive_topic(*, topics: Sequence[str] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, internal: bool = None, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs) → faust.types.topics.TopicT
Return type

TopicT[]

get_topic_name() → str
Return type

str

coroutine declare(self) → None
Return type

None

coroutine decode(self, message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT
Return type

EventT[]

maybe_declare

coroutine publish_message(self, fut: faust.types.tuples.FutureMessage, wait: bool = False) → Awaitable[faust.types.tuples.RecordMetadata]
Return type

Awaitable[RecordMetadata]

coroutine put(self, event: faust.types.events.EventT) → None
Return type

None

coroutine send(self, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata]

Send message to topic.

Return type

Awaitable[RecordMetadata]

prepare_key(key: Union[bytes, faust.types.core._ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any
Return type

Any

prepare_value(value: Union[bytes, faust.types.core._ModelT, Any], value_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any
Return type

Any

on_stop_iteration() → None
Return type

None

partitions
Return type

Optional[int]