Overview: Faust vs Kafka Streams

KStream

  • .filter()

  • .filterNot()

    Just use an if statement (negate the condition for filterNot):

    @app.agent(topic)
    async def process(stream):
        async for event in stream:
            if event.amount >= 300.0:
                yield event
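
    For .filterNot() the same loop applies with the condition negated. The sketch below is not Faust code: it uses a plain async generator as a stand-in for the stream so that it runs with the standard library alone. The names fake_stream, filter_not, collect and is_large are hypothetical.

```python
import asyncio


async def fake_stream(amounts):
    # Stand-in for a stream: yields plain amounts one at a time.
    for amount in amounts:
        yield amount


async def filter_not(stream, predicate):
    # Counterpart of KS filterNot(): keep only items that FAIL the predicate.
    async for item in stream:
        if not predicate(item):
            yield item


async def collect(stream):
    # Drain an async iterator into a list.
    return [item async for item in stream]


def is_large(amount):
    return amount >= 300.0


amounts = [50.0, 300.0, 120.5, 999.0]
kept = asyncio.run(collect(filter_not(fake_stream(amounts), is_large)))
print(kept)
```

    In an agent, the body of filter_not corresponds to the async for loop, with "if not ..." guarding the yield.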
    
  • .map()

    Just call the function you want from within the async for iteration:

    @app.agent(topic)
    async def process(stream):
        async for key, event in stream.items():
            yield myfun(key, event)
    
  • .forEach()

    In Kafka Streams (KS), forEach is the same as map, but ends the processing chain. In Faust, perform the action inside the async for loop and do not yield.

  • .peek()

    In KS peek is the same as map, but documents that the action may have a side effect.
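
    The difference between the two can be sketched with plain async generators. This is an illustration using only the standard library, not Faust code, and the names fake_stream, peek and for_each are hypothetical: peek runs the action and passes each item on, while for_each runs the action and yields nothing.

```python
import asyncio


async def fake_stream(items):
    # Stand-in for a stream of events.
    for item in items:
        yield item


async def peek(stream, action):
    # Run the side effect, then pass the item downstream unchanged.
    async for item in stream:
        action(item)
        yield item


async def for_each(stream, action):
    # Terminal step: run the side effect and yield nothing further.
    async for item in stream:
        action(item)


async def main():
    seen, stored = [], []
    # peek records each item, for_each consumes what peek passes on.
    await for_each(peek(fake_stream([1, 2, 3]), seen.append), stored.append)
    return seen, stored


seen, stored = asyncio.run(main())
print(seen, stored)
```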

  • .mapValues():

    @app.agent(topic)
    async def process(stream):
        async for event in stream:
            yield myfun(event)
    
  • .print():

    @app.agent(topic)
    async def process(stream):
        async for event in stream:
            print(event)
    
  • .writeAsText():

    @app.agent(topic)
    async def process(stream):
        async for key, event in stream.items():
            with open(path, 'a') as f:
                # repr() takes a single argument, so pass the pair as a tuple
                f.write(repr((key, event)) + '\n')
    
  • .flatMap()

  • .flatMapValues()

    @app.agent(topic)
    async def process(stream):
        async for event in stream:
            # split sentences into words
            for word in event.text.split():
                yield event.derive(text=word)
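
    The example above keeps the key, which matches flatMapValues. For flatMap, where each input may produce several new key/value pairs, a minimal sketch could look like the following. It assumes a plain async generator as a stand-in for stream.items() and is not Faust code; fake_items, flat_map_words and collect are hypothetical names.

```python
import asyncio


async def fake_items(pairs):
    # Stand-in for stream.items(): yields (key, value) tuples.
    for key, value in pairs:
        yield key, value


async def flat_map_words(items):
    # One (key, sentence) in, zero or more (word, 1) pairs out.
    async for _key, sentence in items:
        for word in sentence.split():
            yield word.lower(), 1


async def collect(stream):
    # Drain an async iterator into a list.
    return [item async for item in stream]


source = [("a", "Hello world"), ("b", ""), ("c", "hello")]
pairs = asyncio.run(collect(flat_map_words(fake_items(source))))
print(pairs)
```

    An empty sentence produces no output at all, which is the main difference from map.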
    
  • .branch()

    In KS this is a special case of filter; in Faust, just write code and forward events as appropriate:

    app = faust.App('transfer-demo')
    
    # source topic
    source_topic = app.topic('transfers')
    
    # destination topics
    tiny_transfers = app.topic('tiny_transfers')
    small_transfers = app.topic('small_transfers')
    large_transfers = app.topic('large_transfers')
    
    
    @app.agent(source_topic)
    async def process(stream):
        # stream.events() yields Event objects, which provide forward()
        async for event in stream.events():
            if event.value.amount >= 1000.0:
                await event.forward(large_transfers)
            elif event.value.amount >= 100.0:
                await event.forward(small_transfers)
            else:
                await event.forward(tiny_transfers)
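
    The routing decision can also be kept in a small pure function, which makes the thresholds easy to test without a running Kafka cluster. This is a sketch under the assumption that destinations are identified by topic name; pick_topic and BRANCHES are hypothetical names, and the thresholds are the ones used above.

```python
# Ordered (threshold, topic name) pairs: the first match wins, as in KS branch().
BRANCHES = [
    (1000.0, "large_transfers"),
    (100.0, "small_transfers"),
]
DEFAULT_TOPIC = "tiny_transfers"


def pick_topic(amount: float) -> str:
    # Return the name of the first branch whose threshold the amount reaches.
    for threshold, topic_name in BRANCHES:
        if amount >= threshold:
            return topic_name
    # Unlike KS branch(), which drops unmatched records, fall back to a default.
    return DEFAULT_TOPIC


for amount in (1500.0, 250.0, 3.0):
    print(amount, "->", pick_topic(amount))
```

    The agent would then look up the topic object for the returned name and forward the event to it.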
    
  • .through():

    @app.agent(topic)
    async def process(stream):
        async for event in stream.through('other-topic'):
            yield event
    
  • .to():

    app = faust.App('to-demo')
    source_topic = app.topic('source')
    other_topic = app.topic('other')
    
    @app.agent(source_topic)
    async def process(stream):
        # stream.events() yields Event objects, which provide forward()
        async for event in stream.events():
            await event.forward(other_topic)
    
  • .selectKey()

    Just transform the key yourself:

    @app.agent(source_topic)
    async def process(stream):
        async for key, value in stream.items():
            key = format_key(key)
    

    If you want to transform the key for processors to use, then you have to change the current context to have the new key:

    @app.agent(source_topic)
    async def process(stream):
        async for event in stream:
            event.req.key = format_key(event.req.key)
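
    The first snippet above computes a new key but leaves open what happens to it. One option is to yield the re-keyed pair, sketched here with a plain async generator instead of a Faust stream. The format_key shown is a hypothetical normalizer (the original does not define it), and fake_items, select_key and collect are likewise illustrative names.

```python
import asyncio


def format_key(key: bytes) -> str:
    # Hypothetical key normalizer: decode, trim whitespace, lower-case.
    return key.decode("utf-8").strip().lower()


async def fake_items(pairs):
    # Stand-in for stream.items(): yields (key, value) tuples.
    for key, value in pairs:
        yield key, value


async def select_key(items, key_fn):
    # Counterpart of KS selectKey(): replace the key, keep the value.
    async for key, value in items:
        yield key_fn(key), value


async def collect(stream):
    # Drain an async iterator into a list.
    return [item async for item in stream]


source = [(b" ACC-1 ", 10), (b"acc-2", 20)]
rekeyed = asyncio.run(collect(select_key(fake_items(source), format_key)))
print(rekeyed)
```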
    
  • .groupBy()

    @app.agent(source_topic)
    async def process(stream):
        async for event in stream.group_by(Withdrawal.account):
            yield event
    
  • .groupByKey()

    ???

  • .transform()

  • .transformValues()

    ???

  • .process()

    In KS, process() calls a Processor and is often used to schedule periodic actions (punctuation). In Faust you would instead create a background task, for example with a timer:

    import faust
    
    # Toy example: collect transfer events and add them
    # to a running total once per second.
    
    class Transfer(faust.Record, serializer='json'):
        amount: float
    
    app = faust.App('transfer-demo')
    transfer_topic = app.topic('transfers', value_type=Transfer)
    
    class TransferBuffer:
    
        def __init__(self):
            self.pending = []
            self.total = 0
    
        def flush(self):
            for amount in self.pending:
                self.total += amount
            self.pending.clear()
            print('TOTAL NOW: %r' % (self.total,))
    
        def add(self, amount):
            self.pending.append(amount)
    
    buffer = TransferBuffer()
    
    @app.agent(transfer_topic)
    async def task(transfers):
        async for transfer in transfers:
            buffer.add(transfer.amount)
    
    @app.timer(interval=1.0)
    async def flush_buffer():
        buffer.flush()
    
    if __name__ == '__main__':
        app.main()
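
    The buffering logic itself does not depend on Faust, so it can be exercised on its own. The sketch below is a standalone variant of the TransferBuffer class above, with one assumed change: flush() returns the running total instead of printing it, so the result can be checked.

```python
class TransferBuffer:
    """Collects amounts and folds them into a running total on flush()."""

    def __init__(self):
        self.pending = []
        self.total = 0.0

    def add(self, amount):
        # Called once per incoming transfer.
        self.pending.append(amount)

    def flush(self):
        # Called periodically: add everything pending to the total.
        self.total += sum(self.pending)
        self.pending.clear()
        return self.total


buffer = TransferBuffer()
for amount in (10.0, 2.5):
    buffer.add(amount)
first = buffer.flush()   # covers the first two transfers

buffer.add(7.5)
second = buffer.flush()  # adds the third transfer to the running total
print(first, second)
```

    In the Faust program the agent plays the role of the add() calls and the timer plays the role of the flush() calls.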
    
  • .join()

  • .outerJoin()

  • .leftJoin()

    NOT IMPLEMENTED

    async for event in (s1 & s2).join():
        ...
    async for event in (s1 & s2).outer_join():
        ...
    async for event in (s1 & s2).left_join():
        ...