import abc
import asyncio
import typing
from datetime import tzinfo
from typing import (
Any,
AsyncIterable,
Awaitable,
Callable,
ClassVar,
Iterable,
Mapping,
MutableSequence,
Optional,
Pattern,
Set,
Type,
Union,
)
from mode import Seconds, ServiceT, Signal, SupervisorStrategyT, SyncSignal
from mode.utils.futures import stampede
from mode.utils.objects import cached_property
from mode.utils.queues import FlowControlEvent, ThrowableQueue
from mode.utils.types.trees import NodeT
from .agents import AgentFun, AgentManagerT, AgentT, SinkT
from .assignor import PartitionAssignorT
from .codecs import CodecArg
from .core import K, V
from .fixups import FixupT
from .router import RouterT
from .sensors import SensorDelegateT
from .serializers import RegistryT
from .streams import StreamT
from .tables import CollectionT, TableManagerT, TableT
from .topics import ChannelT, TopicT
from .transports import ConductorT, ConsumerT, ProducerT, TransportT
from .tuples import MessageSentCallback, RecordMetadata, TP
from .web import (
CacheBackendT,
HttpClientT,
PageArg,
View,
ViewDecorator,
Web,
)
from .windows import WindowT
if typing.TYPE_CHECKING:
    # Seen by static type checkers only; never executed at runtime.
    # NOTE(review): presumably deferred to avoid circular imports -- confirm.
    from faust.cli.base import AppCommand
    from faust.sensors.monitor import Monitor
    from faust.worker import Worker as WorkerT
    from .models import ModelArg
    from .settings import Settings
else:
    # Runtime placeholders: annotations in this module mention these names,
    # so each one must be bound to something when the module is executed.
    class AppCommand: ...  # noqa

    class Monitor: ...  # noqa

    class ModelArg: ...  # noqa

    class WorkerT: ...  # noqa

    class Settings: ...  # noqa
# Names exported by a star-import of this module.
__all__ = ['TaskArg', 'AppT']

# A task callable: returns an awaitable and is called either with the
# app as its only positional argument or with no arguments at all.
TaskArg = Union[
    Callable[['AppT'], Awaitable],
    Callable[[], Awaitable],
]
class BootStrategyT:
    """Abstract type for an app boot strategy.

    Declares the ``enable_*`` switches as class attributes and three
    abstract methods (``server``, ``client_only``, ``producer_only``),
    each returning an iterable of services.

    NOTE(review): the methods are decorated with ``abc.abstractmethod``
    but this class does not use ``abc.ABCMeta``, so the decorators are
    not enforced at instantiation time here -- confirm this is intended.
    """

    app: 'AppT'

    enable_kafka: bool = True
    # We want these to take default from `enable_kafka`
    # attribute, but still want to allow subclasses to define
    # them like this:
    #   class MyBoot(BootStrategy):
    #       enable_kafka_consumer = False
    enable_kafka_consumer: Optional[bool] = None
    enable_kafka_producer: Optional[bool] = None

    enable_web: Optional[bool] = None
    enable_sensors: bool = True

    @abc.abstractmethod
    def __init__(self, app: 'AppT', *,
                 enable_web: Optional[bool] = None,
                 enable_kafka: bool = True,
                 enable_kafka_producer: Optional[bool] = None,
                 enable_kafka_consumer: Optional[bool] = None,
                 enable_sensors: bool = True) -> None:
        """Abstract constructor.

        Arguments:
            app: The application this strategy belongs to.
            enable_web: Keyword-only; defaults to ``None``.
            enable_kafka: Keyword-only; defaults to ``True``.
            enable_kafka_producer: Keyword-only; defaults to ``None``.
            enable_kafka_consumer: Keyword-only; defaults to ``None``.
            enable_sensors: Keyword-only; defaults to ``True``.

        The ``None`` defaults mirror the ``Optional[bool]`` class
        attributes of the same names.
        """
        ...

    @abc.abstractmethod
    def server(self) -> Iterable[ServiceT]:
        """Abstract: return the services for the ``server`` mode."""
        ...

    @abc.abstractmethod
    def client_only(self) -> Iterable[ServiceT]:
        """Abstract: return the services for the ``client_only`` mode."""
        ...

    @abc.abstractmethod
    def producer_only(self) -> Iterable[ServiceT]:
        """Abstract: return the services for the ``producer_only`` mode."""
        ...
class AppT(ServiceT):
    """Abstract type for the Faust application.

    Declares the attributes, signals, decorator factories and properties
    of the application interface.  Nearly every member is an abstract
    stub whose behavior is supplied by the concrete class.

    See Also:
        :class:`faust.App`.
    """

    BootStrategy: ClassVar[Type[BootStrategyT]]
    boot_strategy: BootStrategyT

    #: Set to true when the app is finalized (can read configuration).
    finalized: bool = False

    #: Set to true when the app has read configuration.
    configured: bool = False

    #: Set to true if the worker is currently rebalancing.
    rebalancing: bool = False

    #: Set to true if the assignment is empty
    # This flag is set by App._on_partitions_assigned
    unassigned: bool = False

    # Signals are created once, as class attributes.
    on_configured: SyncSignal[Settings] = SyncSignal()
    on_before_configured: SyncSignal = SyncSignal()
    on_after_configured: SyncSignal = SyncSignal()
    on_partitions_assigned: Signal[Set[TP]] = Signal()
    on_partitions_revoked: Signal[Set[TP]] = Signal()
    on_rebalance_complete: Signal = Signal()
    on_before_shutdown: Signal = Signal()
    on_worker_init: SyncSignal = SyncSignal()

    client_only: bool

    agents: AgentManagerT
    sensors: SensorDelegateT

    fixups: MutableSequence[FixupT]

    @abc.abstractmethod
    def __init__(
            self,
            id: str,
            *,
            monitor: Monitor,
            config_source: Any = None,
            **options: Any
    ) -> None:
        """Initialize the ``on_startup_finished`` slot to ``None``.

        The arguments are declared for implementations; this abstract
        body does not read any of them.
        """
        self.on_startup_finished: Optional[Callable] = None

    @abc.abstractmethod
    def config_from_object(
            self,
            obj: Any,
            *,
            silent: bool = False,
            force: bool = False
    ) -> None:
        """Abstract: configure the app from ``obj``.

        The meaning of ``silent`` and ``force`` is defined by the
        implementation.
        """
        ...

    @abc.abstractmethod
    def finalize(self) -> None:
        """Abstract: finalize the app (see the ``finalized`` flag)."""
        ...

    @abc.abstractmethod
    def main(self) -> None:
        """Abstract: ``main`` entry point; takes no arguments."""
        ...

    @abc.abstractmethod
    def worker_init(self) -> None:
        """Abstract: worker initialization hook."""
        ...

    # NOTE(review): the default tuples below look like placeholders;
    # presumably the concrete app declares the real defaults -- confirm.
    @abc.abstractmethod
    def discover(
            self,
            *extra_modules: str,
            categories: Iterable[str] = ('a', 'b', 'c'),
            ignore: Iterable[str] = ('foo', 'bar')
    ) -> None:
        """Abstract: discovery hook taking optional extra module names."""
        ...

    @abc.abstractmethod
    def topic(
            self,
            *topics: str,
            pattern: Union[str, Pattern] = None,
            key_type: ModelArg = None,
            value_type: ModelArg = None,
            key_serializer: CodecArg = None,
            value_serializer: CodecArg = None,
            partitions: int = None,
            retention: Seconds = None,
            compacting: bool = None,
            deleting: bool = None,
            replicas: int = None,
            acks: bool = True,
            internal: bool = False,
            config: Mapping[str, Any] = None,
            maxsize: int = None,
            allow_empty: bool = False,
            loop: asyncio.AbstractEventLoop = None
    ) -> TopicT:
        """Abstract: return a ``TopicT`` for the given names or pattern."""
        ...

    @abc.abstractmethod
    def channel(
            self,
            *,
            key_type: ModelArg = None,
            value_type: ModelArg = None,
            maxsize: int = None,
            loop: asyncio.AbstractEventLoop = None
    ) -> ChannelT:
        """Abstract: return a ``ChannelT``; all arguments keyword-only."""
        ...

    @abc.abstractmethod
    def agent(
            self,
            channel: Union[str, ChannelT] = None,
            *,
            name: str = None,
            concurrency: int = 1,
            supervisor_strategy: Type[SupervisorStrategyT] = None,
            sink: Iterable[SinkT] = None,
            isolated_partitions: bool = False,
            **kwargs: Any
    ) -> Callable[[AgentFun], AgentT]:
        """Abstract: return a callable mapping ``AgentFun`` to ``AgentT``."""
        ...

    @abc.abstractmethod
    def task(self, fun: TaskArg) -> Callable:
        """Abstract: accept a ``TaskArg`` callable and return a callable."""
        ...

    @abc.abstractmethod
    def timer(self, interval: Seconds, on_leader: bool = False) -> Callable:
        """Abstract: return a callable for the given ``interval``."""
        ...

    @abc.abstractmethod
    def crontab(
            self,
            cron_format: str,
            *,
            timezone: tzinfo = None,
            on_leader: bool = False
    ) -> Callable:
        """Abstract: return a callable for the given ``cron_format``."""
        ...

    @abc.abstractmethod
    def service(self, cls: Type[ServiceT]) -> Type[ServiceT]:
        """Abstract: accept a service class and return a service class."""
        ...

    @abc.abstractmethod
    def stream(
            self,
            channel: AsyncIterable,
            beacon: NodeT = None,
            **kwargs: Any
    ) -> StreamT:
        """Abstract: return a ``StreamT`` over the async iterable."""
        ...

    @abc.abstractmethod
    def Table(
            self,
            name: str,
            *,
            default: Callable[[], Any] = None,
            window: WindowT = None,
            partitions: int = None,
            help: str = None,
            **kwargs: Any
    ) -> TableT:
        """Abstract: return a ``TableT`` called ``name``."""
        ...

    @abc.abstractmethod
    def page(
            self,
            path: str,
            *,
            base: Type[View] = View,
            name: str = None
    ) -> Callable[[PageArg], Type[View]]:
        """Abstract: return a callable mapping ``PageArg`` to a view class."""
        ...

    @abc.abstractmethod
    def table_route(
            self,
            table: CollectionT,
            shard_param: str = None,
            *,
            query_param: str = None,
            match_info: str = None
    ) -> ViewDecorator:
        """Abstract: return a ``ViewDecorator`` for ``table``."""
        ...

    @abc.abstractmethod
    def command(
            self,
            *options: Any,
            base: Type[AppCommand] = None,
            **kwargs: Any
    ) -> Callable[[Callable], Type[AppCommand]]:
        """Abstract: return a callable mapping a callable to a command class."""
        ...

    @abc.abstractmethod
    async def start_client(self) -> None:
        """Abstract coroutine: start the client."""
        ...

    @abc.abstractmethod
    async def maybe_start_client(self) -> None:
        """Abstract coroutine: conditional variant of ``start_client``.

        The condition is defined by the implementation.
        """
        ...

    # NOTE(review): declared ``async`` and also annotated as returning an
    # awaitable, which suggests callers await twice to obtain the
    # ``RecordMetadata`` -- confirm against the implementation.
    @abc.abstractmethod
    async def send(
            self,
            channel: Union[ChannelT, str],
            key: K = None,
            value: V = None,
            partition: int = None,
            key_serializer: CodecArg = None,
            value_serializer: CodecArg = None,
            callback: MessageSentCallback = None
    ) -> Awaitable[RecordMetadata]:
        """Abstract coroutine: send ``key``/``value`` to ``channel``."""
        ...

    # ``stampede`` is applied on top of the abstract coroutine.
    @stampede
    @abc.abstractmethod
    async def maybe_start_producer(self) -> ProducerT:
        """Abstract coroutine: return a ``ProducerT``."""
        ...

    @abc.abstractmethod
    def is_leader(self) -> bool:
        """Abstract: return a boolean leader flag."""
        ...

    @abc.abstractmethod
    def FlowControlQueue(
            self,
            maxsize: int = None,
            *,
            clear_on_resume: bool = False,
            loop: asyncio.AbstractEventLoop = None
    ) -> ThrowableQueue:
        """Abstract: return a ``ThrowableQueue``."""
        ...

    @abc.abstractmethod
    def Worker(self, **kwargs: Any) -> WorkerT:
        """Abstract: return a worker built from keyword arguments."""
        ...

    @abc.abstractmethod
    def on_webserver_init(self, web: Web) -> None:
        """Abstract hook receiving the ``Web`` instance."""
        ...

    @abc.abstractmethod
    def on_rebalance_start(self) -> None:
        """Abstract hook named for the start of a rebalance."""
        ...

    @abc.abstractmethod
    def on_rebalance_end(self) -> None:
        """Abstract hook named for the end of a rebalance."""
        ...

    # Unlike most properties below, ``conf`` is not marked abstract.
    @property
    def conf(self) -> Settings:
        """Settings object for the app."""
        ...

    @conf.setter
    def conf(self, settings: Settings) -> None:
        ...

    @property
    @abc.abstractmethod
    def transport(self) -> TransportT:
        """Abstract: transport used by the app."""
        ...

    @transport.setter
    def transport(self, transport: TransportT) -> None:
        ...

    @property
    @abc.abstractmethod
    def cache(self) -> CacheBackendT:
        """Abstract: cache backend used by the app."""
        ...

    @cache.setter
    def cache(self, cache: CacheBackendT) -> None:
        ...

    @property
    @abc.abstractmethod
    def producer(self) -> ProducerT:
        """Abstract: producer instance (read-only property)."""
        ...

    @property
    @abc.abstractmethod
    def consumer(self) -> ConsumerT:
        """Abstract: consumer instance (read-only property)."""
        ...

    @cached_property
    @abc.abstractmethod
    def tables(self) -> TableManagerT:
        """Abstract: table manager, exposed as a cached property."""
        ...

    @cached_property
    @abc.abstractmethod
    def topics(self) -> ConductorT:
        """Abstract: conductor, exposed as a cached property."""
        ...

    @property
    @abc.abstractmethod
    def monitor(self) -> Monitor:
        """Abstract: monitor instance."""
        ...

    @monitor.setter
    def monitor(self, value: Monitor) -> None:
        ...

    @cached_property
    @abc.abstractmethod
    def flow_control(self) -> FlowControlEvent:
        """Return a ``FlowControlEvent`` created with ``loop=self.loop``."""
        return FlowControlEvent(loop=self.loop)

    @property
    @abc.abstractmethod
    def http_client(self) -> HttpClientT:
        """Abstract: HTTP client instance."""
        ...

    @http_client.setter
    def http_client(self, client: HttpClientT) -> None:
        ...

    @property
    @abc.abstractmethod
    def assignor(self) -> PartitionAssignorT:
        """Abstract: partition assignor (read-only property)."""
        ...

    @property
    @abc.abstractmethod
    def router(self) -> RouterT:
        """Abstract: router (read-only property)."""
        ...

    @property
    @abc.abstractmethod
    def serializers(self) -> RegistryT:
        """Abstract: serializer registry (read-only property)."""
        ...

    @property
    @abc.abstractmethod
    def web(self) -> Web:
        """Abstract: ``Web`` instance."""
        ...

    @web.setter
    def web(self, web: Web) -> None:
        ...