# Source code for faust.serializers.codecs

"""Serialization utilities.

Supported codecs
================

* **raw**     - No encoding/serialization (bytes only).
* **json**    - json with UTF-8 encoding.
* **yaml**    - YAML (safe version)
* **pickle**  - pickle with base64 encoding (not urlsafe).
* **binary**  - base64 encoding (not urlsafe).

Serialization by name
=====================

The :func:`dumps` function takes a codec name and the object to encode,
then returns bytes:

.. sourcecode:: pycon

    >>> s = dumps('json', obj)

For the reverse direction, the :func:`loads` function takes a codec
name and bytes to decode:

.. sourcecode:: pycon

    >>> obj = loads('json', s)

You can also combine encoders in the name, like in this case
where json is combined with gzip compression:

.. sourcecode:: pycon

    >>> obj = loads('json|gzip', s)

Codec registry
==============

Codecs are configured by name and this module maintains
a mapping from name to :class:`Codec` instance: the :attr:`codecs`
attribute.

You can add a new codec to this mapping by:

.. sourcecode:: pycon

    >>> from faust.serializers import codecs
    >>> codecs.register('custom', custom_serializer())

A codec subclass requires two methods to be implemented: ``_loads()``
and ``_dumps()``:

.. sourcecode:: python

    import msgpack

    from faust.serializers import codecs

    class raw_msgpack(codecs.Codec):

        def _dumps(self, obj: Any) -> bytes:
            return msgpack.dumps(obj)

        def _loads(self, s: bytes) -> Any:
            return msgpack.loads(s)

Our codec now encodes/decodes to raw msgpack format, but we
may also need to transfer this payload over a transport easily confused
by binary data, such as JSON where everything is Unicode.

You can chain codecs together, so to add a binary text encoding like Base64,
to your codec, we use the ``|`` operator to form a combined codec:

.. sourcecode:: python

    def msgpack() -> codecs.Codec:
        return raw_msgpack() | codecs.binary()

    codecs.register('msgpack', msgpack())

At this point we monkey-patched Faust to support
our codec, and we can use it to define records like this:

.. sourcecode:: pycon

    >>> from faust.serializers import Record
    >>> class Point(Record, serializer='msgpack'):
    ...     x: int
    ...     y: int

The problem with monkey-patching is that we must make sure the patching
happens before we use the feature.

Faust also supports registering *codec extensions*
using setuptools entry points, so instead we can create an installable msgpack
extension.

To do so we need to define a package with the following directory layout:

.. sourcecode:: text

    faust-msgpack/
        setup.py
        faust_msgpack.py

The first file, :file:`faust-msgpack/setup.py`, defines metadata about our
package and should look like the following example:

.. sourcecode:: python

    from setuptools import setup, find_packages

    setup(
        name='faust-msgpack',
        version='1.0.0',
        description='Faust msgpack serialization support',
        author='Ola A. Normann',
        author_email='ola@normann.no',
        url='http://github.com/example/faust-msgpack',
        platforms=['any'],
        license='BSD',
        packages=find_packages(exclude=['ez_setup', 'tests', 'tests.*']),
        zip_safe=False,
        install_requires=['msgpack-python'],
        tests_require=[],
        entry_points={
            'faust.codecs': [
                'msgpack = faust_msgpack:msgpack',
            ],
        },
    )

The most important part being the ``entry_points`` key which tells
Faust how to load our plugin. We have set the name of our
codec to ``msgpack`` and the path to the codec class
to be ``faust_msgpack:msgpack``. This will be imported by Faust
as ``from faust_msgpack import msgpack``, so we need to define
that part next in our :file:`faust-msgpack/faust_msgpack.py` module:

.. sourcecode:: python

    from faust.serializers import codecs

    class raw_msgpack(codecs.Codec):

        def _dumps(self, obj: Any) -> bytes:
            return msgpack.dumps(obj)


    def msgpack() -> codecs.Codec:
        return raw_msgpack() | codecs.binary()

That's it! To install and use our new extension we do:

.. sourcecode:: console

    $ python setup.py install

At this point you may want to publish this on PyPI to share
the extension with other Faust users.
"""
import pickle as _pickle

from base64 import b64decode, b64encode
from types import ModuleType
from typing import Any, Dict, MutableMapping, Optional, Tuple, cast

from mode.utils.compat import want_bytes, want_str
from mode.utils.imports import load_extension_classes

from faust.exceptions import ImproperlyConfigured
from faust.types.codecs import CodecArg, CodecT
from faust.utils import json as _json

try:
    import yaml as _yaml
except ImportError:  # pragma: no cover
    _yaml = cast(ModuleType, None)  # noqa


#: Public API of this module (used by ``from ... import *``).
__all__ = [
    'Codec',
    'CodecArg',
    'register',
    'get_codec',
    'dumps',
    'loads',
]


class Codec(CodecT):
    """Base class for codecs.

    A codec encodes Python objects to :class:`bytes` (:meth:`dumps`)
    and decodes them back (:meth:`loads`).  Codecs may be chained with
    the ``|`` operator, e.g. ``raw_pickle() | binary()``, in which case
    every node's ``_dumps`` is applied in order on encode, and every
    node's ``_loads`` is applied in reverse order on decode.

    Subclasses must implement :meth:`_loads` and :meth:`_dumps`.
    """

    #: next steps in the recursive codec chain.
    #: ``x = pickle | binary`` returns codec with
    #: children set to ``(pickle, binary)``.
    children: Tuple[CodecT, ...]

    #: cached version of children including this codec as the first node.
    #: could use chain below, but seems premature so just copying the list.
    nodes: Tuple[CodecT, ...]

    #: subclasses can support keyword arguments,
    #: the base implementation of :meth:`clone` uses this to
    #: preserve keyword arguments in copies.
    kwargs: Dict

    def __init__(self,
                 children: Optional[Tuple[CodecT, ...]] = None,
                 **kwargs: Any) -> None:
        # NOTE: annotation fixed to explicit Optional (PEP 484 deprecates
        # implicit Optional for arguments defaulting to None).
        self.children = children or ()
        self.nodes = (self,) + self.children
        self.kwargs = kwargs

    def _loads(self, s: bytes) -> Any:
        # subclasses must implement this method.
        raise NotImplementedError()

    def _dumps(self, s: Any) -> bytes:
        # subclasses must implement this method.
        raise NotImplementedError()

    def dumps(self, obj: Any) -> bytes:
        """Encode object ``obj``."""
        # send _dumps to this instance, and all children.
        for node in self.nodes:
            obj = cast(Codec, node)._dumps(obj)
        return obj

    def loads(self, s: bytes) -> Any:
        """Decode object from string."""
        # send _loads to this instance, and all children in reverse order.
        for node in reversed(self.nodes):
            s = cast(Codec, node)._loads(s)
        return s

    def clone(self, *children: CodecT) -> CodecT:
        """Create a clone of this codec, with optional children added."""
        new_children = self.children + children
        return type(self)(children=new_children, **self.kwargs)

    def __or__(self, other: Any) -> Any:
        # codecs can be chained together, e.g. ``binary() | json()``.
        if isinstance(other, CodecT):
            return self.clone(other)
        return NotImplemented

    def __repr__(self) -> str:
        # e.g. ``raw_pickle() | binary()`` -> "raw_pickle() | binary()"
        return ' | '.join(
            '{0}({1})'.format(
                type(n).__name__,
                ', '.join(map(repr, cast(Codec, n).kwargs.values())))
            for n in self.nodes)
class json(Codec):
    """:mod:`json` serializer."""

    def _loads(self, payload: bytes) -> Any:
        return _json.loads(want_str(payload))

    def _dumps(self, obj: Any) -> bytes:
        return want_bytes(_json.dumps(obj))


class yaml(Codec):
    """:pypi:`PyYAML` serializer."""

    def _loads(self, payload: bytes) -> Any:
        self._ensure_yaml()
        return _yaml.safe_load(want_str(payload))

    def _dumps(self, obj: Any) -> bytes:
        self._ensure_yaml()
        return want_bytes(_yaml.safe_dump(obj))

    def _ensure_yaml(self) -> None:
        # PyYAML is an optional dependency: fail loudly if missing.
        if _yaml is None:
            raise ImproperlyConfigured('Missing yaml: pip install PyYAML')


class raw_pickle(Codec):
    """:mod:`pickle` serializer with no encoding."""

    def _loads(self, payload: bytes) -> Any:
        return _pickle.loads(payload)

    def _dumps(self, obj: Any) -> bytes:
        return _pickle.dumps(obj)


def pickle() -> Codec:
    """:mod:`pickle` serializer with base64 encoding."""
    return raw_pickle() | binary()


class binary(Codec):
    """Codec for binary content (uses Base64 encoding)."""

    def _loads(self, payload: bytes) -> Any:
        return b64decode(payload)

    def _dumps(self, obj: bytes) -> bytes:
        return b64encode(want_bytes(obj))


class raw(Codec):
    """Codec that does nothing at all."""

    def _loads(self, payload: bytes) -> bytes:
        return want_bytes(payload)

    def _dumps(self, obj: bytes) -> bytes:
        return want_bytes(obj)


#: Codec registry, mapping of name to :class:`Codec` instance.
codecs: MutableMapping[str, CodecT] = {
    'json': json(),
    'pickle': pickle(),
    'binary': binary(),
    'raw': raw(),
    'yaml': yaml(),
}

#: Cached extension classes.
#: We have to defer extension loading to runtime as the
#: extensions will import from this module causing a circular import.
_extensions_finalized: MutableMapping[str, bool] = {}
def register(name: str, codec: CodecT) -> None:
    """Add ``codec`` to the codec registry under ``name``."""
    codecs.update({name: codec})
def _maybe_load_extension_classes(
        namespace: str = 'faust.codecs') -> None:
    # Load codecs registered as setuptools entry points, once per namespace.
    if namespace in _extensions_finalized:
        return
    _extensions_finalized[namespace] = True
    for name, cls in load_extension_classes(namespace):
        codecs[name] = cls()
def get_codec(name_or_codec: CodecArg) -> CodecT:
    """Get codec by name.

    The name may combine several codecs with ``|``
    (e.g. ``"json|gzip"``), in which case the named codecs are
    chained together.  If ``name_or_codec`` is already a codec
    instance it is returned as-is.

    Raises:
        KeyError: if a named codec is not found in the registry.
    """
    # extensions are loaded lazily here to avoid a circular import.
    _maybe_load_extension_classes()
    if isinstance(name_or_codec, str):
        if '|' in name_or_codec:
            codec: Optional[CodecT] = None
            for node in name_or_codec.split('|'):
                # Always look the node up in the registry: the previous
                # ``codecs.get(node, node)`` fallback kept the raw string
                # for an unknown first node, which later crashed with a
                # confusing TypeError on ``|``; an unknown name now raises
                # KeyError immediately instead.
                step = codecs[node]
                codec = step if codec is None else codec | step
            return cast(Codec, codec)
        return codecs[name_or_codec]
    return cast(Codec, name_or_codec)
def dumps(codec: Optional[CodecArg], obj: Any) -> bytes:
    """Encode object into bytes."""
    # A falsy codec (None/'') means "no serialization": pass through.
    if codec:
        return get_codec(codec).dumps(obj)
    return obj
def loads(codec: Optional[CodecArg], s: bytes) -> Any:
    """Decode object from bytes."""
    # A falsy codec (None/'') means "no serialization": pass through.
    if codec:
        return get_codec(codec).loads(s)
    return s