Source code for faust.cli.send

"""Program ``faust send`` used to send events to agents and topics."""
import asyncio
import random
from typing import Any
from faust.types import CodecArg, K, RecordMetadata, V
from .base import AppCommand, argument, option

__all__ = ['send']


[docs]class send(AppCommand): """Send message to agent/topic.""" topic: Any key: K key_serializer: CodecArg value: V value_serializer: CodecArg repeat: int min_latency: float max_latency: float options = [ option('--key-type', '-K', help='Name of model to serialize key into.'), option('--key-serializer', help='Override default serializer for key.'), option('--value-type', '-V', help='Name of model to serialize value into.'), option('--value-serializer', help='Override default serializer for value.'), option('--key', '-k', help='String value for key (use json if model).'), option('--partition', type=int, help='Specific partition to send to.'), option('--repeat', '-r', type=int, default=1, help='Send message n times.'), option('--min-latency', type=float, default=0.0, help='Minimum delay between sending.'), option('--max-latency', type=float, default=0.0, help='Maximum delay between sending.'), argument('entity'), argument('value', default=None, required=False), ]
[docs] async def run(self, entity: str, value: str, *args: Any, key: str = None, key_type: str = None, key_serializer: str = None, value_type: str = None, value_serializer: str = None, partition: int = 1, timestamp: float = None, repeat: int = 1, min_latency: float = 0.0, max_latency: float = 0.0, **kwargs: Any) -> Any: """Send message to topic/agent/channel.""" if key is not None: key = self.to_key(key_type, key) if value is not None: value = self.to_value(value_type, value) topic = self.to_topic(entity) for i in range(repeat): self.carp(f'k={key!r} v={value!r} -> {topic!r}...') fut_send_complete = await topic.send( key=key, value=value, partition=partition, timestamp=timestamp, key_serializer=key_serializer, value_serializer=value_serializer, ) meta: RecordMetadata = await fut_send_complete self.say(self.dumps(meta._asdict())) if i and max_latency: await asyncio.sleep(random.uniform(min_latency, max_latency)) await self.app.producer.stop()