Channels & Topics - Data Sources

Basics

Faust agents iterate over streams, and streams iterate over channels.

A channel is a construct used to send and receive messages. A topic is a named channel backed by a Kafka topic.

Streams read from channels, which can be either local channels or topics.

Agent <–> Stream <–> Channel

Topics are named channels backed by a transport, which lets them use, for example, Kafka topics:

Agent <–> Stream <–> Topic <–> Transport <–> aiokafka

Faust defines these layers of abstraction so that agents can send and receive messages using more than one type of transport.

Topics are highly Kafka-specific, while channels are not. That makes channels more natural to subclass should you require a different means of communication, for example RabbitMQ (AMQP), STOMP, MQTT, NSQ, or ZeroMQ.

Channels

A channel is a buffer/queue used to send and receive messages. This buffer could exist in-memory in the local process only, or transmit serialized messages over the network.
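
As an illustration of the in-memory case, the following is a minimal sketch built on the standard library's asyncio.Queue. It is not Faust's implementation: the class name InMemoryChannel and its methods are hypothetical and only mirror the put-then-iterate pattern of a local channel.

```python
import asyncio


class InMemoryChannel:
    """Hypothetical stand-in for a local channel: a queue with async iteration."""

    def __init__(self, maxsize: int = 0) -> None:
        # maxsize=0 means the buffer is unbounded.
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    async def put(self, value) -> None:
        # Add a value to the buffer (waits if a bounded buffer is full).
        await self._queue.put(value)

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Wait until a value is available; iteration never ends on its own.
        return await self._queue.get()


async def demo() -> list:
    channel = InMemoryChannel()
    for n in (1, 2, 3):
        await channel.put(n)

    received = []
    async for value in channel:
        received.append(value)
        # The channel is infinite, so the consumer decides when to stop.
        if len(received) == 3:
            break
    return received


result = asyncio.run(demo())
print(result)
```

Because the iterator never finishes by itself, the consumer has to break out of the loop explicitly, here after three values.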

You can create channels manually, write to them, and read from them:

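import faust

# The app id and broker URL are placeholder values for illustration.
app = faust.App('example-app', broker='kafka://localhost:9092')

async def main():
    channel = app.channel()

    await channel.put(1)

    async for event in channel:
        print(event.value)
        # the channel is infinite so we break after first event
        break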

Reference

Sending messages to channel

class faust.Channel

coroutine send(self, key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, value: Union[bytes, faust.types.core.ModelT, typing.Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, value_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, callback: Callable[[faust.types.tuples.FutureMessage], Union[NoneType, typing.Awaitable[NoneType]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata]

Send message to channel.

Return type: Awaitable[RecordMetadata]

as_future_message(key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, value: Union[bytes, faust.types.core.ModelT, typing.Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, value_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, callback: Callable[[faust.types.tuples.FutureMessage], Union[NoneType, typing.Awaitable[NoneType]]] = None) → faust.types.tuples.FutureMessage

Return type: FutureMessage

coroutine publish_message(self, fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata]

Return type: Awaitable[RecordMetadata]
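
The signatures above suggest that sending can be split into two steps: as_future_message packages the arguments into a pending message, and publish_message delivers such a message and returns its metadata. The sketch below models that split with the standard library only. It is a reading of the signatures, not Faust's code: ToyChannel, ToyMessage, and ToyMetadata are hypothetical names, and the offset counter and the point at which the callback fires are assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class ToyMessage:
    """Hypothetical pending message (not faust's FutureMessage)."""
    key: Any
    value: Any
    callback: Optional[Callable[["ToyMessage"], None]] = None


@dataclass
class ToyMetadata:
    """Hypothetical delivery record (not faust's RecordMetadata)."""
    offset: int


class ToyChannel:
    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._next_offset = 0  # assumed bookkeeping, for illustration only

    def as_future_message(self, key=None, value=None, callback=None) -> ToyMessage:
        # Step 1: package the arguments; nothing is delivered yet.
        return ToyMessage(key=key, value=value, callback=callback)

    async def publish_message(self, message: ToyMessage) -> ToyMetadata:
        # Step 2: deliver the packaged message and report where it landed.
        await self._queue.put(message)
        metadata = ToyMetadata(offset=self._next_offset)
        self._next_offset += 1
        if message.callback is not None:
            # Assumption: the callback runs once the message is delivered.
            message.callback(message)
        return metadata

    async def send(self, key=None, value=None, callback=None) -> ToyMetadata:
        # Convenience wrapper: build the message, then publish it.
        return await self.publish_message(
            self.as_future_message(key=key, value=value, callback=callback)
        )


async def demo():
    channel = ToyChannel()
    delivered = []

    # One-step send with a delivery callback.
    first = await channel.send(
        key="a", value=1, callback=lambda m: delivered.append(m.value)
    )

    # Two-step send: build the message first, publish it later.
    pending = channel.as_future_message(key="b", value=2)
    second = await channel.publish_message(pending)

    return first.offset, second.offset, delivered


outcome = asyncio.run(demo())
print(outcome)
```

Separating the two steps lets a caller prepare a message in one place and decide elsewhere when it is actually published.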

Declaring

Note

Some channels may require you to declare them on the server side before they’re used. Faust will create topics it considers internal, but it will not create or modify “source topics” (i.e., topics exposed for use by other Kafka applications).

To define a topic as internal, use app.topic('name', ..., internal=True).

class faust.Channel

coroutine maybe_declare()

coroutine declare(self) → None

Return type: None

Topics

A topic is a named channel, backed by a Kafka topic. The name is used as the address of the channel so that it can be shared between multiple processes, and each process is assigned a share of the topic's partitions.