Configuration Reference¶
Required Settings¶
id
¶
type: | str |
---|
A string uniquely identifying the app, shared across all instances such that two app instances with the same id are considered to be in the same “group”.
This parameter is required.
The id and Kafka
When using Kafka, the id is used to generate app-local topics, and names for consumer groups.
Commonly Used Settings¶
broker
¶
type: | str |
---|---|
default: | "aiokafka://localhost:9092" |
Faust needs the URL of a “transport” to send and receive messages.
Currently, the only supported production transport is kafka://.
This uses the aiokafka client under the hood for consuming and producing messages.
You can specify multiple hosts at the same time by separating them with a semicolon:
aiokafka://kafka1.example.com:9092;kafka2.example.com:9092
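As a rough illustration of the multi-host form, the sketch below splits such a URL into its scheme and host list. The helper name split_broker_url is hypothetical and not part of the Faust API; it only shows how the semicolon-separated form can be read.

```python
from typing import List, Tuple


def split_broker_url(url: str) -> Tuple[str, List[str]]:
    """Split 'scheme://host1;host2' into the scheme and a list of hosts.

    Hypothetical helper for illustration only; not part of the Faust API.
    """
    scheme, sep, rest = url.partition("://")
    if not sep:
        raise ValueError(f"missing '://' in broker URL: {url!r}")
    # Only the first host carries the scheme; the rest are bare host:port pairs.
    hosts = [host for host in rest.split(";") if host]
    return scheme, hosts


scheme, hosts = split_broker_url(
    "aiokafka://kafka1.example.com:9092;kafka2.example.com:9092"
)
print(scheme)  # aiokafka
print(hosts)   # ['kafka1.example.com:9092', 'kafka2.example.com:9092']
```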
store
¶
type: | str |
---|---|
default: | memory:// |
The backend used for table storage.
Tables are stored in-memory by default, but you should not use the memory:// store in production.
In production, a persistent table store, such as rocksdb://, is preferred.
autodiscover
¶
type: | Union[bool, Iterable[str], Callable[[], Iterable[str]]] |
---|
Enable autodiscovery of agent, task, timer, page and command decorators.
Faust has an API to add different asyncio services and other user extensions, such as “Agents”, HTTP web views, command-line commands, and timers to your Faust workers. These can be defined in any module, so to discover them at startup, the worker needs to traverse packages looking for them.
Warning
The autodiscovery functionality uses the Venusian library to scan wanted packages for @app.agent, @app.page, @app.command, @app.task and @app.timer decorators, but to do so, it’s required to traverse the package path and import every module in it.
Importing random modules like this can be dangerous, so make sure you follow Python programming best practices. Do not start threads, perform network I/O, or monkey-patch for test mocks or similar as a side effect of importing a module. If you encounter a case such as this then please find a way to perform your action in a lazy manner.
Warning
If the above warning is something you cannot fix, or if it’s out of your control, then please set autodiscover=False and make sure the worker imports all modules where your decorators are defined.
The value for this argument can be:
bool
If App(autodiscover=True) is set, the autodiscovery will scan the package name described in the origin attribute.
The origin attribute is automatically set when you start a worker using the faust command line program, for example:
faust -A example.simple worker
The -A option specifies the app, but you can also create a shortcut entrypoint by calling app.main():
if __name__ == '__main__':
    app.main()
Then you can start the faust program by executing for example python myscript.py worker --loglevel=INFO, and it will use the correct application.
Sequence[str]
The argument can also be a list of packages to scan:
app = App(..., autodiscover=['proj_orders', 'proj_accounts'])
Callable[[], Sequence[str]]
The argument can also be a function returning a list of packages to scan:
def get_all_packages_to_scan():
    return ['proj_orders', 'proj_accounts']
app = App(..., autodiscover=get_all_packages_to_scan)
False
If everything you need is in a self-contained module, or you import the stuff you need manually, just set autodiscover to False and don’t worry about it :-)
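The accepted value forms can be summarized with a small sketch. This is not Faust's implementation: packages_to_scan is a hypothetical helper, and for the True case it simply returns the origin unchanged, which is a simplification.

```python
from typing import Callable, Iterable, List, Union

AutodiscoverArg = Union[bool, Iterable[str], Callable[[], Iterable[str]]]


def packages_to_scan(autodiscover: AutodiscoverArg, origin: str) -> List[str]:
    """Hypothetical helper: turn the setting into a concrete package list."""
    if autodiscover is True:
        # True: scan the package named by the origin attribute (simplified).
        return [origin]
    if not autodiscover:
        # False (or an empty list): scan nothing.
        return []
    if callable(autodiscover):
        # A function returning the packages to scan.
        return list(autodiscover())
    # Otherwise an explicit list of package names.
    return list(autodiscover)


def get_all_packages_to_scan() -> List[str]:
    return ["proj_orders", "proj_accounts"]


print(packages_to_scan(True, "example.simple"))
print(packages_to_scan(["proj_orders"], "example.simple"))
print(packages_to_scan(get_all_packages_to_scan, "example.simple"))
print(packages_to_scan(False, "example.simple"))
```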
Django
When using Django and the DJANGO_SETTINGS_MODULE environment variable is set, the Faust app will scan all packages found in the INSTALLED_APPS setting.
If you’re using Django you can use this to scan for agents/pages/commands in all packages defined in INSTALLED_APPS.
Faust will automatically detect that you’re using Django and do the right thing if you do:
app = App(..., autodiscover=True)
It will find agents and other decorators in all of the reusable Django apps. If you want to manually control what packages are traversed, then provide a list:
app = App(..., autodiscover=['package1', 'package2'])
or if you want no packages to be traversed, then provide False:
app = App(..., autodiscover=False)
which is the default, so you can simply omit the argument.
Tip
For manual control over autodiscovery, you can also call the app.discover() method manually.
version
¶
type: | int |
---|---|
default: | 1 |
Version of the app, that when changed will create a new isolated instance of the application. The first version is 1, the second version is 2, and so on.
Source topics will not be affected by a version change.
Faust applications will use two kinds of topics: source topics, and internally managed topics. The source topics are declared by the producer, and we do not have the opportunity to modify any configuration settings, like number of partitions for a source topic; we may only consume from them. To mark a topic as internal, use: app.topic(..., internal=True).
datadir
¶
type: | Union[str, pathlib.Path] |
---|---|
default: | "{appid}-data" |
environment: | FAUST_DATADIR , F_DATADIR |
The directory in which this instance stores the data used by local tables, etc.
See also
- The data directory can also be set using the faust --datadir option, from the command-line, so there’s usually no reason to provide a default value when creating the app.
tabledir
¶
type: | Union[str, pathlib.Path] |
---|---|
default: | "tables" |
The directory in which this instance stores local table data.
Usually you will want to configure the datadir setting, but if you want to store tables separately you can configure this one.
If the path provided is relative (it has no leading slash), then the path will be considered to be relative to the datadir setting.
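The relative-path rule can be sketched with pathlib. The function resolve_tabledir is a hypothetical name used for illustration, and POSIX-style paths are assumed; Faust's own resolution logic may differ in detail.

```python
from pathlib import PurePosixPath


def resolve_tabledir(datadir: str, tabledir: str) -> PurePosixPath:
    """Hypothetical helper: resolve tabledir against datadir when relative."""
    table_path = PurePosixPath(tabledir)
    if table_path.is_absolute():
        # A leading slash means the table directory stands on its own.
        return table_path
    # Otherwise the table directory lives inside the data directory.
    return PurePosixPath(datadir) / table_path


print(resolve_tabledir("myapp-data", "tables"))            # myapp-data/tables
print(resolve_tabledir("myapp-data", "/var/faust/tables"))  # /var/faust/tables
```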
id_format
¶
type: | str |
---|---|
default: | "{id}-v{self.version}" |
The format string used to generate the final id value by combining it with the version parameter.
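To see how the default format string expands, the sketch below applies it with plain str.format. The SimpleNamespace object stands in for the app settings and is an assumption made for illustration; when and whether Faust applies the format is not shown here.

```python
from types import SimpleNamespace

# Stand-in for the object that carries the version setting (assumption).
settings = SimpleNamespace(version=2)

id_format = "{id}-v{self.version}"
final_id = id_format.format(id="myapp", self=settings)
print(final_id)  # myapp-v2
```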
loghandlers
¶
type: | List[logging.Handler] |
---|---|
default: | None |
Specify a list of custom log handlers to use in worker instances.
origin
¶
type: | str |
---|---|
default: | None |
The reverse path used to find the app, for example if the app is located in:
from myproj.app import app
Then the origin should be "myproj.app".
The faust worker program will try to automatically set the origin, but if you are having problems with autogenerated names then you can set origin manually.
Serialization Settings¶
key_serializer
¶
type: | Union[str, Codec] |
---|---|
default: | "json" |
Serializer used for keys by default when no serializer is specified, or a model is not being used.
This can be the name of a serializer/codec, or an actual faust.serializers.codecs.Codec instance.
See also
- The Codecs section in the model guide – for more information about codecs.
value_serializer
¶
type: | Union[str, Codec] |
---|---|
default: | "json" |
Serializer used for values by default when no serializer is specified, or a model is not being used.
This can be the name of a serializer/codec, or an actual faust.serializers.codecs.Codec instance.
See also
- The Codecs section in the model guide – for more information about codecs.
Topic Settings¶
Advanced Broker Settings¶
broker_client_id
¶
type: | str |
---|---|
default: | faust-{VERSION} |
You shouldn’t have to set this manually.
The client id is used to identify the software used, and is not usually configured by the user.
broker_commit_every
¶
type: | int |
---|---|
default: | 1000 |
Commit offset every n messages.
See also broker_commit_interval, which is how frequently we commit on a timer when there are few messages being received.
broker_commit_interval
¶
type: | float , timedelta |
---|---|
default: | 2.8 |
How often we commit messages that have been fully processed (acked).
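The interplay between broker_commit_every and broker_commit_interval can be pictured as a count trigger plus a timer trigger. The class below is a simplified sketch, not Faust's commit logic; CommitPolicy and its method names are hypothetical.

```python
class CommitPolicy:
    """Decide when to commit: every N acked messages, or on a timer."""

    def __init__(self, commit_every: int = 1000, commit_interval: float = 2.8):
        self.commit_every = commit_every
        self.commit_interval = commit_interval
        self.uncommitted = 0    # acked messages not yet committed
        self.last_commit = 0.0  # time of the previous commit, in seconds

    def _commit(self, now: float) -> bool:
        self.uncommitted = 0
        self.last_commit = now
        return True

    def on_ack(self, now: float) -> bool:
        """Called for each acked message; True means 'commit now'."""
        self.uncommitted += 1
        if self.uncommitted >= self.commit_every:
            return self._commit(now)
        return False

    def on_timer(self, now: float) -> bool:
        """Called periodically; commits pending acks once the interval elapsed."""
        if self.uncommitted and now - self.last_commit >= self.commit_interval:
            return self._commit(now)
        return False


policy = CommitPolicy(commit_every=3, commit_interval=2.8)
decisions = [policy.on_ack(t) for t in (0.1, 0.2, 0.3)]
print(decisions)  # [False, False, True]
```

With few incoming messages the count trigger rarely fires, so the timer is what bounds how long an acked offset stays uncommitted.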
broker_commit_livelock_soft_timeout
¶
type: | float, timedelta |
---|---|
default: | 300.0 (five minutes) |
How long to wait before warning that the Kafka commit offset has not advanced (only when processing messages).
broker_heartbeat_interval
¶
type: | int |
---|---|
default: | 3.0 (three seconds) |
How often we send heartbeats to the broker, and also how often we expect to receive heartbeats from the broker.
If any of these time out, you should increase this setting.
broker_session_timeout
¶
type: | int |
---|---|
default: | 30.0 (thirty seconds) |
How long to wait for a node to finish rebalancing before the broker will consider it dysfunctional and remove it from the cluster.
Increase this if you experience the cluster being in a state of constantly rebalancing, but make sure you also increase the broker_heartbeat_interval at the same time.
Advanced Producer Settings¶
producer_compression_type
¶
type: | str |
---|---|
default: | None |
The compression type for all data generated by the producer. Valid values are ‘gzip’, ‘snappy’, ‘lz4’, or None.
producer_linger_ms
¶
type: | int |
---|---|
default: | 0 |
Minimum time to batch before sending out messages from the producer.
You should rarely have to change this.
producer_max_request_size
¶
type: | int |
---|---|
default: | 1000000 |
Maximum size of a request in bytes in the producer.
You should rarely have to change this.
producer_acks
¶
type: | int |
---|---|
default: | -1 |
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:
0: Producer will not wait for any acknowledgment from the server at all. The message will immediately be considered sent. (Not recommended)
1: The broker leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
-1: The broker leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
Advanced Table Settings¶
Advanced Stream Settings¶
stream_buffer_maxsize
¶
type: | int |
---|---|
default: | 4096 |
This setting controls backpressure to streams and agents reading from streams.
If set to 4096 (default) this means that an agent can only keep at most 4096 unprocessed items in the stream buffer.
Essentially this will limit the number of messages a stream can “prefetch”.
Higher numbers give better throughput, but note that if your agent sends messages or updates tables (which sends changelog messages), Faust 1.0 defers sending those messages until the offset of the source message (the one that initiated the sending/change) is committed.
This means that if the buffer size is large, the broker_commit_interval or broker_commit_every settings must be set to commit frequently, avoiding backpressure from building up.
A buffer size of 131_072 may let you process over 30,000 events a second as a baseline, but be careful with a buffer size that large when you also send messages or update tables.
The next version of Faust will take advantage of Kafka transactions to remove the bottleneck of sending messages on commit.
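The backpressure effect of a bounded buffer can be demonstrated with a plain asyncio.Queue. This is only an analogy for the stream buffer, not Faust's internals; run_pipeline and the tiny maxsize are assumptions chosen to make the effect visible.

```python
import asyncio
from typing import List, Tuple


async def run_pipeline(maxsize: int, n_events: int) -> Tuple[List[int], int]:
    # The bounded queue stands in for the stream buffer (assumption).
    buffer: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    processed: List[int] = []
    max_depth = 0

    async def fetcher() -> None:
        nonlocal max_depth
        for event in range(n_events):
            # put() suspends while the buffer is full: this is the backpressure.
            await buffer.put(event)
            max_depth = max(max_depth, buffer.qsize())

    async def agent() -> None:
        for _ in range(n_events):
            event = await buffer.get()
            await asyncio.sleep(0)  # pretend to do some work
            processed.append(event)

    await asyncio.gather(fetcher(), agent())
    return processed, max_depth


processed, max_depth = asyncio.run(run_pipeline(maxsize=2, n_events=10))
print(processed)
print(max_depth)
```

However fast the fetcher runs, the number of unprocessed items never exceeds the configured maximum, which is the "prefetch" limit described above.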
stream_wait_empty
¶
type: | bool |
---|---|
default: | True |
This setting controls whether the worker should wait for the currently processing task in an agent to complete before rebalancing or shutting down.
On rebalance/shut down we clear the stream buffers. Those events will be reprocessed after the rebalance anyway, but we may have already started processing one event in every agent, and if we rebalance we will process that event again.
By default we will wait for the currently active tasks, but if your streams are idempotent you can disable it using this setting.
stream_ack_exceptions
¶
type: | bool |
---|---|
default: | True |
What happens when an exception is raised while processing an event? We ack that message by default, so we never reprocess it. This may be surprising, but it avoids the very likely scenario of causing a high frequency loop, where the error constantly happens and we never recover.
You can set this to False to reprocess events that caused an exception to be raised.
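The difference between the two values can be sketched as a loop that decides whether a failed event counts as acked. The function process_events and its ack_exceptions parameter are hypothetical and only mirror the behavior described above.

```python
from typing import Callable, List, Sequence


def process_events(
    events: Sequence[int],
    handler: Callable[[int], object],
    ack_exceptions: bool = True,
) -> List[int]:
    """Return the offsets that were acked (hypothetical sketch)."""
    acked: List[int] = []
    for offset, event in enumerate(events):
        try:
            handler(event)
        except Exception:
            if not ack_exceptions:
                # Leave the event unacked so it can be processed again later.
                continue
        acked.append(offset)
    return acked


def handler(event: int) -> float:
    return 10 / event  # fails for event == 0


print(process_events([1, 0, 2], handler, ack_exceptions=True))   # [0, 1, 2]
print(process_events([1, 0, 2], handler, ack_exceptions=False))  # [0, 2]
```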
stream_ack_cancelled_tasks
¶
type: | bool |
---|---|
default: | False |
What happens when processing an event and the task processing it is cancelled? Agent tasks can be cancelled during shutdown or rebalance, and by default we do not ack the event in this case, so it can be reprocessed.
If your agent processors are not idempotent you may want to set this flag to True, so that once processing of an event has started, that event will not be processed again.
stream_publish_on_commit
¶
type: | bool |
---|---|
default: | False |
We buffer up sending messages until the source topic offset related to that processing is committed. This means that when we do commit, we may have buffered up a LOT of messages, so commit frequently.
This setting will be removed once transaction support is added in a later version.
Advanced Worker Settings¶
worker_redirect_stdouts
¶
type: | bool |
---|---|
default: | True |
Enable to have the worker redirect output written to sys.stdout and sys.stderr to the Python logging system.
Enabled by default.
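The general technique of routing printed output into logging can be sketched with the standard library. This is an illustration of the idea, not the worker's actual mechanism; LoggerWriter and ListHandler are hypothetical helper classes.

```python
import contextlib
import logging
from typing import List


class ListHandler(logging.Handler):
    """Collects log messages in a list so the result can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


class LoggerWriter:
    """File-like object that forwards written lines to a logger."""

    def __init__(self, logger: logging.Logger, level: int) -> None:
        self.logger = logger
        self.level = level

    def write(self, text: str) -> int:
        # Skip the bare newline that print() writes after each message.
        for line in text.rstrip().splitlines():
            self.logger.log(self.level, line)
        return len(text)

    def flush(self) -> None:
        pass


logger = logging.getLogger("stdout_redirect_demo")
logger.setLevel(logging.INFO)
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)

with contextlib.redirect_stdout(LoggerWriter(logger, logging.INFO)):
    print("hello from print")

print(handler.messages)  # ['hello from print']
```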
Advanced Web Server Settings¶
canonical_url
¶
type: | str |
---|---|
default: | socket.gethostname() |
You shouldn’t have to set this manually.
The canonical URL defines how to reach the web server on a running worker node, and is usually set by combining the faust worker --web-host and faust worker --web-port command line arguments, not by passing it as a keyword argument to App.
Advanced Agent Settings¶
agent_supervisor
¶
type: | str / mode.SupervisorStrategyT |
---|---|
default: | mode.OneForOneSupervisor |
An agent may start multiple instances (actors) when the concurrency setting is higher than one (e.g. @app.agent(concurrency=2)).
Multiple instances of the same agent are considered to be in the same supervisor group.
The default supervisor is the mode.OneForOneSupervisor: if an instance in the group crashes, we restart that instance only.
These are the supervisors supported:
mode.OneForOneSupervisor
If an instance in the group crashes we restart only that instance.
mode.OneForAllSupervisor
If an instance in the group crashes we restart the whole group.
mode.CrashingSupervisor
If an instance in the group crashes we stop the whole application, and exit so that the Operating System supervisor can restart us.
mode.ForfeitOneForOneSupervisor
If an instance in the group crashes we give up on that instance and never restart it again (until the program is restarted).
mode.ForfeitOneForAllSupervisor
If an instance in the group crashes we stop all instances in the group and never restart them again (until the program is restarted).
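The five behaviors listed above can be compared side by side with a small decision function. The enum members and plan_recovery are hypothetical labels for the described behaviors, not the mode library's API.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Sequence


class OnCrash(Enum):
    RESTART_ONE = "restart only the crashed instance"
    RESTART_GROUP = "restart the whole group"
    STOP_APP = "stop the application and exit"
    FORFEIT_ONE = "give up on the crashed instance"
    FORFEIT_GROUP = "stop the whole group for good"


@dataclass
class Plan:
    restart: List[str] = field(default_factory=list)
    stop: List[str] = field(default_factory=list)
    exit_app: bool = False


def plan_recovery(policy: OnCrash, crashed: str, group: Sequence[str]) -> Plan:
    """Hypothetical sketch: what happens to the group after one crash."""
    if policy is OnCrash.RESTART_ONE:
        return Plan(restart=[crashed])
    if policy is OnCrash.RESTART_GROUP:
        return Plan(restart=list(group))
    if policy is OnCrash.STOP_APP:
        return Plan(exit_app=True)
    if policy is OnCrash.FORFEIT_ONE:
        return Plan()  # the crashed instance simply stays down
    # FORFEIT_GROUP: the remaining instances are stopped and not restarted.
    return Plan(stop=[name for name in group if name != crashed])


group = ["agent-0", "agent-1"]
for policy in OnCrash:
    print(policy.name, plan_recovery(policy, "agent-0", group))
```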
Agent RPC Settings¶
reply_to
¶
type: | str |
---|---|
default: | <generated> |
The name of the reply topic used by this instance. If not set one will be automatically generated when the app is created.
reply_create_topic
¶
type: | bool |
---|---|
default: | False |
Set this to True if you plan on using the RPC with agents.
This will create the internal topic used for RPC replies on that instance at startup.
reply_expires
¶
type: | Union[float, datetime.timedelta] |
---|---|
default: | timedelta(days=1) |
The expiry time (in seconds as a float, or as a timedelta), for how long replies will stay in the instance’s local reply topic before being removed.
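Settings typed as a float or a timedelta can be normalized to seconds in one place. The helper to_seconds below is a hypothetical sketch of that normalization, not the function Faust uses.

```python
from datetime import timedelta
from typing import Union


def to_seconds(value: Union[float, timedelta]) -> float:
    """Hypothetical helper: accept seconds or a timedelta, return seconds."""
    if isinstance(value, timedelta):
        return value.total_seconds()
    return float(value)


print(to_seconds(timedelta(days=1)))  # 86400.0
print(to_seconds(30))                 # 30.0
```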
Extension Settings¶
Agent
¶
type: | Union[str, Type] |
---|---|
default: | "faust.Agent" |
The Agent class to use for agents, or the fully-qualified path to one (supported by symbol_by_name()).
Example using a class:
class MyAgent(faust.Agent):
    ...
app = App(..., Agent=MyAgent)
Example using the string path to a class:
app = App(..., Agent='myproj.agents.Agent')
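The extension settings in this section accept either a class or a string path, written as "module.Name" or "module:Name". The sketch below shows one way such a value could be resolved with importlib; resolve_symbol is a hypothetical stand-in and not the actual symbol_by_name() implementation.

```python
import importlib
from collections import OrderedDict
from typing import Any


def resolve_symbol(name_or_obj: Any) -> Any:
    """Hypothetical stand-in: return the object a dotted path points to."""
    if not isinstance(name_or_obj, str):
        # Already a class (or other object): use it as-is.
        return name_or_obj
    if ":" in name_or_obj:
        # "package.module:Name" form.
        module_name, _, attr = name_or_obj.partition(":")
    else:
        # "package.module.Name" form: the last component is the attribute.
        module_name, _, attr = name_or_obj.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


print(resolve_symbol("collections.OrderedDict") is OrderedDict)  # True
print(resolve_symbol("collections:OrderedDict") is OrderedDict)  # True
print(resolve_symbol(OrderedDict) is OrderedDict)                # True
```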
Stream
¶
type: | Union[str, Type] |
---|---|
default: | "faust.Stream" |
The Stream class to use for streams, or the fully-qualified path to one (supported by symbol_by_name()).
Example using a class:
class MyBaseStream(faust.Stream):
    ...
app = App(..., Stream=MyBaseStream)
Example using the string path to a class:
app = App(..., Stream='myproj.streams.Stream')
Table
¶
type: | Union[str, Type[TableT]] |
---|---|
default: | "faust.Table" |
The Table class to use for tables, or the fully-qualified path to one (supported by symbol_by_name()).
Example using a class:
class MyBaseTable(faust.Table):
    ...
app = App(..., Table=MyBaseTable)
Example using the string path to a class:
app = App(..., Table='myproj.tables.Table')
TableManager
¶
type: | Union[str, Type[TableManagerT]] |
---|---|
default: | "faust.tables.TableManager" |
The TableManager used for managing tables, or the fully-qualified path to one (supported by symbol_by_name()).
Example using a class:
from faust.tables import TableManager
class MyTableManager(TableManager):
    ...
app = App(..., TableManager=MyTableManager)
Example using the string path to a class:
app = App(..., TableManager='myproj.tables.TableManager')
Serializers
¶
type: | Union[str, Type[RegistryT]] |
---|---|
default: | "faust.serializers.Registry" |
The Registry class used for serializing/deserializing messages; or the fully-qualified path to one (supported by symbol_by_name()).
Example using a class:
from faust.serializers import Registry
class MyRegistry(Registry):
    ...
app = App(..., Serializers=MyRegistry)
Example using the string path to a class:
app = App(..., Serializers='myproj.serializers.Registry')
Worker
¶
type: | Union[str, Type[WorkerT]] |
---|---|
default: | "faust.Worker" |
The Worker class used for starting a worker for this app; or the fully-qualified path to one (supported by symbol_by_name()).
Example using a class:
import faust
class MyWorker(faust.Worker):
    ...
app = faust.App(..., Worker=MyWorker)
Example using the string path to a class:
app = faust.App(..., Worker='myproj.workers.Worker')
PartitionAssignor
¶
type: | Union[str, Type[PartitionAssignorT]] |
---|---|
default: | "faust.assignor.PartitionAssignor" |
The PartitionAssignor class used for assigning topic partitions to worker instances; or the fully-qualified path to one (supported by symbol_by_name()).
Example using a class:
from faust.assignor import PartitionAssignor
class MyPartitionAssignor(PartitionAssignor):
    ...
app = App(..., PartitionAssignor=MyPartitionAssignor)
Example using the string path to a class:
app = App(..., PartitionAssignor='myproj.assignor.PartitionAssignor')
LeaderAssignor
¶
type: | Union[str, Type[LeaderAssignorT]] |
---|---|
default: | "faust.assignor.LeaderAssignor" |
The LeaderAssignor class used for assigning a master Faust instance for the app; or the fully-qualified path to one (supported by symbol_by_name()).
Example using a class:
from faust.assignor import LeaderAssignor
class MyLeaderAssignor(LeaderAssignor):
    ...
app = App(..., LeaderAssignor=MyLeaderAssignor)
Example using the string path to a class:
app = App(..., LeaderAssignor='myproj.assignor.LeaderAssignor')
Router
¶
type: | Union[str, Type[RouterT]] |
---|---|
default: | "faust.app.router.Router" |
The Router class used for routing requests to a worker instance having the partition for a specific key (e.g. table key); or the fully-qualified path to one (supported by symbol_by_name()).
Example using a class:
from faust.app.router import Router
class MyRouter(Router):
    ...
app = App(..., Router=MyRouter)
Example using the string path to a class:
app = App(..., Router='myproj.routers.Router')
Topic
¶
type: | Union[str, Type[TopicT]] |
---|---|
default: | "faust.Topic" |
The Topic class used for defining new topics; or the fully-qualified path to one (supported by symbol_by_name()).
Example using a class:
import faust
class MyTopic(faust.Topic):
    ...
app = faust.App(..., Topic=MyTopic)
Example using the string path to a class:
app = faust.App(..., Topic='myproj.topics.Topic')
HttpClient
¶
type: | Union[str, Type[HttpClientT]] |
---|---|
default: | "aiohttp.client:ClientSession" |
The aiohttp.client.ClientSession class used as an HTTP client; or the fully-qualified path to one (supported by symbol_by_name()).
Example using a class:
import faust
from aiohttp.client import ClientSession
class HttpClient(ClientSession):
    ...
app = faust.App(..., HttpClient=HttpClient)
Example using the string path to a class:
app = faust.App(..., HttpClient='myproj.http.HttpClient')
Monitor
¶
type: | Union[str, Type[SensorT]] |
---|---|
default: | "faust.sensors:Monitor" |
The Monitor class used as the main sensor gathering statistics for the application; or the fully-qualified path to one (supported by symbol_by_name()).
Example using a class:
import faust
from faust.sensors import Monitor
class MyMonitor(Monitor):
    ...
app = faust.App(..., Monitor=MyMonitor)
Example using the string path to a class:
app = faust.App(..., Monitor='myproj.monitors.Monitor')