Models, Serialization, and Codecs

Basics

Models describe the fields of data structures used as keys and values in messages. They’re defined using a NamedTuple-like syntax, as introduced by Python 3.6, and look like this:

class Point(Record, serializer='json'):
    x: int
    y: int

Here we define a “Point” record having x, and y fields of type int. There’s no type checking at runtime, but the mypy type checker can be used as a separate build step to verify that arguments have the correct type.

A record is a model of the dictionary type, and describes keys and values. When using JSON as the serialization format, the Point model above serializes as:

>>> Point(x=10, y=100).dumps()
{"x": 10, "y": 100}

A different serializer can be provided as an argument to .dumps:

>>> Point(x=10, y=100).dumps('pickle')  # pickle + Base64
b'gAN9cQAoWAEAAAB4cQFLClgBAAAAeXECS2RYBwAAAF9fZmF1c3RxA31xBFg
CAAAAbnNxBVgOAAAAX19tYWluX18uUG9pbnRxBnN1Lg=='

“Record” is the only model type supported by this version of Faust, but is just one of many possible model types to include in the future. The Avro serialization schema format, which the terminology is taken from, supports records, arrays, and more.

Manual Serialization

You’re not required to define models to read the data from a stream. Manual de-serialization also works and is rather easy to perform. Models provide additional benefit, such as the field descriptors that let you refer to fields in group_by statements, static typing using mypi, automatic conversion of datetime, and so on…

To deserialize streams manually, merely use a topic with bytes values:

topic = app.topic('custom', value_type=bytes)

@app.agent
async def processor(stream):
    async for payload in stream:
        data = json.loads(payload)

To integrate with external systems, Faust’s Codecs can help you support serialization and de-serialization to and from any format. Models describe the form of messages, while codecs explain how they’re serialized/compressed/encoded/etc.

The default codec is configured by the applications key_serializer and value_serializer arguments:

app = faust.App(key_serializer='json')

Individual models can override the default by specifying a serializer argument when creating the model class:

class MyRecord(Record, serializer='json'):
    ...

Codecs can also be combined, so they consist of multiple encoding and decoding stages, for example, data serialized with JSON and then Base64 encoded would be described as the keyword argument serializer='json|binary'.

See also

  • The Codecs section – for more information about codecs and how to define your own.

Sending/receiving raw values

Serializing/deserializing keys and values manually without models is easy. The JSON codec happily accepts lists and dictionaries, as arguments to the .send methods:

# examples/nondescript.py
import faust

app = faust.App('values')
transfers_topic = app.topic('transfers')
large_transfers_topic = app.topic('large_transfers')

@app.agent(transfers_topic)
async def find_large_transfers(transfers):
    async for transfer in transfers:
        if transfer['amount'] > 1000.0:
            await large_transfers_topic.send(value=transfer)

async def send_transfer(account_id, amount):
    await transfers_topic.send(value={
        'account_id': account_id,
        'amount': amount,
    })

Using models to describe topics provides benefits:

# examples/described.py
import faust

class Transfer(faust.Record):
    account_id: str
    amount: float

app = faust.App('values')
transfers_topic = app.topic('transfers', value_type=Transfer)
large_transfers_topic = app.topic('large_transfers', value_type=Transfer)

@app.agent(transfers_topic)
async def find_large_transfers(transfers):
    async for transfer in transfers:
        if transfer.amount > 1000.0:
            await large_transfers_topic.send(value=transfer)

async def send_transfer(account_id, amount):
    await transfers_topic.send(
        value=Transfer(account_id=account_id, amount=amount),
    )

The mypy static type analyzer can now alert you if your code is passing the wrong type of value for the account_id field, and more. The most compelling reason for using non-described messages would be to integrate with existing Kafka topics and systems, but if you’re writing new systems in Faust, the best practice would be to describe models for your message data.

Model Types

The first version of Faust only supports dictionary models (records), but can be easily extended to support other types of models, like arrays.

Records

A record is a model based on a dictionary/mapping. The storage used is a dictionary, and it serializes to a dictionary, but the same is true for ordinary Python objects and their __dict__ storage, so you can consider record models to be “objects” that can have methods and properties.

Here’s a simple record describing a 2d point, with two required fields: x and y:

class Point(faust.Record):
    x: int
    y: int

To create a new point, instantiate it like a regular Python object, and provide fields as keyword arguments:

>>> point = Point(x=10, y=20)
>>> point
<Point: x=10, y=20>

Faust throws an error if you instantiate a model without providing values for all required fields:

>>> point = Point(x=10)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/devel/faust/faust/models/record.py", line 96, in __init__
    self._init_fields(fields)
File "/opt/devel/faust/faust/models/record.py", line 106, in _init_fields
    type(self).__name__, ', '.join(sorted(missing))))
TypeError: Point missing required arguments: y

Note

Python does not check types at runtime. The annotations are only used by static analysis tools like mypy.

To describe an optional field, provide a default value:

class Point(faust.Record, serializer='json'):
    x: int
    y: int = 0

You can now omit the y field when creating a new point:

>>> point = Point(x=10)
>>> point
<Point: x=10, y=0>

When sending messages to topics, we can use Point objects as message keys, and values:

await app.send('mytopic', key=Point(x=10, y=20), value=Point(x=30, y=10))

The above will send a message to Kafka, and whatever receives that message must be able to deserialize the data.

To define an agent that is able to do so, define the topic to have specific key/value types:

my_topic = faust.topic('mytopic', key_type=Point, value_type=Point)

@app.agent(my_topic)
async def task(events):
    async for event in events:
        print(event)

Warning

You need to restart all Faust instances using the old key/value types, or alternatively provide an upgrade path for old instances.

Records can also have other records as fields:

class Account(faust.Record, serializer='json'):
    id: str
    balance: float

class Transfer(faust.Record, serializer='json'):
    account: Account
    amount: float

transfer = Transfer(
    account=Account(id='RBH1235678', balance=13000.0),
    amount=1000.0,
)

To manually serialize a record use its .dumps() method:

>>> json = transfer.dumps()

To convert the JSON back into a model use the .loads() class method:

>>> transfer = Transfer.loads(json_bytes_data)

Lists of lists, etc.

Records can also have fields that are a list of other models, or mappings to other models, and these are also described using the type annotation syntax.

To define a model that points to a list of Account objects you can do this:

from typing import List
import faust


class LOL(faust.Record):
    accounts: List[Account]

This works with many of the iterable types, so for a list all of Sequence, MutableSequence, and List can be used. For a full list of generic data types recognized by Faust, consult the following table:

Collection Recognized Annotations
List
Set
Tuple
  • Tuple[ModelT, ...]
  • Tuple[ModelT, ModelT, str]
Mapping

From this table we can see that we can also have a mapping of username to account:

from typing import Mapping
class DOA(faust.Record):
    accounts: Mapping[str, Account]

Faust will automatically reconstruct the DOA.accounts field into a mapping of string to Account objects.

There are limitations to this, and Faust may not recognize your custom mapping or list type, so stick to what is listed in the table for your Faust version.

Coercion

Automatic coercion of datetimes

Faust automatically serializes datetime fields to ISO-8601 text format but will not automatically deserialize ISO-8601 strings back into datetime (it is impossible to distinguish them from ordinary strings).

However, if you use a model with a datetime field, and enable the isodates model class setting, the model will correctly convert the strings to datetime objects (with timezone information if available) when deserialized:

from datetime import datetime
import faust

class Account(faust.Record, isodates=True, serializer='json'):
    date_joined: datetime
Automatic coercion of decimals

Similar to datetimes, json does not have a suitable high precision decimal field type.

You can enable the decimals=True option to coerce string decimal values back into Python decimal.Decimal objects.

from decimal import Decimal
import faust

class Order(faust.Record, decimals=True, serializer='json'):
    price: Decimal
    quantity: Decimal
Custom coercions

You can add custom coercion rules to your model classes using the coercions options. This must be a mapping from, either a tuple of types or a single type, to a function/class/callable used to convert it.

Here’s an example converting strings back to UUID objects:

from uuid import UUID
import faust

class Account(faust.Record, coercions={UUID: UUID}):
    id: UUID

You’d get tired writing this out for every class, so why not make an abstract model subclass:

from uuid import UUID
import faust

class UUIDAwareRecord(faust.Record,
                      abstract=True,
                      coercions={UUID: UUID}):
    ...

class Account(UUIDAwareRecord):
    id: UUID

Subclassing models: Abstract model classes

You can mark a model class as abstract=True to create a model base class, that you must inherit from to create new models having common functionality.

For example, you may want to have a base class for all models that have fields for time of creation, and time last created.

class MyBaseRecord(Record, abstract=True):
    time_created: float = None
    time_updated: float = None

An “abstract” class is only used to create new models:

class Account(MyBaseRecord):
    id: str

account = Account(id='X', time_created=3124312.3442)
print(account.time_created)

Positional Arguments

You can also create model values using positional arguments, meaning that Point(x=10, y=30) can also be expressed as Point(10, 30).

The ordering of fields in positional arguments gets tricky when you add subclasses to the mix. In that case, the ordering is decided by the method resolution order, as demonstrated by this example:

import faust

class Point(faust.Record):
    x: int
    y: int

class XYZPoint(Point):
    z: int

point = XYZPoint(10, 20, 30)
assert (point.x, point.y, point.z) == (10, 20, 30)

Blessed Keys and polymorphic fields

Models can contain fields that are other models, such as in this example where an account has a user:

class User(faust.Record):
    id: str
    first_name: str
    last_name: str

class Account(faust.Record, decimals=True):
    user: User
    balance: Decimal

This is a strict relationship, the value for Account.user can only ever be a User class.

Faust records also support polymorphic fields, where the type of the field is decided at runtime. Consider an Article model having a list of assets:

class Asset(faust.Record):
    url: str
    type: str


class ImageAsset(faust.Record):
    type = 'image'

class VideoAsset(faust.Record):
    runtime_seconds: float
    type = 'video'

class Article(faust.Record, allow_blessed_key=True):
    assets: List[Asset]

How does this work? What is a blessed key? The answer is in how Faust models are serialized and deserialized.

When serializing a Faust model we always add a special key, let’s look at the Account object we defined above, and how the payloads are generated:

>>> user = User(
...    id='07ecaebf-48c4-4c9e-92ad-d16d2f4a9a19',
...    first_name='Franz',
...    last_name='Kafka',
... )
>>> account = Account(
...    user=user,
...    balance='12.3',
)
>>> from pprint import pprint
>>> pprint(account.to_representation())
{
    '__faust': {'ns': 't.Account'},
    'balance': Decimal('12.3'),
    'user': {
        '__faust': {'ns': 't.User'},
        'first_name': 'Franz',
        'id': '07ecaebf-48c4-4c9e-92ad-d16d2f4a9a19',
        'last_name': 'Kafka',
    },
}

The blessed key here is the __faust key, it describes what model class was used when serializing it. When we allow the blessed key to be used, we allow it to be reconstructed using that same class.

When you define a module in Python code, Faust will automatically keep an index of model name to class, which we then use to look up a model class by name. For this to work, you must have imported the module where your model is defined before you deserialize the payload.

When using blessed keys it’s extremely important that you do not rename classes, or old data cannot be deserialized.

Reference

Serialization/Deserialization
class faust.Record[source]
classmethod loads(s: bytes, *, default_serializer: Union[faust.types.codecs.CodecT, str, None] = None, serializer: Union[faust.types.codecs.CodecT, str, None] = None) → faust.types.models.ModelT

Deserialize model object from bytes.

Keyword Arguments:
 serializer (CodecArg) – Default serializer to use if no custom serializer was set for this model subclass.
Return type:ModelT
dumps(*, serializer: Union[faust.types.codecs.CodecT, str, None] = None) → bytes

Serialize object to the target serialization format.

Return type:bytes
to_representation() → Mapping[str, Any][source]

Convert object to JSON serializable object.

Return type:Mapping[str, Any]
classmethod from_data(data: Mapping, *, preferred_type: Type[faust.types.models.ModelT] = None) → faust.models.record.Record[source]
Return type:Record
derive(*objects, **fields) → faust.types.models.ModelT
Return type:ModelT
_options

Model metadata for introspection. An instance of faust.types.models.ModelOptions.

class faust.ModelOptions[source]
fields = None

Flattened view of __annotations__ in MRO order.

Type:Index
fieldset = None

Set of required field names, for fast argument checking.

Type:Index
fieldpos = None

Positional argument index to field name. Used by Record.__init__ to map positional arguments to fields.

Type:Index
optionalset = None

Set of optional field names, for fast argument checking.

Type:Index
models = None

Mapping of fields that are ModelT

Type:Index
decimals = False
isodates = False
coercions = None
defaults = None

Mapping of field names to default value.

Codecs

Supported codecs

  • raw - no encoding/serialization (bytes only).
  • json - json with UTF-8 encoding.
  • pickle - pickle with Base64 encoding (not URL-safe).
  • binary - Base64 encoding (not URL-safe).

Encodings are not URL-safe if the encoded payload cannot be embedded directly into a URL query parameter.

Serialization by name

The dumps() function takes a codec name and the object to encode as arguments, and returns bytes

>>> s = dumps('json', obj)

In reverse direction, the loads() function takes a codec name and an encoded payload to decode (in bytes), as arguments, and returns a reconstruction of the serialized object:

>>> obj = loads('json', s)

When passing in the codec type as a string (as in loads('json', ...) above), you can also combine multiple codecs to form a pipeline, for example "json|gzip" combines JSON serialization with gzip compression:

>>> obj = loads('json|gzip', s)

Codec registry

All codecs have a name and the faust.serializers.codecs attribute maintains a mapping from name to Codec instance.

You can add a new codec to this mapping by executing:

>>> from faust.serializers import codecs
>>> codecs.register(custom, custom_serializer())

To create a new codec, you need to define only two methods: first you need the _loads() method to deserialize bytes, then you need the _dumps() method to serialize an object:

import msgpack

from faust.serializers import codecs

class raw_msgpack(codecs.Codec):

    def _dumps(self, obj: Any) -> bytes:
        return msgpack.dumps(obj)

    def _loads(self, s: bytes) -> Any:
        return msgpack.loads(s)

We use msgpack.dumps to serialize, and our codec now encodes to raw msgpack format in binary. We may have to write this payload to somewhere unable to handle binary data well, to solve that we combine the codec with Base64 encoding to convert the binary to text.

Combining codecs is easy using the | operator:

def msgpack() -> codecs.Codec:
    return raw_msgpack() | codecs.binary()

codecs.register('msgpack', msgpack())

At this point, we monkey-patched Faust to support our codec, and we can use it to define records:

>>> from faust import Record
>>> class Point(Record, serializer='msgpack'):
...     x: int
...     y: int

The problem with monkey-patching is that we must make sure the patching happens before we use the feature.

Faust also supports registering codec extensions using setuptools entry-points, so instead, we can create an installable msgpack extension.

To do so, we need to define a package with the following directory layout:

faust-msgpack/
    setup.py
    faust_msgpack.py

The first file (faust-msgpack/setup.py) defines metadata about our package and should look like the following example:

import setuptools

setuptools.setup(
    name='faust-msgpack',
    version='1.0.0',
    description='Faust msgpack serialization support',
    author='Ola A. Normann',
    author_email='ola@normann.no',
    url='http://github.com/example/faust-msgpack',
    platforms=['any'],
    license='BSD',
    packages=find_packages(exclude=['ez_setup', 'tests', 'tests.*']),
    zip_safe=False,
    install_requires=['msgpack-python'],
    tests_require=[],
    entry_points={
        'faust.codecs': [
            'msgpack = faust_msgpack:msgpack',
        ],
    },
)

The most important part being the entry_points key which tells Faust how to load our plugin. We have set the name of our codec to msgpack and the path to the codec class to be faust_msgpack:msgpack. Faust imports this as it would from faust_msgpack import msgpack, so we need to define that part next in our faust-msgpack/faust_msgpack.py module:

from faust.serializers import codecs

class raw_msgpack(codecs.Codec):

    def _dumps(self, obj: Any) -> bytes:
        return msgpack.dumps(s)


def msgpack() -> codecs.Codec:
    return raw_msgpack() | codecs.binary()

That’s it! To install and use our new extension do:

$ python setup.py install

At this point you can publish this to PyPI so it can be shared amongst other Faust users.