At Jana, we use Kafka as the backbone for managing all the analytics data we collect every day. It is an amazingly useful tool that lets us process the same data in a variety of ways. Abby illustrated how we batch process our counters with Hadoop in her post Using Generic Counters to Measure It. We also have many monitoring applications and decision engines that read events directly off of Kafka streams as they happen, relying on near real-time aggregations of that data.
There are many different ways we want to slice and dice the incoming data depending on the application. Originally we only had one process that would read data from Kafka and update counts in various tables. However, as we grew, this one process was not enough to keep up with the hundreds of millions of events we were ingesting every day.
So, we put on our distributed-computing hats and reworked the consumers to run as multiple processes across a cluster, using locks to ensure that only one process was reading from a given Kafka broker at a time. Unfortunately, updating the counts in the various tables requires an atomic read-then-write on the aggregated row, because multiple processes could be writing to the same row at once, so every update also had to go through a global lock. With event counts reaching into the millions, that global lock was too expensive to keep our data up to date with the incoming torrent of events.
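To make the read-then-write problem concrete, here is a small Python sketch (an illustration only, not our production consumer). The aggregated row is a plain dict, two consumers' steps are interleaved by hand to show a lost update, and a threading.Lock stands in for the cluster-wide lock. The lock makes the count correct, but every increment has to wait its turn on that single lock, which is exactly the cost described above.

```python
import threading


def lost_update_demo(start):
    """Interleave two consumers' read-then-write steps by hand."""
    row = {"count": start}       # the aggregated row both consumers update
    a_seen = row["count"]        # consumer A reads the current count
    b_seen = row["count"]        # consumer B reads before A has written
    row["count"] = a_seen + 1    # A writes its increment
    row["count"] = b_seen + 1    # B overwrites it with the same value
    return row["count"]          # one of the two increments is lost


def locked_increments(start, workers, per_worker):
    """Serialize every read-then-write behind a single shared lock."""
    row = {"count": start}
    lock = threading.Lock()      # stand-in for the cluster-wide lock

    def consume():
        for _ in range(per_worker):
            with lock:           # every update waits on this one lock
                current = row["count"]
                row["count"] = current + 1

    threads = [threading.Thread(target=consume) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return row["count"]


print(lost_update_demo(10))            # 11, although two events arrived
print(locked_increments(10, 4, 1000))  # 4010: correct, but fully serialized
```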
There are many StackOverflow questions and blog posts about how to keep accurate event counts in Cassandra. The general consensus is either a) don't use Cassandra (most recommend using Storm*), or b) separate the writing and aggregation processes. We decided to go with option b for the time being and rewrite how we aggregate event counts in Cassandra. To explain how this works, I'm going to take you through the life of a counter and how it becomes an aggregated count.
Step 1: Save the counter for easy aggregation
While working with Cassandra for the past few months, I have learned that you truly do want to store your data in a way that you can query it effectively. When we designed the intermediate table for storing these raw events, we had to think about how we wanted to aggregate the events.
We settled on these two tables for the intermediate storage:
raw_count is fairly straightforward. We chose the partition key to be the precise event and the time it occurred, which distributes the load of writing thousands of events per second across our cluster. Within each partition, the table forms a wide row keyed on id, a timeuuid representing a single action.
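As a rough illustration of the raw_count layout, the Python sketch below models the table in memory. The column names (event, date_bucket, id) and the CQL in the comment are assumptions for illustration. The point it shows is that recording an occurrence is a blind insert of a new timeuuid cell, so writers never need to read-then-write, and the count for an event and time period is simply the number of cells in that partition. Python's uuid.uuid1() produces a version 1, time-based UUID, the same kind of value Cassandra's timeuuid type stores.

```python
import uuid
from collections import defaultdict

# Hypothetical CQL this model mirrors (column names are assumptions):
#   CREATE TABLE raw_count (
#       event text,
#       date_bucket text,
#       id timeuuid,
#       PRIMARY KEY ((event, date_bucket), id)
#   );

# partition key (event, date_bucket) -> {id (timeuuid): 1}
raw_count = defaultdict(dict)


def record_event(event, date_bucket):
    """Append one occurrence as a new cell; nothing is read first."""
    event_id = uuid.uuid1()  # time-based UUID, like a Cassandra timeuuid
    raw_count[(event, date_bucket)][event_id] = 1
    return event_id


def count_events(event, date_bucket):
    """Read the whole wide row for one partition and count its cells."""
    return len(raw_count.get((event, date_bucket), {}))


for _ in range(3):
    record_event("app_download", "2014-05-01")
record_event("app_download", "2014-05-02")
print(count_events("app_download", "2014-05-01"))  # 3
```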
raw_count_keys is basically an index on raw_count, so that for a given time period and aggregate table to update, we can perform one query to get all the keys we need to aggregate. Without this index table, we would have no way of knowing which events to query for updated metrics.
As we are going to be continually adding new events to track, we wanted to keep this table generic enough to accommodate new metrics. The tables are designed around storing events within a time period, which we specify with the date_bucket key and the granularity column. Together, these columns let us track any event over any time period we desire: whether we want to track events by day or by second, the combination of date_bucket and granularity gives us that flexibility.
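A minimal sketch of how date_bucket and granularity might combine, assuming each granularity names a strftime format and buckets are computed in UTC. The format strings, the raw_count_keys layout in the comment, and the names daily_downloads and app_download:com.example.app are all hypothetical. One lookup on (aggregate table, granularity, bucket) returns every event key that needs aggregating for that period.

```python
from collections import defaultdict
from datetime import datetime, timezone

# Hypothetical mapping from granularity to a bucket format (assumption).
BUCKET_FORMATS = {
    "day": "%Y-%m-%d",
    "hour": "%Y-%m-%dT%H",
    "second": "%Y-%m-%dT%H:%M:%S",
}


def date_bucket(ts, granularity):
    """Truncate a timezone-aware timestamp to its bucket, in UTC."""
    return ts.astimezone(timezone.utc).strftime(BUCKET_FORMATS[granularity])


# Hypothetical CQL this model mirrors:
#   CREATE TABLE raw_count_keys (
#       aggregate_table text,
#       granularity text,
#       date_bucket text,
#       event text,
#       PRIMARY KEY ((aggregate_table, granularity, date_bucket), event)
#   );
raw_count_keys = defaultdict(set)


def index_event(aggregate_table, event, ts, granularity):
    """Note that this event needs aggregating for its bucket."""
    bucket = date_bucket(ts, granularity)
    raw_count_keys[(aggregate_table, granularity, bucket)].add(event)
    return bucket


def keys_to_aggregate(aggregate_table, granularity, bucket):
    """The single index query: every event key for one period."""
    return sorted(raw_count_keys.get((aggregate_table, granularity, bucket), set()))


ts = datetime(2014, 5, 1, 13, 45, 7, tzinfo=timezone.utc)
print(date_bucket(ts, "day"))     # 2014-05-01
print(date_bucket(ts, "second"))  # 2014-05-01T13:45:07
index_event("daily_downloads", "app_download:com.example.app", ts, "day")
print(keys_to_aggregate("daily_downloads", "day", "2014-05-01"))
```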
Step 2: Aggregate!
From here, we simply pull the data for the time period we want to update for a given aggregate table. Because the recently updated events are stored as a wide row in raw_count_keys, a single query returns all the events to update! Then, for each event returned, we query the raw_count table and update the aggregate table. By running this as a regularly scheduled job, we keep the data up to date in a maintainable way.
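The write path and the scheduled job can be sketched end to end with in-memory stand-ins for the tables. This is a simplified, hypothetical version: granularity is left out of the keys, and the table names and event names are made up. Ingesting an event is two blind inserts (a raw_count cell and a raw_count_keys entry); the job does one index lookup per aggregate table and time period, then recounts each listed event's wide row.

```python
import uuid
from collections import defaultdict

# In-memory stand-ins for the tables (hypothetical layouts).
raw_count = defaultdict(dict)      # (event, date_bucket) -> {timeuuid: 1}
raw_count_keys = defaultdict(set)  # (aggregate_table, date_bucket) -> {event}
aggregates = {}                    # (aggregate_table, event, date_bucket) -> count


def ingest(aggregate_table, event, bucket):
    """Write path: two blind inserts, no read-then-write, no lock."""
    raw_count[(event, bucket)][uuid.uuid1()] = 1
    raw_count_keys[(aggregate_table, bucket)].add(event)


def aggregate(aggregate_table, bucket):
    """Scheduled job: one index query, then one wide-row read per event."""
    events = raw_count_keys.get((aggregate_table, bucket), set())
    for event in sorted(events):
        total = len(raw_count.get((event, bucket), {}))
        # Overwrite with a full recount, so re-running the job is harmless.
        aggregates[(aggregate_table, event, bucket)] = total
    return len(events)


for app in ["app_a", "app_a", "app_b"]:
    ingest("downloads_by_day", "download:" + app, "2014-05-01")
aggregate("downloads_by_day", "2014-05-01")
print(aggregates[("downloads_by_day", "download:app_a", "2014-05-01")])  # 2
```

Because the job writes a full recount rather than an increment, running it twice leaves the aggregate unchanged, and since only the job writes aggregate rows, the ingest path never contends on a lock.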
Here is what this entire process looks like for keeping aggregated counts of app downloads by day:
With this new method of processing counters, we hope to be able to scale up our real-time counter processing.
Did this seem interesting to you? Come talk to us about it, we’re hiring!
*I think eventually we will move to using Storm, as this type of aggregation is right in its wheelhouse. However, due to the overhead of setting it up, having to learn its ins and outs, and adding another cluster to maintain and manage, we decided to punt for the time being and revisit in the near future.