Models, Serialization, and Codecs¶
Basics¶
Models describe the fields of data structures used as keys and values in messages. They're defined using a NamedTuple-like syntax, as introduced by Python 3.6, and look like this:
class Point(Record, serializer='json'):
    x: int
    y: int
Here we define a "Point" record having x and y fields of type int.
There’s no type checking at runtime, but the mypy type checker can be used
as a separate build step to verify that arguments have the correct type.
A record is a model of the dictionary type, and describes keys and values. When using JSON as the serialization format, the Point model above serializes as:
>>> Point(x=10, y=100).dumps()
b'{"x": 10, "y": 100}'
A different serializer can be provided as an argument to .dumps:
>>> Point(x=10, y=100).dumps('pickle') # pickle + Base64
b'gAN9cQAoWAEAAAB4cQFLClgBAAAAeXECS2RYBwAAAF9fZmF1c3RxA31xBFg
CAAAAbnNxBVgOAAAAX19tYWluX18uUG9pbnRxBnN1Lg=='
"Record" is the only model type supported by this version of Faust, but it is just one of many possible model types to be included in the future. The Avro serialization schema format, from which the terminology is taken, supports records, arrays, and more.
Manual Serialization¶
You're not required to define models to read the data from a stream. Manual deserialization also works and is rather easy to perform. Models provide additional benefits, such as the field descriptors that let you refer to fields in group_by statements, static typing using mypy, automatic conversion of datetime fields, and so on.
To deserialize streams manually, merely use a topic with bytes values:
import json

topic = app.topic('custom', value_type=bytes)

@app.agent(topic)
async def processor(stream):
    async for payload in stream:
        data = json.loads(payload)
To integrate with external systems, Faust’s Codecs can help you support serialization and de-serialization to and from any format. Models describe the form of messages, while codecs explain how they’re serialized/compressed/encoded/etc.
The default codec is configured by the application's key_serializer and value_serializer arguments:

app = faust.App('myapp', key_serializer='json')
Individual models can override the default by specifying a serializer argument when creating the model class:

class MyRecord(Record, serializer='json'):
    ...
Codecs can also be combined, so they consist of multiple encoding and decoding stages; for example, data serialized with JSON and then Base64-encoded would be described as the keyword argument serializer='json|binary'.
See also
- The Codecs section – for more information about codecs and how to define your own.
Sending/receiving raw values
Serializing/deserializing keys and values manually without models is easy.
The JSON codec happily accepts lists and dictionaries as arguments to the .send methods:
# examples/nondescript.py
import faust

app = faust.App('values')
transfers_topic = app.topic('transfers')
large_transfers_topic = app.topic('large_transfers')

@app.agent(transfers_topic)
async def find_large_transfers(transfers):
    async for transfer in transfers:
        if transfer['amount'] > 1000.0:
            await large_transfers_topic.send(value=transfer)

async def send_transfer(account_id, amount):
    await transfers_topic.send(value={
        'account_id': account_id,
        'amount': amount,
    })
Using models to describe topics provides benefits:
# examples/described.py
import faust

class Transfer(faust.Record):
    account_id: str
    amount: float

app = faust.App('values')
transfers_topic = app.topic('transfers', value_type=Transfer)
large_transfers_topic = app.topic('large_transfers', value_type=Transfer)

@app.agent(transfers_topic)
async def find_large_transfers(transfers):
    async for transfer in transfers:
        if transfer.amount > 1000.0:
            await large_transfers_topic.send(value=transfer)

async def send_transfer(account_id, amount):
    await transfers_topic.send(
        value=Transfer(account_id=account_id, amount=amount),
    )
The mypy static type analyzer can now alert you if your code is passing the wrong type of value for the account_id field, and more. The most compelling reason for using non-described messages would be to integrate with existing Kafka topics and systems, but if you're writing new systems in Faust, the best practice would be to describe models for your message data.
Model Types¶
The first version of Faust only supports dictionary models (records), but can be easily extended to support other types of models, like arrays.
Records¶
A record is a model based on a dictionary/mapping. The storage used is a dictionary, and it serializes to a dictionary, but the same is true for ordinary Python objects and their __dict__ storage, so you can consider record models to be "objects" that can have methods and properties.

Here's a simple record describing a 2D point, with two required fields: x and y:
class Point(faust.Record):
    x: int
    y: int
To create a new point, instantiate it like a regular Python object, and provide fields as keyword arguments:
>>> point = Point(x=10, y=20)
>>> point
<Point: x=10, y=20>
Faust throws an error if you instantiate a model without providing values for all required fields:
>>> point = Point(x=10)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/devel/faust/faust/models/record.py", line 96, in __init__
    self._init_fields(fields)
  File "/opt/devel/faust/faust/models/record.py", line 106, in _init_fields
    type(self).__name__, ', '.join(sorted(missing))))
TypeError: Point missing required arguments: y
Note
Python does not check types at runtime. The annotations are only used by static analysis tools like mypy.
To describe an optional field, provide a default value:
class Point(faust.Record, serializer='json'):
    x: int
    y: int = 0
You can now omit the y field when creating a new point:
>>> point = Point(x=10)
>>> point
<Point: x=10, y=0>
When sending messages to topics, we can use Point objects as message keys and values:
await app.send('mytopic', key=Point(x=10, y=20), value=Point(x=30, y=10))
The above will send a message to Kafka, and whatever receives that message must be able to deserialize the data.
To define an agent that is able to do so, define the topic to have specific key/value types:
my_topic = app.topic('mytopic', key_type=Point, value_type=Point)

@app.agent(my_topic)
async def task(events):
    async for event in events:
        print(event)
Warning
You need to restart all Faust instances using the old key/value types, or alternatively provide an upgrade path for old instances.
Records can also have other records as fields:
class Account(faust.Record, serializer='json'):
    id: str
    balance: float

class Transfer(faust.Record, serializer='json'):
    account: Account
    amount: float

transfer = Transfer(
    account=Account(id='RBH1235678', balance=13000.0),
    amount=1000.0,
)
To manually serialize a record, use its .dumps() method:

>>> json_bytes_data = transfer.dumps()

To convert the JSON back into a model, use the .loads() class method:

>>> transfer = Transfer.loads(json_bytes_data)
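Since the serializer is JSON, the nested Transfer above round-trips as a nested mapping. A standard-library sketch of roughly what .dumps()/.loads() produce and consume (any extra envelope metadata Faust adds is omitted here):

```python
import json

transfer = {'account': {'id': 'RBH1235678', 'balance': 13000.0},
            'amount': 1000.0}

payload = json.dumps(transfer).encode('utf-8')   # roughly what .dumps() emits
restored = json.loads(payload)                   # roughly what .loads() parses
assert restored == transfer
```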
Lists of lists, etc.¶
Records can also have fields that are a list of other models, or mappings to other models, and these are also described using the type annotation syntax.
To define a model that points to a list of Account objects you can do this:
from typing import List
import faust

class LOL(faust.Record):
    accounts: List[Account]
This works with many of the iterable types, so for a list, all of Sequence, MutableSequence, and List can be used. For a full list of generic data types recognized by Faust, consult the following table:
| Collection | Recognized Annotations |
|---|---|
| List | List[ModelT], Sequence[ModelT], MutableSequence[ModelT] |
| Set | AbstractSet[ModelT], Set[ModelT], MutableSet[ModelT] |
| Tuple | Tuple[ModelT, ...], Tuple[ModelT, ModelT, str] |
| Mapping | Dict[KT, ModelT], Dict[ModelT, ModelT], Mapping[KT, ModelT], MutableMapping[KT, ModelT] |
From this table we can see that we can also have a mapping of username to account:
from typing import Mapping
import faust

class DOA(faust.Record):
    accounts: Mapping[str, Account]
Faust will automatically reconstruct the DOA.accounts field into a mapping of string to Account objects.
There are limitations to this, and Faust may not recognize your custom mapping or list type, so stick to what is listed in the table for your Faust version.
Automatic conversion of datetimes¶
Faust automatically serializes datetime fields to ISO-8601 text format but will not automatically deserialize ISO-8601 strings back into datetime (it is impossible to distinguish them from ordinary strings).

However, if you use a model with a datetime field and enable the isodates model class setting, the model will correctly convert the strings to datetime objects (with timezone information if available) when deserialized:
from datetime import datetime
import faust

class Account(faust.Record, isodates=True, serializer='json'):
    date_joined: datetime
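The conversion that isodates=True enables can be mirrored with the standard library: isoformat() on the way out, datetime.fromisoformat() (Python 3.7+) on the way back in. This sketches the behavior, not Faust's implementation:

```python
from datetime import datetime, timezone

date_joined = datetime(2019, 5, 1, 12, 30, tzinfo=timezone.utc)

# Serialization: datetime -> ISO-8601 text (Faust does this automatically).
text = date_joined.isoformat()

# Deserialization: ISO-8601 text -> datetime (what isodates=True enables).
restored = datetime.fromisoformat(text)
assert restored == date_joined
assert restored.tzinfo is not None  # timezone information survives
```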
Subclassing models: Abstract model classes¶
You can mark a model class with abstract=True to create a model base class that you must inherit from to create new models having common functionality.

For example, you may want to have a base class for all models that have fields for the time of creation and the time of last update.
class MyBaseRecord(Record, abstract=True):
    time_created: float = None
    time_updated: float = None
An “abstract” class is only used to create new models:
class Account(MyBaseRecord):
    id: str

account = Account(id='X', time_created=3124312.3442)
print(account.time_created)
Positional Arguments¶
You can also create model values using positional arguments, meaning that Point(x=10, y=30) can also be expressed as Point(10, 30).
The ordering of fields in positional arguments gets tricky when you add subclasses to the mix. In that case, the ordering is decided by the method resolution order, as demonstrated by this example:
import faust

class Point(faust.Record):
    x: int
    y: int

class XYZPoint(Point):
    z: int

point = XYZPoint(10, 20, 30)
assert (point.x, point.y, point.z) == (10, 20, 30)
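The field ordering can be sketched by walking the MRO in reverse and merging each class's own __annotations__, so base-class fields come first (an illustration of the rule, not Faust's code):

```python
class Point:
    x: int
    y: int

class XYZPoint(Point):
    z: int

def field_order(cls):
    # Merge annotations from base classes to subclasses: bases first.
    fields = {}
    for klass in reversed(cls.__mro__):
        fields.update(klass.__dict__.get('__annotations__', {}))
    return list(fields)

assert field_order(XYZPoint) == ['x', 'y', 'z']
```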
Reference¶
Serialization/Deserialization¶
class faust.Record [source]

- classmethod loads(s: bytes, *, default_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, **kwargs) → faust.types.models.ModelT
  Deserialize model object from bytes.
  Parameters:
  - default_serializer (CodecArg) – Default serializer to use if no custom serializer was set for this model subclass.
  - **kwargs – Additional attributes to set on the model object. Note: these are regarded as defaults, and any fields also present in the message take precedence.
  Return type: ModelT
- dumps(*, serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None) → bytes
  Serialize object to the target serialization format.
  Return type: bytes
- to_representation() → Mapping[str, Any] [source]
  Convert object to JSON serializable object.
  Return type: Mapping[str, Any]
- classmethod from_data(data: Mapping, *, preferred_type: Type[faust.types.models.ModelT] = None) → faust.models.record.Record [source]
  Return type: Record
- derive(*objects, **fields) → faust.types.models.ModelT
  Return type: ModelT
- _options
  Model metadata for introspection. An instance of faust.types.models.ModelOptions.

class faust.ModelOptions [source]

- fields = None
  Index: Flattened view of __annotations__ in MRO order.
- fieldset = None
  Index: Set of required field names, for fast argument checking.
- fieldpos = None
  Index: Positional argument index to field name. Used by Record.__init__ to map positional arguments to fields.
- optionalset = None
  Index: Set of optional field names, for fast argument checking.
- models = None
  Index: Mapping of fields that are ModelT.
- converse = None
  Index: Mapping of fields that are not builtin types, e.g. datetime.
- defaults = None
  Mapping of field names to default value.
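The indices above can be sketched directly from a class's annotations and defaults; this is an illustration of what ModelOptions captures, not how Faust builds it:

```python
class Point:
    x: int
    y: int = 0

annotations = Point.__annotations__            # 'fields': name -> type
defaults = {name: getattr(Point, name)         # fields that have a default
            for name in annotations if hasattr(Point, name)}
optionalset = set(defaults)                    # optional field names
fieldset = set(annotations) - optionalset      # required field names
fieldpos = dict(enumerate(annotations))        # positional index -> field name

assert fieldset == {'x'}
assert optionalset == {'y'}
assert fieldpos == {0: 'x', 1: 'y'}
```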
Codecs¶
Supported codecs¶
- raw – no encoding/serialization (bytes only).
- json – json with UTF-8 encoding.
- pickle – pickle with Base64 encoding (not URL-safe).
- binary – Base64 encoding (not URL-safe).
Encodings are not URL-safe if the encoded payload cannot be embedded directly into a URL query parameter.
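The URL-safety caveat comes from standard Base64 using + and / in its alphabet; base64.urlsafe_b64encode swaps them for - and _:

```python
import base64

data = bytes([0xfb, 0xef])                 # chosen to produce '+' in Base64
standard = base64.b64encode(data)          # standard alphabet: '+' and '/'
urlsafe = base64.urlsafe_b64encode(data)   # URL-safe alphabet: '-' and '_'

assert standard == b'++8='
assert urlsafe == b'--8='
```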
Serialization by name¶
The dumps() function takes a codec name and the object to encode as arguments, and returns bytes:
>>> s = dumps('json', obj)
In the reverse direction, the loads() function takes a codec name and an encoded payload to decode (in bytes) as arguments, and returns a reconstruction of the serialized object:
>>> obj = loads('json', s)
When passing in the codec type as a string (as in loads('json', ...) above), you can also combine multiple codecs to form a pipeline; for example, "json|gzip" combines JSON serialization with gzip compression:
>>> obj = loads('json|gzip', s)
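The name-based pipeline can be sketched with a small stage registry; STAGES, dumps, and loads here are hypothetical stand-ins for the machinery Faust provides:

```python
import gzip
import json

# Each codec name maps to an (encode, decode) pair of callables.
STAGES = {
    'json': (lambda obj: json.dumps(obj).encode('utf-8'), json.loads),
    'gzip': (gzip.compress, gzip.decompress),
}

def dumps(spec, obj):
    for name in spec.split('|'):             # encode stages left to right
        obj = STAGES[name][0](obj)
    return obj

def loads(spec, payload):
    for name in reversed(spec.split('|')):   # decode stages in reverse
        payload = STAGES[name][1](payload)
    return payload

s = dumps('json|gzip', {'x': 10})
assert loads('json|gzip', s) == {'x': 10}
```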
Codec registry¶
All codecs have a name, and the faust.serializers.codecs attribute maintains a mapping from name to Codec instance.

You can add a new codec to this mapping by executing:

>>> from faust.serializers import codecs
>>> codecs.register('custom', custom_serializer())
To create a new codec, you need to define only two methods: first you need the _loads() method to deserialize bytes, then you need the _dumps() method to serialize an object:

from typing import Any

import msgpack

from faust.serializers import codecs

class raw_msgpack(codecs.Codec):

    def _dumps(self, obj: Any) -> bytes:
        return msgpack.dumps(obj)

    def _loads(self, s: bytes) -> Any:
        return msgpack.loads(s)
We use msgpack.dumps to serialize, and our codec now encodes to raw msgpack format in binary. We may have to write this payload somewhere unable to handle binary data well; to solve that, we combine the codec with Base64 encoding to convert the binary to text.

Combining codecs is easy using the | operator:
def msgpack() -> codecs.Codec:
    return raw_msgpack() | codecs.binary()

codecs.register('msgpack', msgpack())
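The | composition can be sketched with a minimal Codec-like base class whose __or__ chains two stages; this mirrors the chaining idea, not Faust's actual codecs.Codec:

```python
import base64
import json

class SketchCodec:
    """Minimal codec: subclasses define _dumps/_loads; | chains two codecs."""
    def dumps(self, obj):
        return self._dumps(obj)
    def loads(self, s):
        return self._loads(s)
    def __or__(self, other):
        return _Chained(self, other)

class _Chained(SketchCodec):
    def __init__(self, first, second):
        self.first, self.second = first, second
    def _dumps(self, obj):
        return self.second.dumps(self.first.dumps(obj))  # encode left to right
    def _loads(self, s):
        return self.first.loads(self.second.loads(s))    # decode right to left

class json_codec(SketchCodec):
    def _dumps(self, obj):
        return json.dumps(obj).encode('utf-8')
    def _loads(self, s):
        return json.loads(s)

class binary(SketchCodec):
    def _dumps(self, data):
        return base64.b64encode(data)
    def _loads(self, s):
        return base64.b64decode(s)

pipeline = json_codec() | binary()
assert pipeline.loads(pipeline.dumps({'x': 1})) == {'x': 1}
```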
At this point, we monkey-patched Faust to support our codec, and we can use it to define records:
>>> from faust import Record
>>> class Point(Record, serializer='msgpack'):
... x: int
... y: int
The problem with monkey-patching is that we must make sure the patching happens before we use the feature.
Faust also supports registering codec extensions using setuptools entry-points, so instead, we can create an installable msgpack extension.
To do so, we need to define a package with the following directory layout:
faust-msgpack/
    setup.py
    faust_msgpack.py
The first file (faust-msgpack/setup.py) defines metadata about our package and should look like the following example:
import setuptools

setuptools.setup(
    name='faust-msgpack',
    version='1.0.0',
    description='Faust msgpack serialization support',
    author='Ola A. Normann',
    author_email='ola@normann.no',
    url='http://github.com/example/faust-msgpack',
    platforms=['any'],
    license='BSD',
    packages=setuptools.find_packages(
        exclude=['ez_setup', 'tests', 'tests.*']),
    zip_safe=False,
    install_requires=['msgpack-python'],
    tests_require=[],
    entry_points={
        'faust.codecs': [
            'msgpack = faust_msgpack:msgpack',
        ],
    },
)
The most important part is the entry_points key, which tells Faust how to load our plugin. We have set the name of our codec to msgpack and the path to the codec class to be faust_msgpack:msgpack. Faust imports this as it would from faust_msgpack import msgpack, so we need to define that part next in our faust-msgpack/faust_msgpack.py module:
from typing import Any

import msgpack as msgpack_lib  # aliased: the msgpack() factory below reuses the name
from faust.serializers import codecs

class raw_msgpack(codecs.Codec):
    def _dumps(self, obj: Any) -> bytes:
        return msgpack_lib.dumps(obj)

    def _loads(self, s: bytes) -> Any:
        return msgpack_lib.loads(s)

def msgpack() -> codecs.Codec:
    return raw_msgpack() | codecs.binary()
That’s it! To install and use our new extension do:
$ python setup.py install
At this point you can publish this to PyPI so it can be shared amongst other Faust users.
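Once installed, entry points in the faust.codecs group become discoverable through the standard library. A sketch of the lookup step (with the faust-msgpack package installed, this would yield an entry point named msgpack):

```python
from importlib.metadata import entry_points

eps = entry_points()
if hasattr(eps, 'select'):                       # Python 3.10+ API
    codec_eps = list(eps.select(group='faust.codecs'))
else:                                            # older mapping-style API
    codec_eps = list(eps.get('faust.codecs', []))

# Each entry point's .load() returns the codec factory, here
# equivalent to `from faust_msgpack import msgpack`.
for ep in codec_eps:
    codec = ep.load()()
```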