faust.streams
Streams.
-
faust.streams.
current_event
() → Optional[faust.types.events.EventT][source]¶ Return the event currently being processed, or None.
-
class
faust.streams.
Stream
(channel: AsyncIterator[T_co], *, app: faust.types.app.AppT, processors: Iterable[Callable[T]] = None, combined: List[faust.types.streams.JoinableT] = None, on_start: Callable = None, join_strategy: faust.types.joins.JoinT = None, beacon: mode.utils.types.trees.NodeT = None, concurrency_index: int = None, prev: faust.types.streams.StreamT = None, active_partitions: Set[faust.types.tuples.TP] = None, enable_acks: bool = True, prefix: str = '', loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ A stream: async iterator processing events in channels/topics.
-
logger
= <Logger faust.streams (WARNING)>¶
-
mundane_level
= 'debug'¶
-
get_active_stream
() → faust.types.streams.StreamT[source]¶ Return the currently active stream.
A stream can be derived using
Stream.group_by
etc, so if this stream was used to create another derived stream, this function will return the stream being actively consumed from. E.g. in the example:
    >>> @app.agent()
    ... async def agent(a):
    ...     a = a
    ...     b = a.group_by(Withdrawal.account_id)
    ...     c = b.through('backup_topic')
    ...     async for value in c:
    ...         ...
The return value of
a.get_active_stream()
would be c.
Notes
The chain of streams that leads to the active stream is decided by the
_next
attribute. To get to the active stream we just traverse this linked-list:
    >>> def get_active_stream(self):
    ...     node = self
    ...     while node._next:
    ...         node = node._next
    ...     return node
- Return type
StreamT
[+T_co]
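The traversal described in the notes can be tried out in isolation. The following is a minimal sketch, not Faust code: the `Node` class and its `derive` method are hypothetical stand-ins for a stream and for operations such as `group_by` or `through`. Only the `_next` attribute mirrors the description above.

```python
from typing import Optional


class Node:
    """Hypothetical stand-in for a stream; only `_next` mirrors the text."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._next: Optional["Node"] = None

    def derive(self, name: str) -> "Node":
        # Assumption: deriving a stream links the new one as `_next`.
        child = Node(name)
        self._next = child
        return child

    def get_active_stream(self) -> "Node":
        # Follow the linked list until the last derived node.
        node = self
        while node._next:
            node = node._next
        return node


a = Node("a")
b = a.derive("b")
c = b.derive("c")
print(a.get_active_stream().name)
```

Calling `get_active_stream()` on any node in the chain ends at the same last node, which is why the agent example can ask `a` for the stream that is actually being consumed.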
-
get_root_stream
() → faust.types.streams.StreamT[source]¶ Get the root stream that this stream was derived from.
- Return type
StreamT
[+T_co]
-
add_processor
(processor: Callable[T]) → None[source]¶ Add processor callback executed whenever a new event is received.
Processor functions can be async or non-async, must accept a single argument, and should return the value, mutated or not.
For example a processor handling a stream of numbers may modify the value:
    def double(value: int) -> int:
        return value * 2

    stream.add_processor(double)
- Return type
None
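To illustrate what "async or non-async" means for a chain of processors, here is a small stand-alone sketch. The helper `apply_processors` is hypothetical and is not how Faust necessarily applies processors internally. It only shows one way to run a mix of plain and `async def` callbacks in order.

```python
import asyncio
import inspect
from typing import Any, Callable, Iterable


async def apply_processors(
    value: Any, processors: Iterable[Callable[[Any], Any]]
) -> Any:
    # Run each processor in order, feeding one result into the next.
    for processor in processors:
        value = processor(value)
        # An async processor returns an awaitable that must be awaited.
        if inspect.isawaitable(value):
            value = await value
    return value


def double(value: int) -> int:
    return value * 2


async def add_one(value: int) -> int:
    return value + 1


result = asyncio.run(apply_processors(5, [double, add_one]))
print(result)
```

Because each processor receives the previous processor's return value, a processor that forgets to return the value would pass `None` downstream, which is why the text says processors should return the value.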
-
clone
(**kwargs: Any) → faust.types.streams.StreamT[source]¶ Create a clone of this stream.
Notes
If the cloned stream is supposed to supersede this stream, like in group_by/through/etc., you should use _chain() instead so that stream._next = cloned_stream is set and get_active_stream() returns the cloned stream.
- Return type
StreamT
[+T_co]
-
noack
() → faust.types.streams.StreamT[source]¶ Create new stream where acks are manual.
- Return type
StreamT
[+T_co]
-
items
() → AsyncIterator[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], T_co]][source]¶ Iterate over the stream as
key, value
pairs.
Examples
    @app.agent(topic)
    async def mytask(stream):
        async for key, value in stream.items():
            print(key, value)
- Return type
AsyncIterator
[Tuple
[Union
[bytes
,_ModelT
,Any
,None
], +T_co]]
-
events
() → AsyncIterable[faust.types.events.EventT][source]¶ Iterate over the stream as events exclusively.
This means the stream must be iterating over a channel, or at least an iterable of event objects.
- Return type
AsyncIterable
[EventT]
-
take
(max_: int, within: Union[datetime.timedelta, float, str]) → AsyncIterable[Sequence[T_co]][source]¶ Buffer up to max_ values at a time and yield them as a list.
- Parameters
within (Union[timedelta, float, str]) – Timeout for when we give up waiting for another value, and process the values we have. Warning: If there’s no timeout (i.e. timeout=None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).
- Return type
AsyncIterable
[Sequence
[+T_co]]
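The interaction between the buffer size and the timeout can be sketched with plain asyncio. This is an illustration under assumptions, not Faust's implementation: the free function `take`, the sentinel, and the internal queue are hypothetical, and the timeout here restarts for every value waited on.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, List, TypeVar

T = TypeVar("T")


async def take(
    source: AsyncIterable[T], max_: int, within: float
) -> AsyncIterator[List[T]]:
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking the end of the source

    async def pump() -> None:
        # Copy values from the source into the queue.
        async for item in source:
            await queue.put(item)
        await queue.put(done)

    pump_task = asyncio.ensure_future(pump())
    buffer: List[T] = []
    try:
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=within)
            except asyncio.TimeoutError:
                # Gave up waiting for another value: flush what we have.
                if buffer:
                    yield buffer
                    buffer = []
                continue
            if item is done:
                break
            buffer.append(item)
            if len(buffer) >= max_:
                # Buffer is full: flush immediately.
                yield buffer
                buffer = []
        if buffer:
            yield buffer
    finally:
        pump_task.cancel()


async def numbers() -> AsyncIterator[int]:
    for i in range(5):
        yield i


async def main() -> List[List[int]]:
    return [batch async for batch in take(numbers(), max_=2, within=1.0)]


batches = asyncio.run(main())
print(batches)
```

Without the timeout branch, a partially filled buffer would be held until enough further values arrive, which is the stall the warning above describes.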
-
enumerate
(start: int = 0) → AsyncIterable[Tuple[int, T_co]][source]¶ Enumerate values received on this stream.
Unlike Python’s built-in enumerate, this works with async generators.
- Return type
AsyncIterable
[Tuple
[int
, +T_co]]
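For comparison, an async counterpart of the built-in `enumerate` can be written in a few lines. The function name `aenumerate` is hypothetical, and this is a generic sketch rather than the method's actual source.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, List, Tuple, TypeVar

T = TypeVar("T")


async def aenumerate(
    source: AsyncIterable[T], start: int = 0
) -> AsyncIterator[Tuple[int, T]]:
    # Pair each value with a running counter, beginning at `start`.
    index = start
    async for value in source:
        yield index, value
        index += 1


async def letters() -> AsyncIterator[str]:
    for letter in ("a", "b"):
        yield letter


async def main() -> List[Tuple[int, str]]:
    return [pair async for pair in aenumerate(letters(), start=1)]


pairs = asyncio.run(main())
print(pairs)
```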
-
through
(channel: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]¶ Forward values in this stream to channel.
Send messages received on this stream to another channel, and return a new stream that consumes from that channel.
Notes
The messages are forwarded after any processors have been applied.
Example
    topic = app.topic('foo')

    @app.agent(topic)
    async def mytask(stream):
        async for value in stream.through(app.topic('bar')):
            # value was first received in topic 'foo',
            # then forwarded and consumed from topic 'bar'
            print(value)
- Return type
StreamT
[+T_co]
-
echo
(*channels: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]¶ Forward values to one or more channels.
Unlike through(), we don’t consume from these channels.
- Return type
StreamT
[+T_co]
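The difference between forwarding and consuming can be shown with in-memory queues standing in for channels. The sketch below is an assumption-laden illustration, not Faust code: the free functions `through` and `echo`, the `_END` sentinel, and the use of `asyncio.Queue` as a channel are all hypothetical.

```python
import asyncio
from typing import Any, AsyncIterable, AsyncIterator, List, Tuple

_END = object()  # sentinel marking the end of forwarded values


async def through(
    source: AsyncIterable[Any], channel: asyncio.Queue
) -> AsyncIterator[Any]:
    # Forward every value, then consume from the channel itself.
    async def forward() -> None:
        async for value in source:
            await channel.put(value)
        await channel.put(_END)

    task = asyncio.ensure_future(forward())
    try:
        while True:
            value = await channel.get()
            if value is _END:
                break
            yield value
    finally:
        task.cancel()


async def echo(
    source: AsyncIterable[Any], *channels: asyncio.Queue
) -> AsyncIterator[Any]:
    # Forward a copy to each channel but keep yielding from the source.
    async for value in source:
        for channel in channels:
            await channel.put(value)
        yield value


async def values() -> AsyncIterator[str]:
    for value in ("x", "y"):
        yield value


async def main() -> Tuple[List[str], int, int]:
    bar: asyncio.Queue = asyncio.Queue()
    audit: asyncio.Queue = asyncio.Queue()
    consumed = [v async for v in through(echo(values(), audit), bar)]
    return consumed, audit.qsize(), bar.qsize()


result = asyncio.run(main())
print(result)
```

In this sketch the `audit` queue still holds both values afterwards because nothing reads from it, while `bar` is empty because `through` consumed everything it forwarded.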
-
group_by
(key: Union[faust.types.models.FieldDescriptorT, Callable[T, Union[bytes, faust.types.core._ModelT, Any, None]]], *, name: str = None, topic: faust.types.topics.TopicT = None, partitions: int = None) → faust.types.streams.StreamT[source]¶ Create new stream that repartitions the stream using a new key.
- Parameters
key (Union[FieldDescriptorT[~T], Callable[[~T], Union[bytes, _ModelT, Any, None]]]) – The key argument decides how the new key is generated; it can be a field descriptor, a callable, or an async callable.
Note: The name argument must be provided if the key argument is a callable.
name (Optional[str]) – Suffix to use for repartitioned topics. This argument is required if key is a callable.
Examples
Using a field descriptor to use a field in the event as the new key:
    s = withdrawals_topic.stream()
    # values in this stream are of type Withdrawal
    async for event in s.group_by(Withdrawal.account_id):
        ...
Using an async callable to extract a new key:
    s = withdrawals_topic.stream()

    async def get_key(withdrawal):
        return await aiohttp.get(
            f'http://e.com/resolve_account/{withdrawal.account_id}')

    # name is required when key is a callable; this suffix is illustrative.
    async for event in s.group_by(get_key, name='resolved_account'):
        ...
Using a regular callable to extract a new key:
    s = withdrawals_topic.stream()

    def get_key(withdrawal):
        return withdrawal.account_id.upper()

    # name is required when key is a callable; this suffix is illustrative.
    async for event in s.group_by(get_key, name='account_id_upper'):
        ...
- Return type
StreamT
[+T_co]
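The three kinds of key argument can be handled uniformly, as the following stand-alone sketch suggests. Everything here is hypothetical and for illustration only: `resolve_key`, `partition_for`, and the `Withdrawal` tuple are not Faust APIs, `attrgetter` merely imitates a field descriptor, and CRC32 is used as a simple stable hash rather than the partitioner a Kafka client would actually use.

```python
import asyncio
import inspect
import zlib
from operator import attrgetter
from typing import Any, Callable, List, NamedTuple


class Withdrawal(NamedTuple):
    account_id: str
    amount: float


async def resolve_key(key: Callable[[Any], Any], value: Any) -> Any:
    # Works for plain callables and for async callables alike.
    result = key(value)
    if inspect.isawaitable(result):
        result = await result
    return result


def partition_for(key: str, partitions: int) -> int:
    # Stable hash so equal keys always land in the same partition.
    return zlib.crc32(key.encode()) % partitions


def upper_key(withdrawal: Withdrawal) -> str:
    return withdrawal.account_id.upper()


async def async_key(withdrawal: Withdrawal) -> str:
    await asyncio.sleep(0)  # placeholder for a real lookup
    return withdrawal.account_id.lower()


async def main() -> List[str]:
    w = Withdrawal("Acc-1", 10.0)
    return [
        await resolve_key(attrgetter("account_id"), w),
        await resolve_key(upper_key, w),
        await resolve_key(async_key, w),
    ]


keys = asyncio.run(main())
print(keys)
```

The point of repartitioning is that every event with the same new key is routed to the same partition, which `partition_for` imitates by being deterministic for a given key.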
-
filter
(fun: Callable[T]) → faust.types.streams.StreamT[source]¶ Filter values from stream using callback.
The callback may be a traditional function, lambda function, or an async def function.
This method is useful for filtering events before repartitioning a stream.
Examples
    >>> async for v in stream.filter(lambda v: v > 1000).group_by(...):
    ...     # do something
- Return type
StreamT
[+T_co]
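A filter that accepts a function, a lambda, or an `async def` predicate can be sketched as below. The name `afilter` is hypothetical and this is a generic asyncio illustration, not the method's actual source.

```python
import asyncio
import inspect
from typing import Any, AsyncIterable, AsyncIterator, Callable, List


async def afilter(
    source: AsyncIterable[Any], predicate: Callable[[Any], Any]
) -> AsyncIterator[Any]:
    async for value in source:
        keep = predicate(value)
        # An async predicate returns an awaitable that must be awaited.
        if inspect.isawaitable(keep):
            keep = await keep
        if keep:
            yield value


async def amounts() -> AsyncIterator[int]:
    for amount in (500, 1500, 2500):
        yield amount


async def is_large(value: int) -> bool:
    return value > 2000


async def main() -> List[List[int]]:
    with_lambda = [v async for v in afilter(amounts(), lambda v: v > 1000)]
    with_async = [v async for v in afilter(amounts(), is_large)]
    return [with_lambda, with_async]


filtered = asyncio.run(main())
print(filtered)
```

Note that the predicate must accept the value as its single argument.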
-
derive_topic
(name: str, *, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, prefix: str = '', suffix: str = '') → faust.types.topics.TopicT[source]¶ Create Topic description derived from the K/V type of this stream.
- Parameters
name (str) – Topic name.
key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – Specific key type to use for this topic. If not set, the key type of this stream will be used.
value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – Specific value type to use for this topic. If not set, the value type of this stream will be used.
- Raises
ValueError – if the stream channel is not a topic.
- Return type
TopicT
[]
-
async
throw
(exc: BaseException) → None[source]¶ Send exception to stream iteration.
- Return type
None
-
combine
(*nodes: faust.types.streams.JoinableT, **kwargs: Any) → faust.types.streams.StreamT[source]¶ Combine streams and tables into joined stream.
- Return type
StreamT
[+T_co]
-
contribute_to_stream
(active: faust.types.streams.StreamT) → None[source]¶ Add stream as node in joined stream.
- Return type
None
-
async
remove_from_stream
(stream: faust.types.streams.StreamT) → None[source]¶ Remove stream as node in a joined stream.
- Return type
None
-
join
(*fields: faust.types.models.FieldDescriptorT) → faust.types.streams.StreamT[source]¶ Create stream where events are joined.
- Return type
StreamT
[+T_co]
-
left_join
(*fields: faust.types.models.FieldDescriptorT) → faust.types.streams.StreamT[source]¶ Create stream where events are joined by LEFT JOIN.
- Return type
StreamT
[+T_co]
-
inner_join
(*fields: faust.types.models.FieldDescriptorT) → faust.types.streams.StreamT[source]¶ Create stream where events are joined by INNER JOIN.
- Return type
StreamT
[+T_co]
-
outer_join
(*fields: faust.types.models.FieldDescriptorT) → faust.types.streams.StreamT[source]¶ Create stream where events are joined by OUTER JOIN.
- Return type
StreamT
[+T_co]
-
async
on_merge
(value: T = None) → Optional[T][source]¶ Signal called when an event is to be joined.
- Return type
Optional
[~T]
-
async
send
(value: T_contra) → None[source]¶ Send value into stream locally (bypasses topic).
- Return type
None
-