Tutorial: Count page views

In the Quick Start tutorial, we went over a simple example reading through a stream of greetings and printing them to the console. In this playbook we do something more meaningful with an incoming stream: we'll maintain real-time page view counts from a stream of page view events.

Application

As we did in the Quick Start tutorial, we first define our application. Let’s create the module page_views.py:

import faust

app = faust.App(
    'page_views',
    broker='kafka://localhost:9092',
    topic_partitions=4,
)

The topic_partitions setting defines the maximum number of workers we can distribute the workload to (also sometimes referred to as the “sharding factor”). In this example we set it to 4, but in a production app you would ideally use a higher number.

Page View

Let’s now define a model that each page view event from the stream deserializes into. A record describes the fields of a JSON dictionary, much like the dataclasses introduced in Python 3.7:

Create a model for our page view event:

class PageView(faust.Record):
    id: str
    user: str

Type annotations are used not only for defining static types, but also to define how fields are deserialized, and they let you specify models that contain other models, and so on. See the Models, Serialization, and Codecs guide for more information.
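For example, a model can embed another model simply by annotating a field with it. A minimal sketch (the User and DetailedPageView models here are purely illustrative and not used in the rest of this tutorial):

import faust


class User(faust.Record):
    id: str
    name: str


class DetailedPageView(faust.Record):
    id: str
    user: User   # nested model, deserialized from the embedded JSON object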

Input Stream

Next we define the source topic to read the “page view” events from, and we specify that every value in this topic is of the PageView type.

page_view_topic = app.topic('page_views', value_type=PageView)

Counts Table

Then we define a Table. This is like a Python dictionary, but is distributed across the cluster, partitioned by the dictionary key.

page_views = app.Table('page_views', default=int)
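Passing default=int means a missing key starts at zero, much like collections.defaultdict(int), which is what lets the agent below increment a counter without first checking whether the key exists. A rough sketch of the behavior, as it happens inside a stream processor (the page id here is an arbitrary example):

page_views['some-page-id'] += 1   # missing key starts at 0, becomes 1
page_views['some-page-id'] += 1   # now 2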

Count Page Views

Now that we have defined our input stream, as well as a table to maintain counts, we define an agent that reads each page view event coming into the stream and increments the count for that page in the table.

Create the agent:

@app.agent(page_view_topic)
async def count_page_views(views):
    async for view in views.group_by(PageView.id):
        page_views[view.id] += 1

Note

Here we use group_by to repartition the input stream by the page id. This is so that we maintain counts on each instance sharded by the page id. This way, in the case of failure, when we move the processing of some partition to another node, the counts for that partition (hence, for those page ids) move along with it.

Now that we have written our project, let’s try running it to see the counts update in the changelog topic for the table.
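For reference, the complete page_views.py we have assembled so far should look roughly like this:

import faust

app = faust.App(
    'page_views',
    broker='kafka://localhost:9092',
    topic_partitions=4,
)


class PageView(faust.Record):
    id: str
    user: str


page_view_topic = app.topic('page_views', value_type=PageView)
page_views = app.Table('page_views', default=int)


@app.agent(page_view_topic)
async def count_page_views(views):
    async for view in views.group_by(PageView.id):
        page_views[view.id] += 1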

Starting Kafka

Before starting a worker, you need to start Zookeeper and Kafka.

First start Zookeeper:

$ $KAFKA_HOME/bin/zookeeper-server-start $KAFKA_HOME/etc/kafka/zookeeper.properties

Then start Kafka:

$ $KAFKA_HOME/bin/kafka-server-start $KAFKA_HOME/etc/kafka/server.properties

Starting the Faust worker

Start the worker, similar to what we did in the Quick Start tutorial:

$ faust -A page_views worker -l info

Seeing it in action

Now let’s produce some fake page views to see things in action. Send this data to the page_views topic:

$ faust -A page_views send page_views '{"id": "foo", "user": "bar"}'
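To see separate counters accumulate, send a few more events and vary the page id (the ids and users here are arbitrary examples):

$ faust -A page_views send page_views '{"id": "foo", "user": "baz"}'
$ faust -A page_views send page_views '{"id": "quux", "user": "bar"}'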

Finally, look at the changelog topic for the table to see the counts. To do that, we use the Kafka console consumer:

$ $KAFKA_HOME/bin/kafka-console-consumer --topic page_views-page_views-changelog --bootstrap-server localhost:9092 --property print.key=True --from-beginning

Note

By default the changelog topic for a given Table has the format <app_id>-<table_name>-changelog. In this example both the app id and the table name are page_views, so the changelog topic is page_views-page_views-changelog.