Source code for faust.models.record

"""Record - Dictionary Model."""
from collections import deque
from datetime import datetime
from decimal import Decimal
from functools import partial
from typing import (
    Any,
    Callable,
    Dict,
    FrozenSet,
    Iterable,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Set,
    Tuple,
    Type,
    cast,
)

from mode.utils.objects import (
    annotations,
    guess_polymorphic_type,
    is_optional,
    remove_optional,
)
from mode.utils.text import pluralize

from faust.types.models import (
    CoercionHandler,
    FieldDescriptorT,
    FieldMap,
    IsInstanceArgT,
    ModelOptions,
    ModelT,
    TypeCoerce,
    TypeInfo,
)
from faust.utils import codegen
from faust.utils import iso8601
from faust.utils.json import str_to_decimal

from .base import Model
from .fields import FieldDescriptor, field_for_type

__all__ = ['Record']

DATE_TYPES: IsInstanceArgT = (datetime,)
DECIMAL_TYPES: IsInstanceArgT = (Decimal,)

ALIAS_FIELD_TYPES = {
    dict: Dict,
    tuple: Tuple,
    list: List,
    set: Set,
    frozenset: FrozenSet,
}

E_NON_DEFAULT_FOLLOWS_DEFAULT = '''
Non-default {cls_name} field {field_name} cannot
follow default {fields} {default_names}
'''

_ReconFun = Callable[..., Any]

# Models can refer to other models:
#
#   class M(Model):
#     x: OtherModel
#
# but can also have List-of-X, Mapping-of-X, etc:
#
#  class M(Model):
#    x: List[OtherModel]
#    y: Mapping[KeyModel, ValueModel]
#
# in the source code we refer to a polymorphic type, in the example above
# the polymorphic type for x would be `list`, and the polymorphic type
# for y would be `dict`.
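
# Illustrative sketch (editor's note, not part of the original module):
# ``guess_polymorphic_type`` unwraps a generic container annotation into a
# ``(container_type, member_type)`` pair, roughly:
#
#     guess_polymorphic_type(List[OtherModel])               # -> (list, OtherModel)
#     guess_polymorphic_type(Mapping[KeyModel, ValueModel])  # -> (dict, ValueModel)
#
# OtherModel/KeyModel/ValueModel are the hypothetical models from the
# comment above; for mappings the member type is assumed to be the value
# type, since only mapping *values* are reconstructed by the helpers below.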

__polymorphic_type_cache: Dict[Type, Tuple[Type, Type]] = {}


def _polymorphic_type(typ: Type) -> Tuple[Type, Type]:
    try:
        polymorphic_type, cls = __polymorphic_type_cache[typ]
    except KeyError:
        try:
            val = guess_polymorphic_type(typ)
        except TypeError:
            val = (TypeError, None)
        __polymorphic_type_cache[typ] = val
        return val
    if polymorphic_type is TypeError:
        raise TypeError()
    return polymorphic_type, cls


def _is_model(cls: Type) -> Tuple[bool, Type, Optional[Type]]:
    # Returns (is_model, member_type, polymorphic_type).
    # The polymorphic type (if available) will be list if it's a List,
    # dict if it's a Dict, and so on -- meaning the annotation is a
    # List[ModelType], Dict[..., ModelType], etc., and we have to
    # deserialize the members as such.
    polymorphic_type = None
    try:
        polymorphic_type, cls = guess_polymorphic_type(cls)
    except TypeError:
        pass
    member_type = remove_optional(cls)
    try:
        return issubclass(member_type, ModelT), member_type, polymorphic_type
    except TypeError:  # typing.Any cannot be used with subclass
        return False, cls, None
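
# Editor's note, an illustrative sketch (not part of the original module) of
# what ``_is_model`` returns, assuming ``OtherModel`` is a Record subclass:
#
#     _is_model(OtherModel)            # -> (True, OtherModel, None)
#     _is_model(Optional[OtherModel])  # -> (True, OtherModel, None)
#     _is_model(List[OtherModel])      # -> (True, OtherModel, list)
#     _is_model(int)                   # -> (False, int, None)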


def _field_callback(typ: Type, callback: _ReconFun, **kwargs: Any) -> Any:
    try:
        generic, subtyp = _polymorphic_type(typ)
    except TypeError:
        pass
    else:
        if generic is list:
            return partial(_from_generic_list, subtyp, callback, **kwargs)
        elif generic is tuple:
            return partial(_from_generic_tuple, subtyp, callback, **kwargs)
        elif generic is dict:
            return partial(_from_generic_dict, subtyp, callback, **kwargs)
        elif generic is set:
            return partial(_from_generic_set, subtyp, callback, **kwargs)
    return partial(callback, typ, **kwargs)


def _from_generic_list(typ: Type,
                       callback: _ReconFun,
                       data: Iterable,
                       **kwargs: Any) -> List:
    return [callback(typ, v, **kwargs) for v in data]


def _from_generic_tuple(typ: Type,
                        callback: _ReconFun,
                        data: Tuple,
                        **kwargs: Any) -> Tuple:
    return tuple(callback(typ, v, **kwargs) for v in data)


def _from_generic_dict(typ: Type,
                       callback: _ReconFun,
                       data: Mapping,
                       **kwargs: Any) -> Mapping:
    return {k: callback(typ, v, **kwargs) for k, v in data.items()}


def _from_generic_set(typ: Type,
                      callback: _ReconFun,
                      data: Set,
                      **kwargs: Any) -> Set:
    return {callback(typ, v, **kwargs) for v in data}


def _to_model(typ: Type[ModelT], data: Any, **kwargs: Any) -> Optional[ModelT]:
    # Called every time something needs to be converted into a model.
    typ = remove_optional(typ)
    if data is not None and not isinstance(data, typ):
        model = typ.from_data(data, preferred_type=typ)
        return model if model is not None else data
    return data
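
# Editor's note, an illustrative sketch (not part of the original module),
# assuming ``Point`` is a Record subclass with fields ``x`` and ``y``:
#
#     _to_model(Point, {'x': 1, 'y': 2})  # -> Point(x=1, y=2), via from_data
#     _to_model(Point, Point(x=1, y=2))   # -> returned unchanged
#     _to_model(Optional[Point], None)    # -> None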


def _using_descriptor(typ: Type, data: Any, *,
                      descr: FieldDescriptorT,
                      **kwargs: Any) -> Any:
    return descr.prepare_value(data)


def _maybe_to_representation(val: ModelT = None) -> Optional[Any]:
    return val.to_representation() if val is not None else None


def _is_of_type(value: Any, typ: Type) -> bool:
    if is_optional(typ):
        # Optional/Union can contain nested types, e.g.:
        #    Optional[Union[str, int, Optional[Foo]]]
        #
        # to avoid recursion we add new Union types to a stack
        # and return False only when that stack is exhausted.
        #
        # NOTE: Optional[str] is actually Union[str, type(None)]
        stack = deque([typ])
        while stack:
            node = stack.popleft()
            concrete_types = []
            for subtype in node.__args__:
                if is_optional(subtype):
                    stack.append(subtype)
                else:
                    concrete_types.append(subtype)
            if isinstance(value, tuple(concrete_types)):
                return True
        return False
    else:
        return isinstance(value, typ)
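
# Editor's note, an illustrative sketch (not part of the original module) of
# how ``_is_of_type`` handles nested Optional/Union annotations:
#
#     _is_of_type('foo', Optional[Union[str, int]])  # -> True  (matches str)
#     _is_of_type(3.14, Optional[Union[str, int]])   # -> False (no member matches)
#     _is_of_type(None, Optional[str])               # -> True  (type(None) is a member)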


class Record(Model, abstract=True):
    """Describes a model type that is a record (Mapping).

    Examples:
        >>> class LogEvent(Record, serializer='json'):
        ...     severity: str
        ...     message: str
        ...     timestamp: float
        ...     optional_field: str = 'default value'

        >>> event = LogEvent(
        ...     severity='error',
        ...     message='Broken pact',
        ...     timestamp=666.0,
        ... )

        >>> event.severity
        'error'

        >>> serialized = event.dumps()
        '{"severity": "error", "message": "Broken pact", "timestamp": 666.0}'

        >>> restored = LogEvent.loads(serialized)
        <LogEvent: severity='error', message='Broken pact', timestamp=666.0>

        >>> # You can also subclass a Record to create a new record
        >>> # with additional fields
        >>> class RemoteLogEvent(LogEvent):
        ...     url: str

        >>> # You can also refer to record fields and pass them around:
        >>> LogEvent.severity
        <FieldDescriptor: LogEvent.severity (str)>
    """

    @classmethod
    def _contribute_to_options(cls, options: ModelOptions) -> None:
        # Find attributes and their types, and create indexes for these.
        # This only happens once when the class is created, so Faust
        # models are fast at runtime.
        fields, defaults = annotations(
            cls,
            stop=Record,
            skip_classvar=True,
            alias_types=ALIAS_FIELD_TYPES,
            localns={cls.__name__: cls},
        )
        options.fields = cast(Mapping, fields)
        options.fieldset = frozenset(fields)
        options.fieldpos = {i: k for i, k in enumerate(fields.keys())}

        # extract all default values, but only for actual fields.
        options.defaults = {
            k: v.default if isinstance(v, FieldDescriptor) else v
            for k, v in defaults.items()
            if k in fields and not (
                isinstance(v, FieldDescriptor) and v.required)
        }

        options.models = {}
        options.polyindex = {}
        modelattrs = options.modelattrs = {}

        def _is_concrete_type(field: str, wanted: IsInstanceArgT) -> bool:
            typeinfo = options.polyindex[field]
            try:
                return issubclass(typeinfo.member_type, wanted)
            except TypeError:
                return False

        # Raise error if non-defaults are mixed in with defaults
        # like namedtuple/dataclasses do.
        local_defaults = []
        for attr_name in cls.__annotations__:
            if attr_name in cls.__dict__:
                default_value = cls.__dict__[attr_name]
                if isinstance(default_value, FieldDescriptorT):
                    if not default_value.required:
                        local_defaults.append(attr_name)
                else:
                    local_defaults.append(attr_name)
            else:
                if local_defaults:
                    raise TypeError(E_NON_DEFAULT_FOLLOWS_DEFAULT.format(
                        cls_name=cls.__name__,
                        field_name=attr_name,
                        fields=pluralize(len(local_defaults), 'field'),
                        default_names=', '.join(local_defaults),
                    ))

        for field, typ in fields.items():
            is_model, member_type, generic_type = _is_model(typ)
            options.polyindex[field] = TypeInfo(generic_type, member_type)
            if is_model:
                # Extract all model fields
                options.models[field] = typ
                # Create mapping of model fields to polymorphic types if
                # available
                modelattrs[field] = generic_type
            if is_optional(typ):
                # Optional[X] also needs to be added to defaults mapping.
                options.defaults.setdefault(field, None)

        # Create frozenset index of default fields.
        options.optionalset = frozenset(options.defaults)

        # extract all fields that we want to coerce to a different type
        # (decimals=True, isodates=True, coercions={MyClass: converter})
        # Then move them to options.field_coerce, which is what the
        # model.__init__ method uses to coerce any fields that need to
        # be coerced.
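
        # Editor's note, an illustrative sketch (not part of the original
        # module): for a hypothetical ``class Order(Record, isodates=True)``
        # with a field ``created_at: datetime``, the loop below would record
        #     field_coerce['created_at'] == TypeCoerce(datetime, iso8601.parse)
        # so string timestamps are parsed back into datetimes by __init__.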
        options.field_coerce = {}
        if options.isodates:
            options.coercions.setdefault(DATE_TYPES, iso8601.parse)
        if options.decimals:
            options.coercions.setdefault(DECIMAL_TYPES, str_to_decimal)
        for coerce_types, coerce_handler in options.coercions.items():
            options.field_coerce.update({
                field: TypeCoerce(typ, coerce_handler)
                for field, typ in fields.items()
                if (field not in modelattrs and
                    _is_concrete_type(field, coerce_types))
            })

    @classmethod
    def _contribute_methods(cls) -> None:
        if not getattr(cls.asdict, 'faust_generated', False):
            raise RuntimeError('Not allowed to override Record.asdict()')
        cls.asdict = cls._BUILD_asdict()  # type: ignore
        cls.asdict.faust_generated = True  # type: ignore
        cls._input_translate_fields = \
            cls._BUILD_input_translate_fields()

    @staticmethod
    def _init_maybe_coerce(coerce: CoercionHandler,
                           typ: Type,
                           value: Any) -> Any:
        if value is None:
            return None
        if _is_of_type(value, typ):
            return value
        return coerce(value)

    @classmethod
    def _contribute_field_descriptors(
            cls,
            target: Type,
            options: ModelOptions,
            parent: FieldDescriptorT = None) -> FieldMap:
        fields = options.fields
        defaults = options.defaults
        date_parser = options.date_parser
        coerce = options.coerce
        index = {}
        for field, typ in fields.items():
            try:
                default, needed = defaults[field], False
            except KeyError:
                default, needed = None, True
            descr = getattr(target, field, None)
            typeinfo = options.polyindex[field]
            if descr is None or not isinstance(descr, FieldDescriptorT):
                DescriptorType = field_for_type(typeinfo.member_type)
                descr = DescriptorType(
                    field=field,
                    type=typ,
                    model=cls,
                    required=needed,
                    default=default,
                    parent=parent,
                    coerce=coerce,
                    generic_type=typeinfo.generic_type,
                    member_type=typeinfo.member_type,
                    date_parser=date_parser,
                )
            else:
                descr = descr.clone(
                    field=field,
                    type=typ,
                    model=cls,
                    required=needed,
                    default=default,
                    parent=parent,
                    coerce=coerce,
                    generic_type=typeinfo.generic_type,
                    member_type=typeinfo.member_type,
                )
            setattr(target, field, descr)
            index[field] = descr
        return index
    @classmethod
    def from_data(cls, data: Mapping, *,
                  preferred_type: Type[ModelT] = None) -> 'Record':
        """Create model object from Python dictionary."""
        # check for blessed key to see if another model should be used.
        if hasattr(data, '__is_model__'):
            return cast(Record, data)
        else:
            self_cls = cls._maybe_namespace(
                data, preferred_type=preferred_type)
        cls._input_translate_fields(data)
        return (self_cls or cls)(**data, __strict__=False)
    def __init__(self, *args: Any,
                 __strict__: bool = True,
                 __faust: Any = None,
                 **kwargs: Any) -> None:  # pragma: no cover
        ...  # overridden by _BUILD_init

    @classmethod
    def _BUILD_input_translate_fields(
            cls) -> Callable[[MutableMapping], None]:
        translate = [
            f'data[{field!r}] = data.pop({d.input_name!r}, None)'
            for field, d in cls._options.descriptors.items()
            if d.field != d.input_name
        ]
        return cast(Callable, classmethod(codegen.Function(
            '_input_translate_fields',
            ['cls', 'data'],
            translate if translate else ['pass'],
            globals=globals(),
            locals=locals(),
        )))

    @classmethod
    def _BUILD_init(cls) -> Callable[[], None]:
        kwonlyargs = ['*', '__strict__=True', '__faust=None', '**kwargs']
        options = cls._options
        fields = options.fields
        field_positions = options.fieldpos
        optional = options.optionalset
        needs_validation = options.validation
        models = options.models
        field_coerce = options.field_coerce
        initfield = options.initfield = {}
        descriptors = options.descriptors
        has_post_init = hasattr(cls, '__post_init__')
        required = []
        opts = []
        setters = []
        for field in field_positions.values():
            model = models.get(field)
            coerce = field_coerce.get(field)
            fieldval = f'{field}'
            if model is not None:
                initfield[field] = _field_callback(model, _to_model)
                assert initfield[field] is not None
                fieldval = f'self._init_field("{field}", {field})'
            else:
                field_type = fields[field]
                descr = options.descriptors[field]
                initfield[field] = _field_callback(
                    field_type, _using_descriptor, descr=descr)
                fieldval = f'self._init_field("{field}", {field})'
            if coerce is not None:
                coerce_type, coerce_handler = coerce
                # Model reconstruction requires two arguments: typ, val.
                # Regular coercion callbacks take just one argument, so we
                # need to create an intermediate function to fix that.
                initfield[field] = _field_callback(
                    coerce_type,
                    partial(cls._init_maybe_coerce, coerce_handler))
                assert initfield[field] is not None
                fieldval = f'self._init_field("{field}", {field})'
            if field in optional:
                opts.append(f'{field}=None')
                setters.extend([
                    f'if {field} is not None:',
                    f' self.{field} = {fieldval}',
                    f'else:',
                    f' self.{field} = self._options.defaults["{field}"]',
                ])
            else:
                required.append(field)
                setters.append(f'self.{field} = {fieldval}')
        rest = [
            'if kwargs and __strict__:',
            ' from mode.utils.text import pluralize',
            ' message = "{} got unexpected {}: {}".format(',
            '     self.__class__.__name__,',
            '     pluralize(kwargs.__len__(), "argument"),',
            '     ", ".join(map(str, sorted(kwargs))))',
            ' raise TypeError(message)',
            'self.__dict__.update(kwargs)',
        ]
        if has_post_init:
            rest.extend([
                'self.__post_init__()',
            ])
        if needs_validation:
            rest.extend([
                'self.validate_or_raise()',
            ])
        return codegen.InitMethod(
            required + opts + kwonlyargs,
            setters + rest,
            globals=globals(),
            locals=locals(),
        )

    @classmethod
    def _BUILD_hash(cls) -> Callable[[], None]:
        return codegen.HashMethod(list(cls._options.fields),
                                  globals=globals(),
                                  locals=locals())

    @classmethod
    def _BUILD_eq(cls) -> Callable[[], None]:
        return codegen.EqMethod(list(cls._options.fields),
                                globals=globals(),
                                locals=locals())

    @classmethod
    def _BUILD_ne(cls) -> Callable[[], None]:
        return codegen.NeMethod(list(cls._options.fields),
                                globals=globals(),
                                locals=locals())

    @classmethod
    def _BUILD_gt(cls) -> Callable[[], None]:
        return codegen.GtMethod(list(cls._options.fields),
                                globals=globals(),
                                locals=locals())

    @classmethod
    def _BUILD_ge(cls) -> Callable[[], None]:
        return codegen.GeMethod(list(cls._options.fields),
                                globals=globals(),
                                locals=locals())

    @classmethod
    def _BUILD_lt(cls) -> Callable[[], None]:
        return codegen.LtMethod(list(cls._options.fields),
                                globals=globals(),
                                locals=locals())

    @classmethod
    def _BUILD_le(cls) -> Callable[[], None]:
        return codegen.LeMethod(list(cls._options.fields),
                                globals=globals(),
                                locals=locals())

    def _init_field(self, field: str, value: Any) -> Any:
        options = self._options
        initfun = options.initfield.get(field)
        if initfun:
            value = initfun(value)
        return value

    @classmethod
    def _BUILD_asdict(cls) -> Callable[..., Dict[str, Any]]:
        preamble = [
            'return self._prepare_dict({',
        ]
        fields = [
            f' {d.output_name!r}: {cls._BUILD_asdict_field(name, d)},'
            for name, d in cls._options.descriptors.items()
            if not d.exclude
        ]
        postamble = [
            '})',
        ]
        return codegen.Method(
            '_asdict',
            [],
            preamble + fields + postamble,
            globals=globals(),
            locals=locals(),
        )

    def _prepare_dict(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        return payload

    @classmethod
    def _BUILD_asdict_field(cls, name: str,
                            field: FieldDescriptorT) -> str:
        modelattrs = cls._options.modelattrs
        is_model = name in modelattrs
        if is_model:
            generic = modelattrs[name]
            if generic is list or generic is tuple:
                return (f'[v.to_representation() for v in self.{name}] '
                        f'if self.{name} is not None else None')
            elif generic is set:
                return f'self.{name}'
            elif generic is dict:
                return (f'{{k: v.to_representation() '
                        f' for k, v in self.{name}.items()}}')
            else:
                return f'_maybe_to_representation(self.{name})'
        else:
            return f'self.{name}'

    def _derive(self, *objects: ModelT, **fields: Any) -> ModelT:
        data = self.asdict()
        for obj in objects:
            data.update(cast(Record, obj).asdict())
        return type(self)(**{**data, **fields})
    def to_representation(self) -> Mapping[str, Any]:
        """Convert model to its Python generic counterpart.

        Records will be converted to a dictionary.
        """
        # Convert known fields to a mapping of ``{field: value}``.
        payload = self.asdict()
        if self._options.include_metadata:
            payload['__faust'] = {'ns': self._options.namespace}
        return payload
    def asdict(self) -> Dict[str, Any]:  # pragma: no cover
        """Convert record to Python dictionary."""
        ...  # generated by _BUILD_asdict
    # Used to disallow overriding this method:
    asdict.faust_generated = True  # type: ignore

    def _humanize(self) -> str:
        # we try to preserve the order of fields specified in the class,
        # so doing {**self._options.defaults, **self.__dict__} does not work.
        attrs, defaults = self.__dict__, self._options.defaults.items()
        fields = {
            **attrs,
            **{k: v for k, v in defaults if k not in attrs},
        }
        return _kvrepr(fields)

    def __json__(self) -> Any:
        return self.to_representation()

    def __eq__(self, other: Any) -> bool:  # pragma: no cover
        # implemented by _BUILD_eq
        return NotImplemented

    def __ne__(self, other: Any) -> bool:  # pragma: no cover
        # implemented by _BUILD_ne
        return NotImplemented

    def __lt__(self, other: 'Record') -> bool:  # pragma: no cover
        # implemented by _BUILD_lt
        return NotImplemented

    def __le__(self, other: 'Record') -> bool:  # pragma: no cover
        # implemented by _BUILD_le
        return NotImplemented

    def __gt__(self, other: 'Record') -> bool:  # pragma: no cover
        # implemented by _BUILD_gt
        return NotImplemented

    def __ge__(self, other: 'Record') -> bool:  # pragma: no cover
        # implemented by _BUILD_ge
        return NotImplemented
def _kvrepr(d: Mapping[str, Any], *, sep: str = ', ') -> str:
    """Represent dict as `k='v'` pairs separated by comma."""
    return sep.join(f'{k}={v!r}' for k, v in d.items())
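

# Editor's note, an illustrative sketch (not part of the original module):
# as with dataclasses and namedtuples, fields without defaults may not follow
# fields with defaults -- ``_contribute_to_options`` raises ``TypeError``
# (using ``E_NON_DEFAULT_FOLLOWS_DEFAULT``) at class-definition time:
#
#     class Broken(Record):        # hypothetical example
#         severity: str = 'info'   # a default field...
#         message: str             # ...followed by a non-default field: TypeError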