Source code for faust.models.base

"""Model descriptions.

The model describes the components of a data structure, kind of like a struct
in C, but there's no limitation on what type of data structure the model is,
or what it's used for.

A record (faust.models.record) is a model type that serializes into
dictionaries, so the model describes the fields and their types:

.. sourcecode:: python

    >>> class Point(Record):
    ...    x: int
    ...    y: int

    >>> p = Point(10, 3)
    >>> assert p.x == 10
    >>> assert p.y == 3
    >>> p
    <Point: x=10, y=3>
    >>> payload = p.dumps(serializer='json')
    >>> payload
    b'{"x": 10, "y": 3, "__faust": {"ns": "__main__.Point"}}'
    >>> p2 = Record.loads(payload)
    >>> p2
    <Point: x=10, y=3>

Models are mainly used for describing the data in messages: both keys and
values can be described as models.
"""
import abc
import inspect
from operator import attrgetter
from typing import (
    Any,
    Callable,
    ClassVar,
    Iterable,
    MutableMapping,
    Optional,
    Tuple,
    Type,
)

from mode.utils.objects import canoname

from faust.serializers.codecs import CodecArg, dumps, loads
from faust.types.models import FieldDescriptorT, ModelOptions, ModelT

__all__ = ['Model', 'FieldDescriptor', 'registry']

# NOTES:
# - Records are described in the same notation as named tuples in Python 3.6.
#   To accomplish this ``__init_subclass__`` defined in :pep:`487` is used.
#
#   When accessed on the Record class, the attributes are actually field
#   descriptors that return information about the field:
#       >>> Point.x
#       <FieldDescriptor: Point.x: int>
#
#   This field descriptor holds information about the name of the field, the
#   value type of the field, and also what Record subclass it belongs to.
#
# - Sometimes field descriptions are passed around as arguments to functions.
#
# - A stream of deposits may be joined with a stream of orders if
#   both have an ``account`` field.  Field descriptors are used to
#   specify the field.
#
# - order_instance.account is data
#   (it holds the string account for this particular order).
#
# - order_instance.__class__.account is the field descriptor for the field,
#   it's not data but metadata that enables introspection, and it can be
#   passed around to describe a field we want to extract or similar.
#
# - FieldDescriptors are Python descriptors: In Python object
#   attributes can override what happens when they are get/set/deleted:
#
#       class MyDescriptor:
#
#           def __get__(self, instance, cls):
#               if instance is None:
#                   print('ACCESS ON CLASS ATTRIBUTE')
#                   return self
#               print('ACCESS ON INSTANCE')
#               return 42
#
#       class Example:
#           foo = MyDescriptor()
#
#   The above descriptor overrides __get__, which is called when the attribute
#   is accessed (a descriptor may also override __set__ and __delete__).
#
#
#   You can see the difference in what happens when you access the attribute
#   on the class, vs. the instance:
#
#       >>> Example.foo
#       ACCESS ON CLASS ATTRIBUTE
#       <__main__.MyDescriptor at 0x1049caac8>
#
#       >>> x = Example()
#       >>> x.foo
#       ACCESS ON INSTANCE
#       42

#: Global map of namespace -> Model, used to find model classes by name.
#: Every single model defined is added here automatically when a model
#: class is defined.
registry: MutableMapping[str, Type[ModelT]] = {}
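
# Illustrative sketch (not part of the module): after the ``Point`` record
# from the module docstring is defined in ``__main__``, the registry maps
# its canonical namespace back to the class, which is what the blessed key
# lookup in ``Model._maybe_namespace`` relies on:
#
#       >>> registry['__main__.Point']
#       <class '__main__.Point'>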


class Model(ModelT):
    """Meta description model for serialization."""

    #: Set to True if this is an abstract base class.
    __is_abstract__: ClassVar[bool] = True

    #: Serialized data may contain a "blessed key" that mandates
    #: how the data should be deserialized.  This probably only
    #: applies to records, but we need to support it at Model level.
    #: The blessed key has a dictionary value with a ``ns`` key:
    #:    data = {.., '__faust': {'ns': 'examples.simple.Withdrawal'}}
    #: When ``Model._maybe_reconstruct`` sees this key, it will look
    #: up that namespace in the :data:`registry`, and if it exists
    #: select it as the target model to use for deserialization.
    #:
    #: Is this similar to how unsafe deserialization in pickle/yaml/etc.
    #: works?  No!  pickle/pyyaml allow for arbitrary types to be
    #: deserialized (and worse in pickle's case), whereas the blessed
    #: key can only deserialize to a hardcoded list of types that are
    #: already under the remote control of messages anyway.
    #: For example it's not possible to perform remote code execution
    #: by providing a blessed key namespace of "os.system", simply
    #: because os.system is not in the registry of allowed types.
    _blessed_key = '__faust'

    @classmethod
    def _maybe_namespace(
            cls, data: Any,
            *,
            preferred_type: Type[ModelT] = None,
            fast_types: Tuple[Type, ...] = (bytes, str),
            isinstance: Callable = isinstance) -> Optional[Type[ModelT]]:
        # The serialized data may contain a ``__faust`` blessed key
        # holding the name of the model it should be deserialized as.
        # So even if value_type=MyModel, the data may mandate that it
        # should be deserialized using "foo.bar.baz" instead.
        #
        # This is how we deal with Kafka's lack of message headers,
        # as needed by the RPC mechanism, without wrapping all data.
        if data is None or isinstance(data, fast_types):
            return None
        try:
            ns = data[cls._blessed_key]['ns']
        except (KeyError, TypeError):
            pass
        else:
            # we only allow blessed keys when type=None, or type=Model
            type_is_abstract = (preferred_type is None or
                                preferred_type is ModelT or
                                preferred_type is Model)
            try:
                model = registry[ns]
            except KeyError:
                if type_is_abstract:
                    raise
                return None
            else:
                if type_is_abstract or model._options.allow_blessed_key:
                    return model
        return None

    @classmethod
    def _maybe_reconstruct(cls, data: Any) -> Any:
        model = cls._maybe_namespace(data)
        return model.from_data(data) if model else data
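
    # Illustrative sketch of the blessed key lookup (assumes a ``Withdrawal``
    # record has been defined and registered under the namespace used below;
    # not part of the class):
    #
    #       >>> data = {'amount': 3.14,
    #       ...         '__faust': {'ns': 'examples.simple.Withdrawal'}}
    #       >>> Model._maybe_namespace(data)
    #       <class 'examples.simple.Withdrawal'>
    #       >>> Model._maybe_reconstruct(data)  # -> Withdrawal instance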

    @classmethod
    def loads(cls, s: bytes, *,
              default_serializer: CodecArg = None) -> ModelT:
        """Deserialize model object from bytes.

        Arguments:
            default_serializer (CodecArg): Default serializer to use
                if no custom serializer was set for this model subclass.
        """
        data = loads(cls._options.serializer or default_serializer, s)
        return cls.from_data(data)

    def __init_subclass__(cls,
                          serializer: str = None,
                          namespace: str = None,
                          include_metadata: bool = None,
                          isodates: bool = None,
                          abstract: bool = False,
                          allow_blessed_key: bool = None,
                          **kwargs: Any) -> None:
        # Python 3.6 added the new __init_subclass__ function that
        # makes it possible to initialize subclasses without using
        # metaclasses (:pep:`487`).
        super().__init_subclass__(**kwargs)

        # mypy does not recognize `__init_subclass__` as a classmethod
        # and thinks we're mutating a ClassVar when setting:
        #   cls.__is_abstract__ = False
        # To fix this we simply delegate to a _init_subclass classmethod.
        cls._init_subclass(
            serializer,
            namespace,
            include_metadata,
            isodates,
            abstract,
            allow_blessed_key,
        )

    @classmethod
    def _init_subclass(cls,
                       serializer: str = None,
                       namespace: str = None,
                       include_metadata: bool = None,
                       isodates: bool = None,
                       abstract: bool = False,
                       allow_blessed_key: bool = None) -> None:
        if abstract:
            # Custom base classes can set this to skip class initialization.
            cls.__is_abstract__ = True
            return
        cls.__is_abstract__ = False

        # Can set serializer/namespace/etc. using:
        #    class X(Record, serializer='json', namespace='com.vandelay.X'):
        #        ...
        try:
            custom_options = cls.Options
        except AttributeError:
            custom_options = None
        else:
            delattr(cls, 'Options')
        options = getattr(cls, '_options', None)
        if options is None:
            options = ModelOptions()
        else:
            options = options.clone_defaults()
        if custom_options:
            options.__dict__.update(custom_options.__dict__)
        if serializer is not None:
            options.serializer = serializer
        if include_metadata is not None:
            options.include_metadata = include_metadata
        if isodates is not None:
            options.isodates = isodates
        if allow_blessed_key is not None:
            options.allow_blessed_key = allow_blessed_key
        options.namespace = namespace or canoname(cls)

        # Add introspection capabilities
        cls._contribute_to_options(options)
        # Add FieldDescriptors for every field.
        cls._contribute_field_descriptors(cls, options)

        # Store options on new subclass.
        cls._options = options

        cls._contribute_methods()

        # Register in the global registry, so we can look up
        # models by namespace.
        registry[options.namespace] = cls

        cls._model_init = cls._BUILD_init()
        if '__init__' not in cls.__dict__:
            cls.__init__ = cls._model_init

    @classmethod
    @abc.abstractmethod
    def _contribute_to_options(
            cls, options: ModelOptions) -> None:  # pragma: no cover
        ...

    @classmethod
    def _contribute_methods(cls) -> None:  # pragma: no cover
        ...

    @classmethod
    @abc.abstractmethod
    def _contribute_field_descriptors(
            cls,
            target: Type,
            options: ModelOptions,
            parent: FieldDescriptorT = None) -> None:  # pragma: no cover
        ...

    @classmethod
    @abc.abstractmethod
    def _BUILD_init(cls) -> Callable[[], None]:  # pragma: no cover
        ...

    @abc.abstractmethod
    def to_representation(self) -> Any:  # pragma: no cover
        """Convert object to JSON serializable object."""

    @abc.abstractmethod
    def _humanize(self) -> str:  # pragma: no cover
        """Return string representation of object for debugging purposes."""
        ...

    def derive(self, *objects: ModelT, **fields: Any) -> ModelT:
        return self._derive(*objects, **fields)

    @abc.abstractmethod  # pragma: no cover
    def _derive(self, *objects: ModelT, **fields: Any) -> ModelT:
        raise NotImplementedError()

    def dumps(self, *, serializer: CodecArg = None) -> bytes:
        """Serialize object to the target serialization format."""
        return dumps(serializer or self._options.serializer,
                     self.to_representation())

    def __repr__(self) -> str:
        return f'<{type(self).__name__}: {self._humanize()}>'
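
# Usage sketch for ``Model.derive`` (assuming the ``Point`` record from the
# module docstring): ``derive`` delegates to the concrete ``_derive`` of the
# model subclass and returns a copy with the given fields overridden:
#
#       >>> p = Point(10, 3)
#       >>> p.derive(x=30)
#       <Point: x=30, y=3>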


def _is_concrete_model(typ: Type = None) -> bool:
    return (typ is not None and
            inspect.isclass(typ) and
            issubclass(typ, ModelT) and
            typ is not ModelT and
            not getattr(typ, '__is_abstract__', False))


class FieldDescriptor(FieldDescriptorT):
    """Describes a field.

    Used for every field in Record so that they can be used in joins,
    group_by, etc.

    Examples:
        >>> class Withdrawal(Record):
        ...    account_id: str
        ...    amount: float = 0.0

        >>> Withdrawal.account_id
        <FieldDescriptor: Withdrawal.account_id: str>
        >>> Withdrawal.amount
        <FieldDescriptor: Withdrawal.amount: float = 0.0>

    Arguments:
        field (str): Name of field.
        type (Type): Field value type.
        model (Type): Model class the field belongs to.
        required (bool): Set to False if field is optional.
        default (Any): Default value when `required=False`.
    """

    #: Name of attribute on Model.
    field: str

    #: Type of value (e.g. ``int``, or ``Optional[int]``).
    type: Type

    #: The model class this field is associated with.
    model: Type[ModelT]

    #: Set if a value for this field is required (cannot be :const:`None`).
    required: bool = True

    #: Default value for non-required field.
    default: Any = None  # noqa: E704

    def __init__(self,
                 field: str,
                 type: Type,
                 model: Type[ModelT],
                 required: bool = True,
                 default: Any = None,
                 parent: FieldDescriptorT = None) -> None:
        self.field = field
        self.type = type
        self.model = model
        self.required = required
        self.default = default
        self.parent = parent
        self._copy_descriptors(self.type)

    def _copy_descriptors(self, typ: Type = None) -> None:
        if typ is not None and _is_concrete_model(typ):
            typ._contribute_field_descriptors(self, typ._options, parent=self)

    def __get__(self, instance: Any, owner: Type) -> Any:
        # class attribute accessed
        if instance is None:
            return self

        # instance attribute accessed
        return instance.__dict__[self.field]

    def getattr(self, obj: ModelT) -> Any:
        return attrgetter('.'.join(reversed(list(self._parents_path()))))(obj)

    def _parents_path(self) -> Iterable[str]:
        node: Optional[FieldDescriptorT] = self
        while node:
            yield node.field
            node = node.parent

    def __set__(self, instance: Any, value: Any) -> None:
        instance.__dict__[self.field] = value

    def __repr__(self) -> str:
        default = '' if self.required else f' = {self.default!r}'
        typ = self.type.__name__
        return f'<{type(self).__name__}: {self.ident}: {typ}{default}>'

    @property
    def ident(self) -> str:
        return f'{self.model.__name__}.{self.field}'
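

# Usage sketch for ``FieldDescriptor.getattr`` on a nested field
# (hypothetical ``Account``/``Order`` records, not defined in this module):
# ``_copy_descriptors`` contributes descriptors for related model fields,
# and ``getattr`` then resolves the full dotted path on an instance:
#
#       >>> class Account(Record):
#       ...     id: str
#       >>> class Order(Record):
#       ...     account: Account
#       >>> order = Order(account=Account(id='A1'))
#       >>> Order.account.id.getattr(order)
#       'A1'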