At Jana, we use Cassandra as our run-time data store. We're very happy with it: it scales beautifully, and while it does put some burden on the engineer to think about what happens as the data grows, if you're operating at the kind of scale that needs it, it's well worth it.
However, if you’re used to another database that has built-in aggregation functions—perhaps a relational database like MySQL or PostgreSQL, or even some NoSQL databases like CouchDB or MongoDB—then you might be surprised when you find out that no such thing exists in Cassandra. There’s no “sum”, “min”, or “max” in CQL; all of your aggregations need to happen on the client (in our case, in Python).
(An exception is that count() exists; see this post as well for some techniques on counting things in Cassandra, which includes some of the tactics described here. Another thing to note is that Cassandra 2.2 introduces some ways of doing these kinds of computations on the server; while that may make your client code a bit simpler, I expect the ideas here related to data models and partitions will always apply).
While “all of your aggregations need to happen on the client” sounds simple, it’s not helpful to hear if your aggregations are large. Let’s walk through an example.
I want real-time monitoring around some expenses—an up-to-date measure of how much money has been spent today. We store an event in Cassandra for each expense, and there may be ~a million such events per day. They’ll each have a time and a cost in cents.
One of the first things you'll learn about data modeling in Cassandra is to model around your queries. Our aggregation is going to need all of the events that happened on a particular day, so we'll want an entire day on one partition:
CREATE TABLE cost_event (
    id timeuuid,
    day timestamp,
    cost_cents int,
    PRIMARY KEY (day, id)
);
With this table you can happily
select cost_cents from cost_event where day='2016-02-25'
and add up the results—and this works great if you have a small number of events/day. But as you probably already guessed, it has two major problems:
- Reading: transferring a million rows from Cassandra, then adding them up, is going to be slow.
- Writing: services that record cost events will only be writing to one partition for an entire day.
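To make the naive approach concrete, here's a minimal sketch of the client-side sum. The driver call is shown only in a comment; the `session` object and query shape are assumptions based on the DataStax Python driver, and `Row` is a stand-in for the named-tuple-like rows the driver returns.

```python
from collections import namedtuple

# Stand-in for a row returned by the driver; real driver rows also
# expose columns as attributes, e.g. row.cost_cents.
Row = namedtuple("Row", ["cost_cents"])

def sum_costs(rows):
    """Add up cost_cents across every row the query returned."""
    return sum(row.cost_cents for row in rows)

# Against a live cluster this would look something like:
#   rows = session.execute(
#       "SELECT cost_cents FROM cost_event WHERE day = %s", ["2016-02-25"])
#   total_cents = sum_costs(rows)
```

For a day with ~a million events, that single SELECT ships every row to the client before any addition happens, which is exactly the reading problem.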
This is what I mean about it not being helpful to hear that your aggregations have to happen on the client. So how might we go about resolving these scale issues?
Solving the reading problem: tiers of aggregation
To solve the problem of reading and aggregating a million rows, the first thing we might try is to write to a separate aggregate table each time we write a row to the event table:
CREATE TABLE cost_totals (
    day timestamp,
    cost_cents int,
    PRIMARY KEY (day)
);
Our reads are now quick, but this doesn’t really solve our problem, because in order for this solution to be robust, each update to cost_totals will have to recompute the total from scratch, and for that, you’ll still have to select a million rows.
So instead, we tier the aggregations, in this example by minute, hour, and day:
CREATE TABLE cost_event (
    id timeuuid,
    minute timestamp,
    cost_cents int,
    PRIMARY KEY (minute, id)
);

CREATE TABLE cost_by_minute (
    minute timestamp,
    hour timestamp,
    cost_cents int,
    PRIMARY KEY (hour, minute)
);

CREATE TABLE cost_by_hour (
    hour timestamp,
    day timestamp,
    cost_cents int,
    PRIMARY KEY (day, hour)
);

CREATE TABLE cost_by_day (
    day timestamp,
    cost_cents int,
    PRIMARY KEY (day)
);
Now, each time a new event is processed, we do four writes:
- write the event to the cost_event table
- select all events for this particular minute (which are now on the same partition), add the ~hundreds of them, and write to the cost_by_minute table
- select costs for this hour from cost_by_minute, add the ~60 of them, and write to the cost_by_hour table.
- select costs for this day from cost_by_hour, add the ~24 of them, and write to the cost_by_day table.
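The four steps above can be sketched as follows. The bucketing helpers are real Python; `InMemoryStore` is a stand-in I've invented so the sketch runs without a cluster — in production its three methods would wrap driver calls against the tables above.

```python
from datetime import datetime

def minute_of(ts):
    """Truncate a timestamp to its minute bucket."""
    return ts.replace(second=0, microsecond=0)

def hour_of(ts):
    """Truncate a timestamp to its hour bucket."""
    return minute_of(ts).replace(minute=0)

def day_of(ts):
    """Truncate a timestamp to its day bucket."""
    return hour_of(ts).replace(hour=0)

class InMemoryStore:
    """In-memory stand-in for the driver; a real store would issue
    CQL reads and writes against the tiered tables."""

    def __init__(self):
        self.events = []   # (minute, event_id, cost_cents)
        self.totals = {}   # (table, key tuple) -> cost_cents

    def write_event(self, minute, event_id, cost_cents):
        self.events.append((minute, event_id, cost_cents))

    def select_costs(self, table, partition):
        if table == "cost_event":
            return [c for m, _, c in self.events if m == partition]
        return [v for (t, key), v in self.totals.items()
                if t == table and key[0] == partition]

    def write_total(self, table, key, total):
        self.totals[(table, key)] = total

def process_event(store, event_id, ts, cost_cents):
    """One incoming event: one raw write plus three re-aggregations."""
    store.write_event(minute_of(ts), event_id, cost_cents)              # raw event
    minute_total = sum(store.select_costs("cost_event", minute_of(ts)))  # ~hundreds of rows
    store.write_total("cost_by_minute", (hour_of(ts), minute_of(ts)), minute_total)
    hour_total = sum(store.select_costs("cost_by_minute", hour_of(ts)))  # ~60 rows
    store.write_total("cost_by_hour", (day_of(ts), hour_of(ts)), hour_total)
    day_total = sum(store.select_costs("cost_by_hour", day_of(ts)))      # ~24 rows
    store.write_total("cost_by_day", (day_of(ts),), day_total)
```

Note that each re-aggregation recomputes its bucket from scratch from the tier below, which is what makes the scheme robust to retries and duplicate processing.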
The whole process gives you a robust sum across the entire day, while only needing to select and sum ~1k rows instead of ~1 million. I chose minutes/hours/days as an illustration, but depending on your application you might want to aggregate by second, or choose granularities like 10s/100s/1000s—the idea is the same. In fact, since the schemas are basically the same for all of these tables, you could use a single table with a “granularity” column for all levels of aggregation, including the granularity as part of the partition key.
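The single-table variant might look something like this sketch (the table and column names are illustrative, not from a real schema):

```sql
CREATE TABLE cost_by_bucket (
    granularity text,         -- 'minute', 'hour', or 'day'
    parent_bucket timestamp,  -- the containing bucket one tier up
    bucket timestamp,
    cost_cents int,
    PRIMARY KEY ((granularity, parent_bucket), bucket)
);
```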
You can also reduce the number of reads & writes by processing events in batches—write, say, 1000 events to the cost_event table before running the aggregations for the appropriate set of minutes/hours/days. Or even run the aggregations in a separate thread, using a message queue to keep track of which buckets need updating.
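One way to sketch the batching idea: after writing a batch of events, re-aggregate each distinct minute bucket only once, rather than once per event (the helper names here are illustrative):

```python
from datetime import datetime

def minute_of(ts):
    """Truncate a timestamp to its minute bucket."""
    return ts.replace(second=0, microsecond=0)

def buckets_to_refresh(batch):
    """Given a batch of (ts, cost_cents) events, return the distinct
    minute buckets whose aggregates need recomputing once the whole
    batch has been written to cost_event."""
    return {minute_of(ts) for ts, _ in batch}
```

A batch of 1000 events that all landed in the last couple of minutes collapses to just a couple of re-aggregations.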
Solving the writing problem: manual striping
The above solution helps the writing problem a little—all events in the same minute are written to the same partition, instead of all events in the same day—but that's still not ideal. Consider, for instance, a perfectly reasonable setup where you have 100 threads writing to the cost_event table at the same time, with a single thread doing the aggregations. At any given moment, all 100 threads would be writing to the same partition, and as the number of threads you need grows, it isn't easy to scale your Cassandra cluster to compensate.
The solution here is to artificially spread your writes across partitions by choosing some finite “partition id”, where all possible partition ids are known by your aggregation code. You can do this at each level of aggregation, but for this specific example just doing it at the cost_event level may be sufficient:
CREATE TABLE cost_event (
    id timeuuid,
    minute timestamp,
    partition_id int,
    cost_cents int,
    PRIMARY KEY ((minute, partition_id), id)
);
You'll need to think carefully about how you choose your partition id. In this case, to ensure that events with the same id always end up on the same partition, you might choose a hash of the id; but if you know that the same event will always be processed by the same thread, you might choose a thread identifier instead, so that each of your 100 threads writes to a single partition.
Then, your reads need to iterate over each of the known partition ids—which might be, say, 0–9—and do a separate select for each. So you’re putting a bit more burden on your reads to optimize and scale your writing.
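Both halves can be sketched as follows: a stable partition id derived from the event id (using CRC32 rather than Python's built-in `hash`, which is randomized per process), and a read that fans out over all known partition ids. `fetch_costs` is a hypothetical stand-in for the per-partition SELECT.

```python
import zlib

NUM_PARTITIONS = 10  # partition ids 0-9, known to both writers and readers

def partition_id(event_id):
    """A stable partition id: the same event id always maps to the
    same partition, across processes and restarts."""
    return zlib.crc32(event_id.encode("utf-8")) % NUM_PARTITIONS

def total_for_minute(fetch_costs, minute):
    """Fan the read out over every known partition id and sum the results.

    fetch_costs(minute, pid) stands in for something like:
        SELECT cost_cents FROM cost_event
        WHERE minute = ? AND partition_id = ?
    """
    return sum(sum(fetch_costs(minute, pid)) for pid in range(NUM_PARTITIONS))
```

The reader does NUM_PARTITIONS small selects instead of one; that's the trade: a little extra read work in exchange for writes that spread evenly across the cluster.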
I realize this is a lot of prose, but I do think you need a bit of a deeper understanding to really get it right. Hopefully someone finds it helpful! Let me know in the comments!