faust.streams

Streams.

faust.streams.current_event() → Optional[faust.types.events.EventT][source]

Return the event currently being processed, or None.

Return type

Optional[EventT[]]

class faust.streams.Stream(channel: AsyncIterator[T_co], *, app: faust.types.app.AppT, processors: Iterable[Callable[T]] = None, combined: List[faust.types.streams.JoinableT] = None, on_start: Callable = None, join_strategy: faust.types.joins.JoinT = None, beacon: mode.utils.types.trees.NodeT = None, concurrency_index: int = None, prev: faust.types.streams.StreamT = None, active_partitions: Set[faust.types.tuples.TP] = None, enable_acks: bool = True, prefix: str = '', loop: asyncio.events.AbstractEventLoop = None) → None[source]

A stream: async iterator processing events in channels/topics.

logger = <Logger faust.streams (WARNING)>
mundane_level = 'debug'
get_active_stream() → faust.types.streams.StreamT[source]

Return the currently active stream.

A stream can be derived using Stream.group_by etc, so if this stream was used to create another derived stream, this function will return the stream being actively consumed from. E.g. in the example:

>>> @app.agent()
... async def agent(a):
..      a = a
...     b = a.group_by(Withdrawal.account_id)
...     c = b.through('backup_topic')
...     async for value in c:
...         ...

The return value of a.get_active_stream() would be c.

Notes

The chain of streams that leads to the active stream is decided by the _next attribute. To get to the active stream we just traverse this linked-list:

>>> def get_active_stream(self):
...     node = self
...     while node._next:
...         node = node._next
Return type

StreamT[+T_co]

get_root_stream() → faust.types.streams.StreamT[source]

Get the root stream that this stream was derived from.

Return type

StreamT[+T_co]

add_processor(processor: Callable[T]) → None[source]

Add processor callback executed whenever a new event is received.

Processor functions can be async or non-async, must accept a single argument, and should return the value, mutated or not.

For example a processor handling a stream of numbers may modify the value:

def double(value: int) -> int:
    return value * 2

stream.add_processor(double)
Return type

None

info() → Mapping[str, Any][source]

Return stream settings as a dictionary.

Return type

Mapping[str, Any]

clone(**kwargs) → faust.types.streams.StreamT[source]

Create a clone of this stream.

Notes

If the cloned stream is supposed to supersede this stream, like in group_by/through/etc., you should use _chain() instead so stream._next = cloned_stream is set and get_active_stream() returns the cloned stream.

Return type

StreamT[+T_co]

noack() → faust.types.streams.StreamT[source]

Create new stream where acks are manual.

Return type

StreamT[+T_co]

items() → AsyncIterator[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], T_co]][source]

Iterate over the stream as key, value pairs.

Examples

@app.agent(topic)
async def mytask(stream):
    async for key, value in stream.items():
        print(key, value)
Return type

AsyncIterator[Tuple[Union[bytes, _ModelT, Any, None], +T_co]]

events() → AsyncIterable[faust.types.events.EventT][source]

Iterate over the stream as events exclusively.

This means the stream must be iterating over a channel, or at least an iterable of event objects.

Return type

AsyncIterable[EventT[]]

take(max_: int, within: Union[datetime.timedelta, float, str]) → AsyncIterable[Sequence[T_co]][source]

Buffer n values at a time and yield a list of buffered values.

Parameters

within (Union[timedelta, float, str]) – Timeout for when we give up waiting for another value, and process the values we have. Warning: If there’s no timeout (i.e. timeout=None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).

Return type

AsyncIterable[Sequence[+T_co]]

enumerate(start: int = 0) → AsyncIterable[Tuple[int, T_co]][source]

Enumerate values received on this stream.

Unlike Python’s built-in enumerate, this works with async generators.

Return type

AsyncIterable[Tuple[int, +T_co]]

through(channel: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]

Forward values to in this stream to channel.

Send messages received on this stream to another channel, and return a new stream that consumes from that channel.

Notes

The messages are forwarded after any processors have been applied.

Example

topic = app.topic('foo')

@app.agent(topic)
async def mytask(stream):
    async for value in stream.through(app.topic('bar')):
        # value was first received in topic 'foo',
        # then forwarded and consumed from topic 'bar'
        print(value)
Return type

StreamT[+T_co]

echo(*channels) → faust.types.streams.StreamT[source]

Forward values to one or more channels.

Unlike through(), we don’t consume from these channels.

Return type

StreamT[+T_co]

group_by(key: Union[faust.types.models.FieldDescriptorT, Callable[T, Union[bytes, faust.types.core._ModelT, Any, None]]], *, name: str = None, topic: faust.types.topics.TopicT = None, partitions: int = None) → faust.types.streams.StreamT[source]

Create new stream that repartitions the stream using a new key.

Parameters
  • key (Union[FieldDescriptorT[~T], Callable[[~T], Union[bytes, _ModelT, Any, None]]]) –

    The key argument decides how the new key is generated, it can be a field descriptor, a callable, or an async callable.

    Note: The name argument must be provided if the key

    argument is a callable.

  • name (Optional[str]) – Suffix to use for repartitioned topics. This argument is required if key is a callable.

Examples

Using a field descriptor to use a field in the event as the new key:

s = withdrawals_topic.stream()
# values in this stream are of type Withdrawal
async for event in s.group_by(Withdrawal.account_id):
    ...

Using an async callable to extract a new key:

s = withdrawals_topic.stream()

async def get_key(withdrawal):
    return await aiohttp.get(
        f'http://e.com/resolve_account/{withdrawal.account_id}')

async for event in s.group_by(get_key):
    ...

Using a regular callable to extract a new key:

s = withdrawals_topic.stream()

def get_key(withdrawal):
    return withdrawal.account_id.upper()

async for event in s.group_by(get_key):
    ...
Return type

StreamT[+T_co]

filter(fun: Callable[T]) → faust.types.streams.StreamT[source]

Filter values from stream using callback.

The callback may be a traditional function, lambda function, or an async def function.

This method is useful for filtering events before repartitioning a stream.

Examples

>>> async for v in stream.filter(lambda: v > 1000).group_by(...):
...     # do something
Return type

StreamT[+T_co]

derive_topic(name: str, *, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, prefix: str = '', suffix: str = '') → faust.types.topics.TopicT[source]

Create Topic description derived from the K/V type of this stream.

Parameters
  • name (str) – Topic name.

  • key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – Specific key type to use for this topic. If not set, the key type of this stream will be used.

  • value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – Specific value type to use for this topic. If not set, the value type of this stream will be used.

Raises

ValueError – if the stream channel is not a topic.

Return type

TopicT[]

combine(*nodes, **kwargs) → faust.types.streams.StreamT[source]

Combine streams and tables into joined stream.

Return type

StreamT[+T_co]

contribute_to_stream(active: faust.types.streams.StreamT) → None[source]

Add stream as node in joined stream.

Return type

None

join(*fields) → faust.types.streams.StreamT[source]

Create stream where events are joined.

Return type

StreamT[+T_co]

left_join(*fields) → faust.types.streams.StreamT[source]

Create stream where events are joined by LEFT JOIN.

Return type

StreamT[+T_co]

inner_join(*fields) → faust.types.streams.StreamT[source]

Create stream where events are joined by INNER JOIN.

Return type

StreamT[+T_co]

outer_join(*fields) → faust.types.streams.StreamT[source]

Create stream where events are joined by OUTER JOIN.

Return type

StreamT[+T_co]

property label

Return description of stream, used in graphs and logs. :rtype: str

shortlabel[source]

Return short description of stream.