faust.topics

Topic - Named channel using Kafka.

class faust.topics.Topic(app: faust.types.app.AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern[~AnyStr]] = None, key_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, value_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, is_iterator: bool = False, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: mode.utils.queues.ThrowableQueue = None, key_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, value_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Define new topic description.

Parameters:
  • app (AppT[]) – App instance used to create this topic description.
  • topics (Optional[Sequence[str]]) – List of topic names.
  • partitions (Optional[int]) – Number of partitions for these topics. On declaration, topics are created using this. Note: If a message is produced before the topic is declared, and autoCreateTopics is enabled on the Kafka Server, the number of partitions used will be specified by the server configuration.
  • retention (Union[timedelta, float, str, None]) – Number of seconds (as float/timedelta) to keep messages in the topic before they can be expired by the server.
  • pattern (Union[str, Pattern[AnyStr], None]) – Regular expression evaluated to decide what topics to subscribe to. You cannot specify both topics and a pattern.
  • key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – How to deserialize keys for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect”
  • value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – How to deserialize values for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect”
  • active_partitions (Optional[Set[TP]]) – Set of faust.types.tuples.TP that this topic should be restricted to.
Raises:

TypeError – if both topics and pattern is provided.

pattern
derive(**kwargs) → faust.types.channels.ChannelT[source]

Create new Topic derived from this topic.

Configuration will be copied from this topic, but any parameter overriden as a keyword argument.

See also

derive_topic(): for a list of supported keyword arguments.

Return type:ChannelT[]
derive_topic(*, topics: Sequence[str] = None, key_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, value_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, value_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, internal: bool = None, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs) → faust.types.topics.TopicT[source]
Return type:TopicT[]
get_topic_name() → str[source]
Return type:str
coroutine declare(self) → None[source]
Return type:None
coroutine decode(self, message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT[source]
Return type:EventT[]
maybe_declare[source]
coroutine publish_message(self, fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata][source]
Return type:Awaitable[RecordMetadata]
coroutine put(self, event: faust.types.events.EventT) → None[source]
Return type:None
prepare_key(key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType], key_serializer: Union[faust.types.codecs.CodecT, str, NoneType]) → Any[source]
Return type:Any
prepare_value(value: Union[bytes, faust.types.core.ModelT, typing.Any], value_serializer: Union[faust.types.codecs.CodecT, str, NoneType]) → Any[source]
Return type:Any
on_stop_iteration() → None[source]
Return type:None
partitions