faust.topics¶
Topic - Named channel using Kafka.
-
class
faust.topics.Topic(app: faust.types.app.AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern[~AnyStr]] = None, key_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, value_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, is_iterator: bool = False, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: mode.utils.queues.ThrowableQueue = None, key_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, value_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Define new topic description.
Parameters: - app (
AppT[]) – App instance used to create this topic description. - topics (
Optional[Sequence[str]]) – List of topic names. - partitions (
Optional[int]) – Number of partitions for these topics. On declaration, topics are created using this. Note: If a message is produced before the topic is declared, andautoCreateTopicsis enabled on the Kafka Server, the number of partitions used will be specified by the server configuration. - retention (
Union[timedelta,float,str,None]) – Number of seconds (as float/timedelta) to keep messages in the topic before they can be expired by the server. - pattern (
Union[str,Pattern[AnyStr],None]) – Regular expression evaluated to decide what topics to subscribe to. You cannot specify both topics and a pattern. - key_type (
Union[Type[ModelT],Type[bytes],Type[str],None]) – How to deserialize keys for messages in this topic. Can be afaust.Modeltype,str,bytes, orNonefor “autodetect” - value_type (
Union[Type[ModelT],Type[bytes],Type[str],None]) – How to deserialize values for messages in this topic. Can be afaust.Modeltype,str,bytes, orNonefor “autodetect” - active_partitions (
Optional[Set[TP]]) – Set offaust.types.tuples.TPthat this topic should be restricted to.
Raises: TypeError– if both topics and pattern is provided.-
pattern¶
-
derive(**kwargs) → faust.types.channels.ChannelT[source]¶ Create new
Topicderived from this topic.Configuration will be copied from this topic, but any parameter overriden as a keyword argument.
See also
derive_topic(): for a list of supported keyword arguments.Return type: ChannelT[]
-
derive_topic(*, topics: Sequence[str] = None, key_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, value_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, value_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, internal: bool = None, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs) → faust.types.topics.TopicT[source]¶ Return type: TopicT[]
-
coroutine
decode(self, message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT[source]¶ Return type: EventT[]
-
coroutine
publish_message(self, fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Return type: Awaitable[RecordMetadata]
-
prepare_key(key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType], key_serializer: Union[faust.types.codecs.CodecT, str, NoneType]) → Any[source]¶ Return type: Any
-
prepare_value(value: Union[bytes, faust.types.core.ModelT, typing.Any], value_serializer: Union[faust.types.codecs.CodecT, str, NoneType]) → Any[source]¶ Return type: Any
-
partitions¶
- app (