faust.channels
Channel.
A channel is used to send values to streams.
The stream will iterate over incoming events in the channel.
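The relationship described above can be pictured with a small standard-library sketch. This is not Faust's implementation: `ToyChannel`, `consume`, and `demo` are hypothetical names invented for illustration, and the toy only shows the general idea of one side sending values while the other side iterates over them as they arrive.

```python
import asyncio


class ToyChannel:
    """Hypothetical stand-in for a channel: a queue that can be iterated."""

    def __init__(self, maxsize: int = 0) -> None:
        # maxsize=0 means the underlying asyncio.Queue is unbounded.
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    async def send(self, value) -> None:
        # Hand a value to whoever is iterating over the channel.
        await self._queue.put(value)

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Wait until the next value has been sent, then return it.
        return await self._queue.get()


async def consume(channel: ToyChannel, count: int) -> list:
    # Plays the role of the stream: iterate over incoming values.
    received = []
    async for value in channel:
        received.append(value)
        if len(received) == count:
            break
    return received


async def demo() -> list:
    channel = ToyChannel()
    consumer = asyncio.create_task(consume(channel, 3))
    for value in ("a", "b", "c"):
        await channel.send(value)
    return await consumer


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real application the channel would be created with app.channel() and read through stream(), as listed in the reference entries on this page.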
- class faust.channels.Channel(app: faust.types.app.AppT, *, key_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, value_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, is_iterator: bool = False, queue: mode.utils.queues.ThrowableQueue = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, loop: asyncio.events.AbstractEventLoop = None) → None
Create new channel.
Parameters:
- app (AppT) – The app that created this channel (app.channel()).
- key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – The Model used for keys in this channel.
- value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – The Model used for values in this channel.
- maxsize (Optional[int]) – The maximum number of messages this channel can hold. If exceeded, any new put call will block until a message is removed from the channel.
- loop (Optional[AbstractEventLoop]) – The asyncio event loop to use.
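The blocking behaviour described for maxsize can be sketched with a plain `asyncio.Queue`, which has the same "put waits while full" semantics. This is an analogy using only the standard library, not the channel's own queue class; `demo_maxsize` is a hypothetical name used for illustration.

```python
import asyncio


async def demo_maxsize():
    # A bounded queue that can hold a single item.
    queue = asyncio.Queue(maxsize=1)
    await queue.put("first")  # fits, the queue is now full

    # A second put cannot complete while the queue is full.
    blocked_put = asyncio.create_task(queue.put("second"))
    await asyncio.sleep(0)  # yield so the task runs until it has to wait
    was_blocked = not blocked_put.done()

    removed = queue.get_nowait()  # removing an item frees a slot
    await blocked_put  # the pending put can now finish
    return was_blocked, removed, queue.get_nowait()


if __name__ == "__main__":
    print(asyncio.run(demo_maxsize()))
```

The design point is backpressure: a bounded channel slows producers down to the pace of the consumer instead of letting messages accumulate without limit.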
- queue
- clone(*, is_iterator: bool = None, **kwargs) → faust.types.channels.ChannelT
Return type: ChannelT
- stream(**kwargs) → faust.types.streams.StreamT
Create stream reading from this channel.
Return type: StreamT[+T_co]
- as_future_message(key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, value: Union[bytes, faust.types.core.ModelT, typing.Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, value_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[NoneType, typing.Awaitable[NoneType]]] = None) → faust.types.tuples.FutureMessage
Return type: FutureMessage
- prepare_key(key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType], key_serializer: Union[faust.types.codecs.CodecT, str, NoneType]) → Any
Return type: Any
- prepare_value(value: Union[bytes, faust.types.core.ModelT, typing.Any], value_serializer: Union[faust.types.codecs.CodecT, str, NoneType]) → Any
Return type: Any
- coroutine decode(self, message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT
Return type: EventT
- coroutine get(self, *, timeout: Union[datetime.timedelta, float, str] = None) → Any
Return type: Any
- coroutine on_decode_error(self, exc: Exception, message: faust.types.tuples.Message) → None
Return type: None
- coroutine on_key_decode_error(self, exc: Exception, message: faust.types.tuples.Message) → None
Return type: None
- coroutine on_value_decode_error(self, exc: Exception, message: faust.types.tuples.Message) → None
Return type: None
- coroutine publish_message(self, fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata]
Return type: Awaitable[RecordMetadata]
- coroutine send(self, key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, value: Union[bytes, faust.types.core.ModelT, typing.Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, value_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[NoneType, typing.Awaitable[NoneType]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata]
Send message to channel.
Return type: Awaitable[RecordMetadata]
- subscriber_count
- label
- app (