faust.channels
Channel.
A channel is used to send values to streams.
The stream will iterate over incoming events in the channel.
-
class faust.channels.Channel(app: faust.types.app.AppT, *, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, is_iterator: bool = False, queue: mode.utils.queues.ThrowableQueue = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, loop: asyncio.events.AbstractEventLoop = None) → None
Create new channel.
- Parameters
app (AppT[]) – The app that created this channel (app.channel())
schema (Optional[SchemaT[~KT, ~VT]]) – Schema used for serialization/deserialization
key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – The Model used for keys in this channel. (overrides schema if one is defined)
value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – The Model used for values in this channel. (overrides schema if one is defined)
maxsize (Optional[int]) – The maximum number of messages this channel can hold. If exceeded any new put call will block until a message is removed from the channel.
is_iterator (bool) – When streams iterate over a channel they will call stream.clone(is_iterator=True) so this attribute denotes that this channel instance is currently being iterated over.
active_partitions (Optional[Set[TP]]) – Set of active topic partitions this channel instance is assigned to.
loop (Optional[AbstractEventLoop]) – The asyncio event loop to use.
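The blocking behaviour described for maxsize can be illustrated with a bounded queue from the standard library. This is only a sketch: it uses a plain asyncio.Queue as a stand-in for the channel's buffer (an assumption, not Faust's ThrowableQueue), and the producer and consumer functions are hypothetical names.

```python
import asyncio
from typing import List


async def demo_backpressure() -> List[str]:
    # Stand-in for a channel created with maxsize=1 (assumption: a plain
    # asyncio.Queue, not the queue type Faust actually uses).
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    log: List[str] = []

    async def producer() -> None:
        for i in range(3):
            await queue.put(i)  # waits here while the queue is full
            log.append(f"put {i}")

    async def consumer() -> None:
        for _ in range(3):
            await asyncio.sleep(0.01)  # slow consumer
            item = await queue.get()   # frees one slot for the producer
            log.append(f"got {item}")

    await asyncio.gather(producer(), consumer())
    return log


log = asyncio.run(demo_backpressure())
print(log)
```

With a capacity of one, the second put cannot complete until the consumer has removed the first item, which is the same kind of backpressure the maxsize parameter describes.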
-
property queue
Return the underlying queue/buffer backing this channel.
- Return type
ThrowableQueue
-
clone(*, is_iterator: bool = None, **kwargs: Any) → faust.types.channels.ChannelT
Create clone of this channel.
- Parameters
is_iterator (Optional[bool]) – Set to True if this is now a channel that is being iterated over.
- Keyword Arguments
**kwargs – Any keyword arguments passed will override any of the arguments supported by Channel.__init__.
- Return type
ChannelT[]
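The override rule for clone keyword arguments can be sketched with a small stand-in class. MiniChannel below is hypothetical and is not faust.Channel; it only shows one way a clone could start from the current constructor arguments and let the caller's keyword arguments take precedence.

```python
from typing import Any, Dict, Optional


class MiniChannel:
    """Hypothetical stand-in used for illustration only."""

    def __init__(self, *, maxsize: Optional[int] = None,
                 is_iterator: bool = False) -> None:
        self.maxsize = maxsize
        self.is_iterator = is_iterator

    def _clone_args(self) -> Dict[str, Any]:
        # Arguments needed to rebuild an equivalent instance.
        return {"maxsize": self.maxsize, "is_iterator": self.is_iterator}

    def clone(self, *, is_iterator: Optional[bool] = None,
              **kwargs: Any) -> "MiniChannel":
        args = self._clone_args()
        if is_iterator is not None:
            args["is_iterator"] = is_iterator
        args.update(kwargs)  # caller-supplied values win
        return type(self)(**args)


original = MiniChannel(maxsize=10)
copy = original.clone(is_iterator=True, maxsize=5)
print(copy.maxsize, copy.is_iterator)
```

The original instance is left unchanged; only the clone carries the overridden values.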
-
clone_using_queue(queue: asyncio.queues.Queue) → faust.types.channels.ChannelT
Create clone of this channel using specific queue instance.
- Return type
ChannelT[]
-
stream(**kwargs: Any) → faust.types.streams.StreamT
Create stream reading from this channel.
- Return type
StreamT[+T_co]
-
get_topic_name() → str
Get the topic name, or raise if this is not a named channel.
- Return type
str
-
async send(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata]
Send message to channel.
- Return type
Awaitable[RecordMetadata]
-
send_soon(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False, eager_partitioning: bool = False) → faust.types.tuples.FutureMessage
Produce message by adding to buffer.
This method is only supported by Topic.
- Raises
NotImplementedError – always for in-memory channel.
- Return type
FutureMessage
-
as_future_message(key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, eager_partitioning: bool = False) → faust.types.tuples.FutureMessage
Create promise that message will be transmitted.
- Return type
FutureMessage
-
prepare_headers(headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None]) → Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]
Prepare headers passed before publishing.
-
async publish_message(fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata]
Publish message to channel.
This is the interface used by topic.send(), etc. to actually publish the message on the channel after being buffered up or similar.
It takes a FutureMessage object, which contains all the information required to send the message, and acts as a promise that is resolved once the message has been fully transmitted.
- Return type
Awaitable[RecordMetadata]
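The idea of a message object that doubles as a promise can be sketched with a plain asyncio future. PendingMessage and transmit below are hypothetical names chosen for illustration; they are not part of Faust, and the integer result is only a placeholder for whatever metadata a real transport would return.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List, Tuple


@dataclass
class PendingMessage:
    """Hypothetical stand-in: a payload plus a promise of delivery."""
    value: Any
    done: asyncio.Future


async def transmit(pending: PendingMessage, wire: List[Any]) -> None:
    await asyncio.sleep(0)                  # pretend to do network I/O
    wire.append(pending.value)              # the message is now "sent"
    pending.done.set_result(len(wire) - 1)  # resolve the promise


async def main() -> Tuple[bool, int, List[Any]]:
    loop = asyncio.get_running_loop()
    wire: List[Any] = []
    pending = PendingMessage(value="hello", done=loop.create_future())
    task = asyncio.create_task(transmit(pending, wire))
    resolved_before = pending.done.done()  # nothing has been sent yet
    position = await pending.done          # wait until transmission finishes
    await task
    return resolved_before, position, wire


resolved_before, position, wire = asyncio.run(main())
print(resolved_before, position, wire)
```

Awaiting the future is what lets a caller decide whether to wait for delivery or to continue and check on the message later.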
-
async declare() → None
Declare/create this channel.
This is used to create this channel on a server, if that is required to operate it.
- Return type
None
-
prepare_key(key: Union[bytes, faust.types.core._ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None], schema: faust.types.serializers.SchemaT = None, headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None] = None) → Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]]
Prepare key before it is sent to this channel.
Topic uses this to implement serialization of keys sent to the channel.
-
prepare_value(value: Union[bytes, faust.types.core._ModelT, Any], value_serializer: Union[faust.types.codecs.CodecT, str, None], schema: faust.types.serializers.SchemaT = None, headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None] = None) → Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]]
Prepare value before it is sent to this channel.
Topic uses this to implement serialization of values sent to the channel.
-
async decode(message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT
Decode Message into Event.
- Return type
EventT[]
-
async deliver(message: faust.types.tuples.Message) → None
Deliver message to queue from consumer.
This is called by the consumer to deliver the message to the channel.
- Return type
None
-
async get(*, timeout: Union[datetime.timedelta, float, str] = None) → Any
Get the next Event received on this channel.
- Return type
Any
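A timed read of this kind can be sketched with asyncio.wait_for around a queue read. This is an illustration under assumptions: the queue is a plain asyncio.Queue, to_seconds and get_with_timeout are hypothetical helpers, string timeouts are left out, and raising asyncio.TimeoutError on expiry is what the standard library does here rather than something the text above states about the channel.

```python
import asyncio
from datetime import timedelta
from typing import Any, Optional, Tuple, Union


def to_seconds(timeout: Union[timedelta, float, None]) -> Optional[float]:
    # Hypothetical helper: normalise a timeout to seconds.
    if timeout is None:
        return None
    if isinstance(timeout, timedelta):
        return timeout.total_seconds()
    return float(timeout)


async def get_with_timeout(queue: asyncio.Queue,
                           timeout: Union[timedelta, float, None] = None) -> Any:
    # Wait for the next item, giving up once the timeout expires.
    return await asyncio.wait_for(queue.get(), timeout=to_seconds(timeout))


async def main() -> Tuple[Any, bool]:
    queue: asyncio.Queue = asyncio.Queue()
    await queue.put("event-1")
    first = await get_with_timeout(queue, timeout=timedelta(seconds=1))
    try:
        await get_with_timeout(queue, timeout=0.01)  # queue is empty now
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return first, timed_out


first, timed_out = asyncio.run(main())
print(first, timed_out)
```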
-
async on_key_decode_error(exc: Exception, message: faust.types.tuples.Message) → None
Unable to decode the key of an item in the queue.
See also
- Return type
None
-
async on_value_decode_error(exc: Exception, message: faust.types.tuples.Message) → None
Unable to decode the value of an item in the queue.
See also
- Return type
None
-
async on_decode_error(exc: Exception, message: faust.types.tuples.Message) → None
Signal that there was an error reading an event in the queue.
When a message in the channel needs deserialization to be reconstructed back to its original form, we will sometimes see decoding/deserialization errors being raised, from missing fields or malformed payloads, and so on.
We will log the exception, but you can also override this to perform additional actions.
- Admonition: Kafka
In the event a deserialization error occurs, we HAVE to commit the offset of the source message to continue processing the stream.
For this reason it is important that you keep a close eye on error logs. For ease of use, we suggest using log aggregation software, such as Sentry, to surface these errors to your operations team.
- Return type
None
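Overriding a decode-error hook to perform additional actions might look like the sketch below. BaseChannel and CountingChannel are hypothetical stand-ins, not Faust classes, and JSON is used only as an example payload format; the point is the pattern of calling the parent hook to keep the default logging and then doing extra work.

```python
import asyncio
import json
import logging
from typing import Any, List, Tuple

logger = logging.getLogger("channel-sketch")


class BaseChannel:
    """Hypothetical stand-in with a decode-error hook."""

    async def on_decode_error(self, exc: Exception, message: bytes) -> None:
        # Default behaviour in this sketch: just log the failure.
        logger.error("Cannot decode %r: %r", message, exc)

    async def decode(self, message: bytes) -> Any:
        try:
            return json.loads(message)
        except ValueError as exc:  # json.JSONDecodeError is a ValueError
            await self.on_decode_error(exc, message)
            return None


class CountingChannel(BaseChannel):
    def __init__(self) -> None:
        self.failed: List[bytes] = []

    async def on_decode_error(self, exc: Exception, message: bytes) -> None:
        await super().on_decode_error(exc, message)  # keep the logging
        self.failed.append(message)                  # additional action


async def main() -> Tuple[Any, Any, List[bytes]]:
    channel = CountingChannel()
    good = await channel.decode(b'{"id": 1}')
    bad = await channel.decode(b'{not json')
    return good, bad, channel.failed


good, bad, failed = asyncio.run(main())
print(good, bad, failed)
```

Keeping the failed payloads around makes it possible to inspect or replay them later, which matters when the offsets of undecodable messages have already been committed.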
-
on_stop_iteration() → None
Signal that iteration over this channel was stopped.
Tip
Remember to call super when overriding this method.
- Return type
None
-
derive(**kwargs: Any) → faust.types.channels.ChannelT
Derive new channel from this channel, using new configuration.
See faust.Topic.derive.
For local channels this will simply return the same channel.
- Return type
ChannelT[]