faust.events

class faust.events.App[source]
class faust.events.Event(app: faust.types.app.AppT, key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType], value: Union[bytes, faust.types.core.ModelT, typing.Any], message: faust.types.tuples.Message) → None[source]

An event received on a channel.

Notes

  • Events have a key and a value:

    event.key, event.value
    
  • They also have a reference to the original message (if available), such as a Kafka record:

    event.message.offset

  • Iteratiing over channels/topics yields Event:

    async for event in channel:

  • Iterating over a stream (that in turn iterate over channel) yields Event.value:

    async for value in channel.stream()  # value is event.value
        ...
    
  • If you only have a Stream object, you can also access underlying events by using Stream.events.

    For example:

    async for event in channel.stream.events():
        ...
    

    Also commonly used for finding the “current event” related to a value in the stream:

    stream = channel.stream()
    async for event in stream.events():
        event = stream.current_event
        message = event.message
        topic = event.message.topic
    

    You can retrieve the current event in a stream to:

    • Get access to the serialized key+value.
    • Get access to message properties like, what topic+partition the value was received on, or its offset.

    If you want access to both key and value, you should use stream.items() instead.

    async for key, value in stream.items():
        ...
    

    stream.current_event can also be accessed but you must take extreme care you are using the correct stream object. Methods such as .group_by(key) and .through(topic) returns cloned stream objects, so in the example:

    The best way to access the current_event in an agent is to use the contextvar:

    from faust import current_event
    
    @app.agent(topic)
    async def process(stream):
        async for value in stream:
            event = current_event()
    
coroutine forward(self, channel: Union[str, faust.types.channels.ChannelT], key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = <object object>, value: Union[bytes, faust.types.core.ModelT, typing.Any] = <object object>, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, value_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[NoneType, typing.Awaitable[NoneType]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Forward original message (will not be reserialized).

Return type:Awaitable[RecordMetadata]
coroutine send(self, channel: Union[str, faust.types.channels.ChannelT], key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = <object object>, value: Union[bytes, faust.types.core.ModelT, typing.Any] = <object object>, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, value_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[NoneType, typing.Awaitable[NoneType]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send object to channel.

Return type:Awaitable[RecordMetadata]
ack() → bool[source]
Return type:bool