Source code for faust.types.app

import abc
import asyncio
import typing
from datetime import tzinfo
from typing import (
    Any,
    AsyncIterable,
    Awaitable,
    Callable,
    ClassVar,
    Iterable,
    Mapping,
    MutableSequence,
    Optional,
    Pattern,
    Set,
    Type,
    Union,
    no_type_check,
)

from mode import Seconds, ServiceT, Signal, SupervisorStrategyT, SyncSignal
from mode.utils.futures import stampede
from mode.utils.objects import cached_property
from mode.utils.queues import FlowControlEvent, ThrowableQueue
from mode.utils.types.trees import NodeT
from mode.utils.typing import NoReturn


from .agents import AgentFun, AgentManagerT, AgentT, SinkT
from .assignor import PartitionAssignorT
from .codecs import CodecArg
from .core import HeadersArg, K, V
from .fixups import FixupT
from .router import RouterT
from .sensors import SensorDelegateT
from .serializers import RegistryT
from .streams import StreamT
from .tables import CollectionT, TableManagerT, TableT
from .topics import ChannelT, TopicT
from .transports import ConductorT, ConsumerT, ProducerT, TransportT
from .tuples import MessageSentCallback, RecordMetadata, TP
from .web import (
    CacheBackendT,
    HttpClientT,
    PageArg,
    ResourceOptions,
    View,
    ViewDecorator,
    Web,
)
from .windows import WindowT

if typing.TYPE_CHECKING:
    from faust.cli.base import AppCommand as _AppCommand
    from faust.livecheck.app import LiveCheck as _LiveCheck
    from faust.sensors.monitor import Monitor as _Monitor
    from faust.worker import Worker as _Worker
    from .models import ModelArg as _ModelArg
    from .serializers import SchemaT as _SchemaT
    from .settings import Settings as _Settings
else:
    class _AppCommand: ...     # noqa
    class _SchemaT: ...        # noqa
    class _LiveCheck: ...      # noqa
    class _Monitor: ...        # noqa
    class _Worker: ...         # noqa
    class _ModelArg: ...       # noqa
    class _Settings: ...       # noqa

__all__ = [
    'TaskArg',
    'AppT',
]

TaskArg = Union[Callable[['AppT'], Awaitable], Callable[[], Awaitable]]


class BootStrategyT:
    app: 'AppT'

    enable_kafka: bool = True
    # We want these to take default from `enable_kafka`
    # attribute, but still want to allow subclasses to define
    # them like this:
    #   class MyBoot(BootStrategy):
    #       enable_kafka_consumer = False
    enable_kafka_consumer: Optional[bool] = None
    enable_kafka_producer: Optional[bool] = None

    enable_web: Optional[bool] = None
    enable_sensors: bool = True

    @abc.abstractmethod
    def __init__(self, app: 'AppT', *,
                 enable_web: bool = None,
                 enable_kafka: bool = True,
                 enable_kafka_producer: bool = None,
                 enable_kafka_consumer: bool = None,
                 enable_sensors: bool = True) -> None:
        ...

    @abc.abstractmethod
    def server(self) -> Iterable[ServiceT]:
        ...

    @abc.abstractmethod
    def client_only(self) -> Iterable[ServiceT]:
        ...

    @abc.abstractmethod
    def producer_only(self) -> Iterable[ServiceT]:
        ...


[docs]class AppT(ServiceT): """Abstract type for the Faust application. See Also: :class:`faust.App`. """ Settings: ClassVar[Type[_Settings]] BootStrategy: ClassVar[Type[BootStrategyT]] boot_strategy: BootStrategyT #: Set to true when the app is finalized (can read configuration). finalized: bool = False #: Set to true when the app has read configuration. configured: bool = False #: Set to true if the worker is currently rebalancing. rebalancing: bool = False rebalancing_count: int = 0 #: Set to true if the assignment is empty # This flag is set by App._on_partitions_assigned unassigned: bool = False #: Set to true when app is executing within a worker instance. # This flag is set in faust/worker.py in_worker: bool = False on_configured: SyncSignal[_Settings] = SyncSignal() on_before_configured: SyncSignal = SyncSignal() on_after_configured: SyncSignal = SyncSignal() on_partitions_assigned: Signal[Set[TP]] = Signal() on_partitions_revoked: Signal[Set[TP]] = Signal() on_rebalance_complete: Signal = Signal() on_before_shutdown: Signal = Signal() on_worker_init: SyncSignal = SyncSignal() on_produce_message: SyncSignal = SyncSignal() client_only: bool agents: AgentManagerT sensors: SensorDelegateT fixups: MutableSequence[FixupT] @abc.abstractmethod def __init__(self, id: str, *, monitor: _Monitor, config_source: Any = None, **options: Any) -> None: self.on_startup_finished: Optional[Callable] = None
[docs] @abc.abstractmethod def config_from_object(self, obj: Any, *, silent: bool = False, force: bool = False) -> None: ...
[docs] @abc.abstractmethod def finalize(self) -> None: ...
[docs] @abc.abstractmethod def main(self) -> NoReturn: ...
[docs] @abc.abstractmethod def worker_init(self) -> None: ...
[docs] @abc.abstractmethod def discover(self, *extra_modules: str, categories: Iterable[str] = ('a', 'b', 'c'), ignore: Iterable[Any] = ('foo', 'bar')) -> None: ...
[docs] @abc.abstractmethod def topic(self, *topics: str, pattern: Union[str, Pattern] = None, schema: _SchemaT = None, key_type: _ModelArg = None, value_type: _ModelArg = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, partitions: int = None, retention: Seconds = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, maxsize: int = None, allow_empty: bool = False, has_prefix: bool = False, loop: asyncio.AbstractEventLoop = None) -> TopicT: ...
[docs] @abc.abstractmethod def channel(self, *, schema: _SchemaT = None, key_type: _ModelArg = None, value_type: _ModelArg = None, maxsize: int = None, loop: asyncio.AbstractEventLoop = None) -> ChannelT: ...
[docs] @abc.abstractmethod def agent(self, channel: Union[str, ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[SupervisorStrategyT] = None, sink: Iterable[SinkT] = None, isolated_partitions: bool = False, use_reply_headers: bool = False, **kwargs: Any) -> Callable[[AgentFun], AgentT]: ...
[docs] @abc.abstractmethod @no_type_check def task(self, fun: TaskArg, *, on_leader: bool = False, traced: bool = True) -> Callable: ...
[docs] @abc.abstractmethod def timer(self, interval: Seconds, on_leader: bool = False, traced: bool = True, name: str = None, max_drift_correction: float = 0.1) -> Callable: ...
[docs] @abc.abstractmethod def crontab(self, cron_format: str, *, timezone: tzinfo = None, on_leader: bool = False, traced: bool = True) -> Callable: ...
[docs] @abc.abstractmethod def service(self, cls: Type[ServiceT]) -> Type[ServiceT]: ...
[docs] @abc.abstractmethod def stream(self, channel: AsyncIterable, beacon: NodeT = None, **kwargs: Any) -> StreamT: ...
[docs] @abc.abstractmethod def Table(self, name: str, *, default: Callable[[], Any] = None, window: WindowT = None, partitions: int = None, help: str = None, **kwargs: Any) -> TableT: ...
[docs] @abc.abstractmethod def GlobalTable(self, name: str, *, default: Callable[[], Any] = None, window: WindowT = None, partitions: int = None, help: str = None, **kwargs: Any) -> TableT: ...
[docs] @abc.abstractmethod def SetTable(self, name: str, *, window: WindowT = None, partitions: int = None, start_manager: bool = False, help: str = None, **kwargs: Any) -> TableT: ...
[docs] @abc.abstractmethod def SetGlobalTable(self, name: str, *, window: WindowT = None, partitions: int = None, start_manager: bool = False, help: str = None, **kwargs: Any) -> TableT: ...
[docs] @abc.abstractmethod def page(self, path: str, *, base: Type[View] = View, cors_options: Mapping[str, ResourceOptions] = None, name: str = None) -> Callable[[PageArg], Type[View]]: ...
[docs] @abc.abstractmethod def table_route(self, table: CollectionT, shard_param: str = None, *, query_param: str = None, match_info: str = None, exact_key: str = None) -> ViewDecorator: ...
[docs] @abc.abstractmethod def command(self, *options: Any, base: Type[_AppCommand] = None, **kwargs: Any) -> Callable[[Callable], Type[_AppCommand]]: ...
[docs] @abc.abstractmethod async def start_client(self) -> None: ...
[docs] @abc.abstractmethod async def maybe_start_client(self) -> None: ...
[docs] @abc.abstractmethod async def send( self, channel: Union[ChannelT, str], key: K = None, value: V = None, partition: int = None, timestamp: float = None, headers: HeadersArg = None, schema: _SchemaT = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, callback: MessageSentCallback = None) -> Awaitable[RecordMetadata]: ...
[docs] @abc.abstractmethod def LiveCheck(self, **kwargs: Any) -> _LiveCheck: ...
[docs] @stampede @abc.abstractmethod async def maybe_start_producer(self) -> ProducerT: ...
[docs] @abc.abstractmethod def is_leader(self) -> bool: ...
[docs] @abc.abstractmethod def FlowControlQueue( self, maxsize: int = None, *, clear_on_resume: bool = False, loop: asyncio.AbstractEventLoop = None) -> ThrowableQueue: ...
[docs] @abc.abstractmethod def Worker(self, **kwargs: Any) -> _Worker: ...
[docs] @abc.abstractmethod def on_webserver_init(self, web: Web) -> None: ...
[docs] @abc.abstractmethod def on_rebalance_start(self) -> None: ...
[docs] @abc.abstractmethod def on_rebalance_end(self) -> None: ...
@property def conf(self) -> _Settings: ... @conf.setter def conf(self, settings: _Settings) -> None: ... @property @abc.abstractmethod def transport(self) -> TransportT: ... @transport.setter def transport(self, transport: TransportT) -> None: ... @property @abc.abstractmethod def producer_transport(self) -> TransportT: ... @producer_transport.setter def producer_transport(self, transport: TransportT) -> None: ... @property @abc.abstractmethod def cache(self) -> CacheBackendT: ... @cache.setter def cache(self, cache: CacheBackendT) -> None: ... @property @abc.abstractmethod def producer(self) -> ProducerT: ... @property @abc.abstractmethod def consumer(self) -> ConsumerT: ...
[docs] @cached_property @abc.abstractmethod def tables(self) -> TableManagerT: ...
[docs] @cached_property @abc.abstractmethod def topics(self) -> ConductorT: ...
@property @abc.abstractmethod def monitor(self) -> _Monitor: ... @monitor.setter def monitor(self, value: _Monitor) -> None: ...
[docs] @cached_property @abc.abstractmethod def flow_control(self) -> FlowControlEvent: return FlowControlEvent(loop=self.loop)
@property @abc.abstractmethod def http_client(self) -> HttpClientT: ... @http_client.setter def http_client(self, client: HttpClientT) -> None: ... @property @abc.abstractmethod def assignor(self) -> PartitionAssignorT: ... @property @abc.abstractmethod def router(self) -> RouterT: ... @property @abc.abstractmethod def serializers(self) -> RegistryT: ... @property @abc.abstractmethod def web(self) -> Web: ... @web.setter def web(self, web: Web) -> None: ... @property @abc.abstractmethod def in_transaction(self) -> bool: ...