faust.channels

Channel.

A channel is used to send values to streams.

The stream will iterate over incoming events in the channel.

class faust.channels.Channel(app: faust.types.app.AppT, *, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, is_iterator: bool = False, queue: mode.utils.queues.ThrowableQueue = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Create new channel.

Parameters
  • app (AppT[]) – The app that created this channel (app.channel())

  • schema (Optional[SchemaT[~KT, ~VT]]) – Schema used for serialization/deserialization

  • key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – The Model used for keys in this channel. (overrides schema if one is defined)

  • value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – The Model used for values in this channel. (overrides schema if one is defined)

  • maxsize (Optional[int]) – The maximum number of messages this channel can hold. If exceeded any new put call will block until a message is removed from the channel.

  • is_iterator (bool) – When streams iterate over a channel they will call stream.clone(is_iterator=True) so this attribute denotes that this channel instance is currently being iterated over.

  • active_partitions (Optional[Set[TP]]) – Set of active topic partitions this channel instance is assigned to.

  • loop (Optional[AbstractEventLoop]) – The asyncio event loop to use.

property queue

Return the underlying queue/buffer backing this channel. :rtype: ThrowableQueue

clone(*, is_iterator: bool = None, **kwargs: Any) → faust.types.channels.ChannelT[source]

Create clone of this channel.

Parameters

is_iterator (Optional[bool]) – Set to True if this is now a channel that is being iterated over.

Keyword Arguments

**kwargs – Any keyword arguments passed will override any of the arguments supported by Channel.__init__.

Return type

ChannelT[]

clone_using_queue(queue: asyncio.queues.Queue) → faust.types.channels.ChannelT[source]

Create clone of this channel using specific queue instance.

Return type

ChannelT[]

stream(**kwargs: Any) → faust.types.streams.StreamT[source]

Create stream reading from this channel.

Return type

StreamT[+T_co]

get_topic_name() → str[source]

Get the topic name, or raise if this is not a named channel.

Return type

str

async send(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send message to channel.

Return type

Awaitable[RecordMetadata]

send_soon(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False, eager_partitioning: bool = False) → faust.types.tuples.FutureMessage[source]

Produce message by adding to buffer.

This method is only supported by Topic.

Raises

NotImplementedError – always for in-memory channel.

Return type

FutureMessage[]

as_future_message(key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, eager_partitioning: bool = False) → faust.types.tuples.FutureMessage[source]

Create promise that message will be transmitted.

Return type

FutureMessage[]

prepare_headers(headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None]) → Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None][source]

Prepare headers passed before publishing.

Return type

Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]

async publish_message(fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata][source]

Publish message to channel.

This is the interface used by topic.send(), etc. to actually publish the message on the channel after being buffered up or similar.

It takes a FutureMessage object, which contains all the information required to send the message, and acts as a promise that is resolved once the message has been fully transmitted.

Return type

Awaitable[RecordMetadata]

maybe_declare[source]

Declare/create this channel, but only if it doesn’t exist. :rtype: None

async declare() → None[source]

Declare/create this channel.

This is used to create this channel on a server, if that is required to operate it.

Return type

None

prepare_key(key: Union[bytes, faust.types.core._ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None], schema: faust.types.serializers.SchemaT = None, headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None] = None) → Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]][source]

Prepare key before it is sent to this channel.

Topic uses this to implement serialization of keys sent to the channel.

Return type

Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]]

prepare_value(value: Union[bytes, faust.types.core._ModelT, Any], value_serializer: Union[faust.types.codecs.CodecT, str, None], schema: faust.types.serializers.SchemaT = None, headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None] = None) → Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]][source]

Prepare value before it is sent to this channel.

Topic uses this to implement serialization of values sent to the channel.

Return type

Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]]

async decode(message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT[source]

Decode Message into Event.

Return type

EventT[]

async deliver(message: faust.types.tuples.Message) → None[source]

Deliver message to queue from consumer.

This is called by the consumer to deliver the message to the channel.

Return type

None

async put(value: Any) → None[source]

Put event onto this channel.

Return type

None

async get(*, timeout: Union[datetime.timedelta, float, str] = None) → Any[source]

Get the next Event received on this channel.

Return type

Any

empty() → bool[source]

Return True if the queue is empty.

Return type

bool

async on_key_decode_error(exc: Exception, message: faust.types.tuples.Message) → None[source]

Unable to decode the key of an item in the queue.

Return type

None

async on_value_decode_error(exc: Exception, message: faust.types.tuples.Message) → None[source]

Unable to decode the value of an item in the queue.

Return type

None

async on_decode_error(exc: Exception, message: faust.types.tuples.Message) → None[source]

Signal that there was an error reading an event in the queue.

When a message in the channel needs deserialization to be reconstructed back to its original form, we will sometimes see decoding/deserialization errors being raised, from missing fields or malformed payloads, and so on.

We will log the exception, but you can also override this to perform additional actions.

Admonition: Kafka

In the event a deserialization error occurs, we HAVE to commit the offset of the source message to continue processing the stream.

For this reason it is important that you keep a close eye on error logs. For easy of use, we suggest using log aggregation software, such as Sentry, to surface these errors to your operations team.

Return type

None

on_stop_iteration() → None[source]

Signal that iteration over this channel was stopped.

Tip

Remember to call super when overriding this method.

Return type

None

derive(**kwargs: Any) → faust.types.channels.ChannelT[source]

Derive new channel from this channel, using new configuration.

See faust.Topic.derive.

For local channels this will simply return the same channel.

Return type

ChannelT[]

async throw(exc: BaseException) → None[source]

Throw exception to be received by channel subscribers.

Tip

When you find yourself having to call this from a regular, non-async def function, you can use _throw() instead.

Return type

None

property subscriber_count

Return number of active subscribers to local channel. :rtype: int

property label

Short textual description of channel. :rtype: str