Source code for faust.windows

"""Window Types."""
from typing import List
from mode import Seconds, want_seconds
from .types.windows import WindowRange, WindowRange_from_start, WindowT

__all__ = [
    'Window',
    'HoppingWindow',
    'TumblingWindow',
    'SlidingWindow',
]


[docs]class Window(WindowT): ...
[docs]class HoppingWindow(Window): """Hopping window type. Fixed-size, overlapping windows. """ size: float step: float def __init__(self, size: Seconds, step: Seconds, expires: Seconds = None) -> None: self.size = want_seconds(size) self.step = want_seconds(step) self.expires = want_seconds(expires) if expires else None
[docs] def ranges(self, timestamp: float) -> List[WindowRange]: start = self._start_initial_range(timestamp) return [ WindowRange_from_start(float(start), self.size) for start in range(int(start), int(timestamp) + 1, int(self.step)) ]
[docs] def stale(self, timestamp: float, latest_timestamp: float) -> bool: return (timestamp <= self._stale_before(latest_timestamp, self.expires) if self.expires else False)
[docs] def current(self, timestamp: float) -> WindowRange: """ The current WindowRange is the latest WindowRange for a given timestamp """ return self.ranges(timestamp)[-1]
[docs] def delta(self, timestamp: float, d: Seconds) -> WindowRange: return self.current(timestamp - want_seconds(d))
[docs] def earliest(self, timestamp: float) -> WindowRange: return self.ranges(timestamp)[0]
def _start_initial_range(self, timestamp: float) -> float: closest_step = (timestamp // self.step) * self.step return closest_step - self.size + self.step def _stale_before(self, latest_timestamp: float, expires: float) -> float: return self.current(latest_timestamp - expires).start
[docs]class TumblingWindow(HoppingWindow): """Tumbling window type. Fixed-size, non-overlapping, gap-less windows. """ def __init__(self, size: Seconds, expires: Seconds = None) -> None: super(TumblingWindow, self).__init__(size, size, expires)
[docs]class SlidingWindow(Window): """Sliding window type. Fixed-size, overlapping windows that work on differences between record timestamps """ before: float after: float def __init__(self, before: Seconds, after: Seconds, expires: Seconds) -> None: self.before = want_seconds(before) self.after = want_seconds(after) self.expires = want_seconds(expires)
[docs] def ranges(self, timestamp: float) -> List[WindowRange]: """Return list of windows from timestamp. Notes: .. sourcecode:: sql SELECT * FROM s1, s2 WHERE s1.key = s2.key AND s1.ts - before <= s2.ts AND s2.ts <= s1.ts + after """ return [ WindowRange( start=timestamp - self.before, end=timestamp + self.after), ]
[docs] def stale(self, timestamp: float, latest_timestamp: float) -> bool: return (timestamp <= self._stale_before(self.expires, latest_timestamp) if self.expires else False)
def _stale_before(self, expires: float, latest_timestamp: float) -> float: return latest_timestamp - expires