faust.sensors.base

Base-interface for sensors.

class faust.sensors.base.Sensor(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Base class for sensors.

This sensor does not do anything at all, but can be subclassed to create new monitors.

on_message_in(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

Message received by a consumer.

Return type

None

on_stream_event_in(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → Optional[Dict][source]

Message sent to a stream as an event.

Return type

Optional[Dict[~KT, ~VT]]

on_stream_event_out(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT, state: Dict = None) → None[source]

Event was acknowledged by stream.

Notes

Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through on_message_out() just before offsets are committed.

Return type

None

on_message_out(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

All streams finished processing message.

Return type

None

on_topic_buffer_full(topic: faust.types.topics.TopicT) → None[source]

Topic buffer full so conductor had to wait.

Return type

None

on_table_get(table: faust.types.tables.CollectionT, key: Any) → None[source]

Key retrieved from table.

Return type

None

on_table_set(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]

Value set for key in table.

Return type

None

on_table_del(table: faust.types.tables.CollectionT, key: Any) → None[source]

Key deleted from table.

Return type

None

on_commit_initiated(consumer: faust.types.transports.ConsumerT) → Any[source]

Consumer is about to commit topic offset.

Return type

Any

on_commit_completed(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]

Consumer finished committing topic offset.

Return type

None

on_send_initiated(producer: faust.types.transports.ProducerT, topic: str, message: faust.types.tuples.PendingMessage, keysize: int, valsize: int) → Any[source]

About to send a message.

Return type

Any

on_send_completed(producer: faust.types.transports.ProducerT, state: Any, metadata: faust.types.tuples.RecordMetadata) → None[source]

Message successfully sent.

Return type

None

on_send_error(producer: faust.types.transports.ProducerT, exc: BaseException, state: Any) → None[source]

Error while sending message.

Return type

None

on_assignment_start(assignor: faust.types.assignor.PartitionAssignorT) → Dict[source]

Partition assignor is starting to assign partitions.

Return type

Dict[~KT, ~VT]

on_assignment_error(assignor: faust.types.assignor.PartitionAssignorT, state: Dict, exc: BaseException) → None[source]

Partition assignor did not complete assignor due to error.

Return type

None

on_assignment_completed(assignor: faust.types.assignor.PartitionAssignorT, state: Dict) → None[source]

Partition assignor completed assignment.

Return type

None

on_rebalance_start(app: faust.types.app.AppT) → Dict[source]

Cluster rebalance in progress.

Return type

Dict[~KT, ~VT]

on_rebalance_return(app: faust.types.app.AppT, state: Dict) → None[source]

Consumer replied assignment is done to broker.

Return type

None

on_rebalance_end(app: faust.types.app.AppT, state: Dict) → None[source]

Cluster rebalance fully completed (including recovery).

Return type

None

on_web_request_start(app: faust.types.app.AppT, request: faust.web.base.Request, *, view: faust.web.views.View = None) → Dict[source]

Web server started working on request.

Return type

Dict[~KT, ~VT]

on_web_request_end(app: faust.types.app.AppT, request: faust.web.base.Request, response: Optional[faust.web.base.Response], state: Dict, *, view: faust.web.views.View = None) → None[source]

Web server finished working on request.

Return type

None

asdict() → Mapping[source]

Convert sensor state to dictionary.

Return type

Mapping[~KT, +VT_co]

logger = <Logger faust.sensors.base (WARNING)>
class faust.sensors.base.SensorDelegate(app: faust.types.app.AppT) → None[source]

A class that delegates sensor methods to a list of sensors.

add(sensor: faust.types.sensors.SensorT) → None[source]

Add sensor.

Return type

None

remove(sensor: faust.types.sensors.SensorT) → None[source]

Remove sensor.

Return type

None

on_message_in(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

Call before message is delegated to streams.

Return type

None

on_stream_event_in(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → Optional[Dict][source]

Call when stream starts processing an event.

Return type

Optional[Dict[~KT, ~VT]]

on_stream_event_out(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT, state: Dict = None) → None[source]

Call when stream is done processing an event.

Return type

None

on_topic_buffer_full(topic: faust.types.topics.TopicT) → None[source]

Call when conductor topic buffer is full and has to wait.

Return type

None

on_message_out(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

Call when message is fully acknowledged and can be committed.

Return type

None

on_table_get(table: faust.types.tables.CollectionT, key: Any) → None[source]

Call when value in table is retrieved.

Return type

None

on_table_set(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]

Call when new value for key in table is set.

Return type

None

on_table_del(table: faust.types.tables.CollectionT, key: Any) → None[source]

Call when key in a table is deleted.

Return type

None

on_commit_initiated(consumer: faust.types.transports.ConsumerT) → Any[source]

Call when consumer commit offset operation starts.

Return type

Any

on_commit_completed(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]

Call when consumer commit offset operation completed.

Return type

None

on_send_initiated(producer: faust.types.transports.ProducerT, topic: str, message: faust.types.tuples.PendingMessage, keysize: int, valsize: int) → Any[source]

Call when message added to producer buffer.

Return type

Any

on_send_completed(producer: faust.types.transports.ProducerT, state: Any, metadata: faust.types.tuples.RecordMetadata) → None[source]

Call when producer finished sending message.

Return type

None

on_send_error(producer: faust.types.transports.ProducerT, exc: BaseException, state: Any) → None[source]

Call when producer was unable to publish message.

Return type

None

on_assignment_start(assignor: faust.types.assignor.PartitionAssignorT) → Dict[source]

Partition assignor is starting to assign partitions.

Return type

Dict[~KT, ~VT]

on_assignment_error(assignor: faust.types.assignor.PartitionAssignorT, state: Dict, exc: BaseException) → None[source]

Partition assignor did not complete assignor due to error.

Return type

None

on_assignment_completed(assignor: faust.types.assignor.PartitionAssignorT, state: Dict) → None[source]

Partition assignor completed assignment.

Return type

None

on_rebalance_start(app: faust.types.app.AppT) → Dict[source]

Cluster rebalance in progress.

Return type

Dict[~KT, ~VT]

on_rebalance_return(app: faust.types.app.AppT, state: Dict) → None[source]

Consumer replied assignment is done to broker.

Return type

None

on_rebalance_end(app: faust.types.app.AppT, state: Dict) → None[source]

Cluster rebalance fully completed (including recovery).

Return type

None

on_web_request_start(app: faust.types.app.AppT, request: faust.web.base.Request, *, view: faust.web.views.View = None) → Dict[source]

Web server started working on request.

Return type

Dict[~KT, ~VT]

on_web_request_end(app: faust.types.app.AppT, request: faust.web.base.Request, response: Optional[faust.web.base.Response], state: Dict, *, view: faust.web.views.View = None) → None[source]

Web server finished working on request.

Return type

None