faust.streams
Streams.
-
faust.streams.current_event() → Union[faust.types.events.EventT, NoneType]
Return the event currently being processed, or None.
Return type: Optional[EventT]
-
class faust.streams.Stream(channel: AsyncIterator[T_co], *, app: faust.types.app.AppT, processors: Iterable[Callable[T]] = None, combined: List[faust.types.streams.JoinableT] = None, on_start: Callable = None, join_strategy: faust.types.joins.JoinT = None, beacon: mode.utils.types.trees.NodeT = None, concurrency_index: int = None, prev: faust.types.streams.StreamT = None, active_partitions: Set[faust.types.tuples.TP] = None, enable_acks: bool = True, loop: asyncio.events.AbstractEventLoop = None) → None
A stream: async iterator processing events in channels/topics.
-
logger = <Logger faust.streams (WARNING)>
-
get_active_stream() → faust.types.streams.StreamT
Return the currently active stream.
A stream can be derived using Stream.group_by etc., so if this stream was used to create another derived stream, this function returns the stream being actively consumed from. For example:
>>> @app.agent()
... async def agent(a):
...     a = a
...     b = a.group_by(Withdrawal.account_id)
...     c = b.through('backup_topic')
...     async for value in c:
...         ...
The return value of a.get_active_stream() would be c.
Notes
The chain of streams that leads to the active stream is decided by the _next attribute. To get to the active stream we just traverse this linked list:
>>> def get_active_stream(self):
...     node = self
...     while node._next:
...         node = node._next
...     return node
Return type: StreamT[+T_co]
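The linked-list traversal described in the notes can be tried outside Faust. The following is a minimal sketch, not Faust's implementation: the `Node` class and its `chain` and `get_active` methods are hypothetical stand-ins that model only the `_next` link.

```python
from typing import Optional


class Node:
    """Hypothetical stand-in for a stream; models only the ``_next`` link."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._next: Optional["Node"] = None

    def chain(self, name: str) -> "Node":
        # Derive a new node and record it as this node's successor.
        derived = Node(name)
        self._next = derived
        return derived

    def get_active(self) -> "Node":
        # Follow ``_next`` links until reaching the last node in the chain.
        node = self
        while node._next is not None:
            node = node._next
        return node


a = Node("a")
b = a.chain("b")
c = b.chain("c")
print(a.get_active().name)
```

Calling `get_active()` on any node in the chain (`a`, `b`, or `c`) returns the same final node, which mirrors why `a.get_active_stream()` yields `c` in the agent example.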
-
add_processor(processor: Callable[T]) → None
Add processor callback executed whenever a new event is received.
Processor functions can be async or non-async, must accept a single argument, and should return the value, mutated or not.
For example, a processor handling a stream of numbers may modify the value:
def double(value: int) -> int:
    return value * 2

stream.add_processor(double)
Return type: None
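To illustrate how a mix of async and non-async processors might be applied to one value, here is a small sketch. The `apply_processors` helper is hypothetical and is not part of Faust; it only assumes that each processor takes one argument and returns the (possibly changed) value.

```python
import asyncio
import inspect
from typing import Any, Callable, List


async def apply_processors(value: Any, processors: List[Callable[[Any], Any]]) -> Any:
    """Pass ``value`` through each processor in order (hypothetical helper)."""
    for processor in processors:
        value = processor(value)
        # An async processor returns an awaitable; resolve it before moving on.
        if inspect.isawaitable(value):
            value = await value
    return value


def double(value: int) -> int:
    return value * 2


async def add_one(value: int) -> int:
    return value + 1


# 5 is doubled to 10 by the sync processor, then incremented by the async one.
result = asyncio.run(apply_processors(5, [double, add_one]))
print(result)
```

Order matters in this sketch: swapping the two processors would increment first and double afterwards.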
-
info() → Mapping[str, Any]
Return stream settings as a dictionary.
Return type: Mapping[str, Any]
-
clone(**kwargs) → faust.types.streams.StreamT
Create a clone of this stream.
Notes
If the cloned stream is supposed to “supersede” this stream, as in group_by/through/etc., you should use _chain() instead, so that stream._next = cloned_stream is set and get_active_stream() returns the cloned stream.
Return type: StreamT[+T_co]
-
events() → AsyncIterable[faust.types.events.EventT]
Iterate over the stream as events exclusively.
This means the stream must be iterating over a channel, or at least an iterable of event objects.
Return type: AsyncIterable[EventT]
-
enumerate(start: int = 0) → AsyncIterable[Tuple[int, T_co]]
Enumerate values received on this stream.
Unlike Python’s built-in enumerate, this works with async generators.
Return type: AsyncIterable[Tuple[int, +T_co]]
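The built-in `enumerate` cannot consume an async generator, which is why an async variant is needed. The sketch below shows one way such a helper could look in plain asyncio; `aenumerate`, `letters`, and `collect` are illustrative names and not Faust APIs.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, List, Tuple, TypeVar

T = TypeVar("T")


async def aenumerate(source: AsyncIterable[T], start: int = 0) -> AsyncIterator[Tuple[int, T]]:
    """Yield ``(index, value)`` pairs from an async iterable."""
    index = start
    async for value in source:
        yield index, value
        index += 1


async def letters() -> AsyncIterator[str]:
    # Stand-in for a stream of values.
    for letter in "abc":
        yield letter


async def collect(start: int = 0) -> List[Tuple[int, str]]:
    return [pair async for pair in aenumerate(letters(), start)]


pairs = asyncio.run(collect())
print(pairs)
```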
-
through(channel: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT
Forward values in this stream to channel.
Send messages received on this stream to another channel, and return a new stream that consumes from that channel.
Notes
The messages are forwarded after any processors have been applied.
Example
topic = app.topic('foo')

@app.agent(topic)
async def mytask(stream):
    async for value in stream.through(app.topic('bar')):
        # value was first received in topic 'foo',
        # then forwarded and consumed from topic 'bar'
        print(value)
Return type: StreamT[+T_co]
-
echo(*channels) → faust.types.streams.StreamT
Forward values to one or more channels.
Unlike through(), we don’t consume from these channels.
Return type: StreamT[+T_co]
-
group_by(key: Union[faust.types.models.FieldDescriptorT, typing.Callable[[~T], typing.Union[bytes, faust.types.core.ModelT, typing.Any, NoneType]]], *, name: str = None, topic: faust.types.topics.TopicT = None, partitions: int = None) → faust.types.streams.StreamT
Create new stream that repartitions the stream using a new key.
Parameters:
- key (Union[FieldDescriptorT, Callable[[~T], Union[bytes, ModelT, Any, None]]]) – The key argument decides how the new key is generated. It can be a field descriptor, a callable, or an async callable.
  Note: The name argument must be provided if the key argument is a callable.
- name (Optional[str]) – Suffix to use for repartitioned topics. This argument is required if key is a callable.
Examples
Using a field descriptor to use a field in the event as the new key:
s = withdrawals_topic.stream()
# values in this stream are of type Withdrawal
async for event in s.group_by(Withdrawal.account_id):
    ...
Using an async callable to extract a new key:
s = withdrawals_topic.stream()

async def get_key(withdrawal):
    return await aiohttp.get(
        f'http://e.com/resolve_account/{withdrawal.account_id}')

async for event in s.group_by(get_key):
    ...
Using a regular callable to extract a new key:
s = withdrawals_topic.stream()

def get_key(withdrawal):
    return withdrawal.account_id.upper()

async for event in s.group_by(get_key):
    ...
Return type: StreamT[+T_co]
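The three kinds of key argument (field, callable, async callable) can be thought of as three ways of computing a key from a value. The sketch below is an illustration only: `resolve_key` is a hypothetical helper, and a plain attribute name stands in for a Faust field descriptor. The `Withdrawal` dataclass is likewise a simplified stand-in for a Faust record.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Union


@dataclass
class Withdrawal:
    account_id: str
    amount: float


async def resolve_key(key: Union[str, Callable[[Any], Any]], value: Any) -> Any:
    """Compute the new key for ``value`` (hypothetical helper)."""
    if isinstance(key, str):
        # Assumption: an attribute name plays the role of a field descriptor.
        return getattr(value, key)
    result = key(value)
    # Async callables return an awaitable that must be resolved.
    if inspect.isawaitable(result):
        result = await result
    return result


def upper_key(withdrawal: Withdrawal) -> str:
    return withdrawal.account_id.upper()


async def prefixed_key(withdrawal: Withdrawal) -> str:
    return f"acct-{withdrawal.account_id}"


w = Withdrawal(account_id="abc", amount=10.0)
by_field = asyncio.run(resolve_key("account_id", w))
by_callable = asyncio.run(resolve_key(upper_key, w))
by_async_callable = asyncio.run(resolve_key(prefixed_key, w))
print(by_field, by_callable, by_async_callable)
```

A field carries its own name, whereas an arbitrary callable does not, which is consistent with name being required when key is a callable.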
-
derive_topic(name: str, *, key_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, value_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, prefix: str = '', suffix: str = '') → faust.types.topics.TopicT
Create Topic description derived from the K/V type of this stream.
Parameters:
- name (str) – Topic name.
- key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – Specific key type to use for this topic. If not set, the key type of this stream will be used.
- value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – Specific value type to use for this topic. If not set, the value type of this stream will be used.
Raises: ValueError – if the stream channel is not a topic.
Return type: TopicT
-
coroutine ack(self, event: faust.types.events.EventT) → bool
Ack event.
This will decrease the reference count of the event message by one, and when the reference count reaches zero, the worker will commit the offset so that the message will not be seen by a worker again.
Parameters: event (EventT) – Event to ack.
Return type: bool
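The reference-counting behaviour described above can be modelled in a few lines. This is a simplified sketch, not Faust's code: `Message` and `Committer` are hypothetical classes, and the meaning given to the boolean return value here (True when the ack caused a commit) is an assumption of the sketch.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Message:
    offset: int
    refcount: int = 0


@dataclass
class Committer:
    # Offsets that have been committed, in order.
    committed: List[int] = field(default_factory=list)

    def ack(self, message: Message) -> bool:
        """Drop one reference; commit the offset when none remain."""
        if message.refcount <= 0:
            # Already fully acked; nothing left to release.
            return False
        message.refcount -= 1
        if message.refcount == 0:
            self.committed.append(message.offset)
            return True
        return False


committer = Committer()
message = Message(offset=42, refcount=2)  # two consumers hold the message
first = committer.ack(message)   # one reference remains, no commit yet
second = committer.ack(message)  # last reference released, offset committed
print(first, second, committer.committed)
```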
-
items() → AsyncIterator[Tuple[Union[bytes, faust.types.core.ModelT, typing.Any, NoneType], T_co]]
Iterate over the stream as key, value pairs.
Examples
@app.agent(topic)
async def mytask(stream):
    async for key, value in stream.items():
        print(key, value)
Return type: AsyncIterator[Tuple[Union[bytes, ModelT, Any, None], +T_co]]
-
coroutine on_start(self) → None
Called every time before the service is started/restarted.
Return type: None
-
coroutine on_stop(self) → None
Called every time before the service is stopped/restarted.
Return type: None
-
coroutine remove_from_stream(self, stream: faust.types.streams.StreamT) → None
Return type: None
-
coroutine send(self, value: T_contra) → None
Send value into stream locally (bypasses topic).
Return type: None
-
take(max_: int, within: Union[datetime.timedelta, float, str]) → AsyncIterable[Sequence[T_co]]
Buffer up to max_ values at a time and yield a list of buffered values.
Parameters: within (Union[timedelta, float, str]) – Timeout for when we give up waiting for another value, and process the values we have.
Warning: If there’s no timeout (i.e. within is None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).
Return type: AsyncIterable[Sequence[+T_co]]
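The interaction between max_ and within can be illustrated with a plain asyncio queue. The following is a sketch of the general buffering technique, not Faust's implementation: the standalone `take` function, the use of `asyncio.Queue`, and the `None` end-of-input marker are all assumptions made for the example.

```python
import asyncio
from typing import AsyncIterator, List


async def take(queue: asyncio.Queue, max_: int, within: float) -> AsyncIterator[List[int]]:
    """Yield lists of up to ``max_`` items, flushing early after ``within`` idle seconds.

    A ``None`` item marks the end of input (a convention of this sketch).
    """
    buffer: List[int] = []
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=within)
        except asyncio.TimeoutError:
            # No new value arrived in time: hand over what we have so far.
            if buffer:
                yield buffer
                buffer = []
            continue
        if item is None:
            if buffer:
                yield buffer
            return
        buffer.append(item)
        if len(buffer) >= max_:
            yield buffer
            buffer = []


async def demo() -> List[List[int]]:
    queue: asyncio.Queue = asyncio.Queue()
    for n in range(3):
        queue.put_nowait(n)

    async def late_producer() -> None:
        # Arrives well after the first timeout, so [2] is flushed on its own.
        await asyncio.sleep(0.12)
        queue.put_nowait(3)
        queue.put_nowait(None)

    producer = asyncio.create_task(late_producer())
    batches = [batch async for batch in take(queue, max_=2, within=0.05)]
    await producer
    return batches


batches = asyncio.run(demo())
print(batches)
```

In this run the first batch fills up to max_ immediately, the second is flushed by the timeout with a single value, and the last is flushed when input ends. Without a timeout, the value 2 would sit in the buffer until another value arrived, which is the stall the warning describes.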
-