faust.topics

Topic - Named channel using Kafka.

class faust.topics.Topic(app: faust.types.app.AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern[~AnyStr]] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, is_iterator: bool = False, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: mode.utils.queues.ThrowableQueue = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, allow_empty: bool = False, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Define new topic description.

Parameters
  • app (AppT[]) – App instance used to create this topic description.

  • topics (Optional[Sequence[str]]) – List of topic names.

  • partitions (Optional[int]) – Number of partitions for these topics. On declaration, topics are created using this. Note: If a message is produced before the topic is declared, and autoCreateTopics is enabled on the Kafka Server, the number of partitions used will be specified by the server configuration.

  • retention (Union[timedelta, float, str, None]) – Number of seconds (as float/timedelta) to keep messages in the topic before they can be expired by the server.

  • pattern (Union[str, Pattern[AnyStr], None]) – Regular expression evaluated to decide what topics to subscribe to. You cannot specify both topics and a pattern.

  • key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – How to deserialize keys for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect”

  • value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – How to deserialize values for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect”

  • active_partitions (Optional[Set[TP]]) – Set of faust.types.tuples.TP that this topic should be restricted to.

Raises

TypeError – if both topics and pattern is provided.

send_soon(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → faust.types.tuples.FutureMessage[source]

Produce message by adding to buffer.

Notes

This method can be used by non-async def functions to produce messages.

Return type

FutureMessage[]

property pattern

Regular expression used by this topic (if any). :rtype: Optional[Pattern[AnyStr]]

property partitions

Return the number of configured partitions for this topic.

Notes

This is only active for internal topics, fully owned and managed by Faust itself.

We never touch the configuration of a topic that exists in Kafka, and Kafka will sometimes automatically create topics when they don’t exist. In this case the number of partitions for the automatically created topic will depend on the Kafka server configuration (num.partitions).

Always make sure your topics have the correct number of partitions. :rtype: Optional[int]

derive(**kwargs) → faust.types.channels.ChannelT[source]

Create topic derived from the configuration of this topic.

Configuration will be copied from this topic, but any parameter overridden as a keyword argument.

See also

derive_topic(): for a list of supported keyword arguments.

Return type

ChannelT[]

derive_topic(*, topics: Sequence[str] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, internal: bool = None, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs) → faust.types.topics.TopicT[source]

Create new topic with configuration derived from this topic.

Return type

TopicT[]

get_topic_name() → str[source]

Return the main topic name of this topic description.

As topic descriptions can have multiple topic names, this will only return when the topic has a singular topic name in the description.

Raises
  • TypeError – if configured with a regular expression pattern.

  • ValueError – if configured with multiple topic names.

  • TypeError – if not configured with any names or patterns.

Return type

str

prepare_key(key: Union[bytes, faust.types.core._ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any[source]

Serialize key to format suitable for transport.

Return type

Any

prepare_value(value: Union[bytes, faust.types.core._ModelT, Any], value_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any[source]

Serialize value to format suitable for transport.

Return type

Any

maybe_declare[source]

Declare/create this topic, only if it does not exist. :rtype: None