faust.channels

Channel.

A channel is used to send values to streams.

The stream will iterate over incoming events in the channel.
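
The relationship between a channel and its consumer can be illustrated with a minimal sketch that uses only the standard library. It models a channel as an `asyncio.Queue` that can be iterated asynchronously. `ToyChannel` and `demo` are hypothetical names for illustration and are not part of Faust; the real `Channel` also handles serialization, event wrapping, and topic semantics.

```python
import asyncio


class ToyChannel:
    """Hypothetical stand-in for a channel: a queue that can be iterated."""

    def __init__(self, maxsize: int = 0) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    async def put(self, value) -> None:
        # Producer side: place a value in the channel.
        await self._queue.put(value)

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Consumer side: wait for the next value to arrive.
        return await self._queue.get()


async def demo() -> list:
    channel = ToyChannel()
    received = []

    async def consumer() -> None:
        # Iterate over incoming values, as a stream would.
        async for value in channel:
            received.append(value)
            if len(received) == 3:
                break

    task = asyncio.ensure_future(consumer())
    for i in range(3):
        await channel.put(i)
    await task
    return received
```

Running `asyncio.run(demo())` collects the three values in the order they were put. In Faust itself the consumer side would normally be a stream obtained from the channel rather than a hand-written loop.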

class faust.channels.Channel(app: faust.types.app.AppT, *, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, is_iterator: bool = False, queue: mode.utils.queues.ThrowableQueue = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, loop: asyncio.events.AbstractEventLoop = None) → None

Create new channel.

Parameters
  • app (AppT[]) – The app that created this channel (app.channel())

  • key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – The type used for keys in this channel: a Model class, bytes, or str.

  • value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – The type used for values in this channel: a Model class, bytes, or str.

  • maxsize (Optional[int]) – The maximum number of messages this channel can hold. Once the channel is full, any new put call blocks until a message is removed from the channel.

  • loop (Optional[AbstractEventLoop]) – The asyncio event loop to use.
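
The blocking behavior described for `maxsize` can be sketched with a bounded `asyncio.Queue` as a stand-in for the channel's internal queue. This is an assumption made for illustration: the function name `demo_backpressure` is hypothetical, and the snippet does not use Faust itself.

```python
import asyncio


async def demo_backpressure() -> list:
    # A bounded asyncio.Queue stands in for a channel created with maxsize=1.
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    log = []

    async def producer() -> None:
        await queue.put("first")   # fits: the queue was empty
        log.append("put first")
        await queue.put("second")  # blocks: the queue is full
        log.append("put second")

    task = asyncio.ensure_future(producer())
    await asyncio.sleep(0)  # let the producer run until it blocks
    log.append("queue full: %s" % queue.full())

    item = await queue.get()  # removing a message frees a slot
    log.append("got " + item)

    await task  # the blocked put can now complete
    return log
```

The returned log shows that the second put completes only after a message has been removed, which is the backpressure that `maxsize` provides.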

coroutine deliver(self, message: faust.types.tuples.Message) → None
Return type

None

queue
Return type

ThrowableQueue

clone(*, is_iterator: bool = None, **kwargs) → faust.types.channels.ChannelT
Return type

ChannelT[]

clone_using_queue(queue: asyncio.queues.Queue) → faust.types.channels.ChannelT
Return type

ChannelT[]

stream(**kwargs) → faust.types.streams.StreamT

Create stream reading from this channel.

Return type

StreamT[+T_co]

get_topic_name() → str
Return type

str

as_future_message(key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None) → faust.types.tuples.FutureMessage
Return type

FutureMessage[]

prepare_headers(headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None]) → Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]
Return type

Union[List[Tuple[str, bytes]], MutableMapping[str, bytes]]

prepare_key(key: Union[bytes, faust.types.core._ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any
Return type

Any

prepare_value(value: Union[bytes, faust.types.core._ModelT, Any], value_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any
Return type

Any

empty() → bool
Return type

bool

on_stop_iteration() → None
Return type

None

derive(**kwargs) → faust.types.channels.ChannelT
Return type

ChannelT[]

coroutine declare(self) → None
Return type

None

coroutine decode(self, message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT
Return type

EventT[]

coroutine get(self, *, timeout: Union[datetime.timedelta, float, str] = None) → Any
Return type

Any

maybe_declare

coroutine on_decode_error(self, exc: Exception, message: faust.types.tuples.Message) → None
Return type

None

coroutine on_key_decode_error(self, exc: Exception, message: faust.types.tuples.Message) → None
Return type

None

coroutine on_value_decode_error(self, exc: Exception, message: faust.types.tuples.Message) → None
Return type

None

coroutine publish_message(self, fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata]
Return type

Awaitable[RecordMetadata]

coroutine put(self, value: Any) → None
Return type

None

coroutine send(self, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata]

Send message to channel.

Return type

Awaitable[RecordMetadata]

coroutine throw(self, exc: BaseException) → None
Return type

None

subscriber_count
Return type

int

label
Return type

str