
Scaling Tapstream


In the three years of Tapstream’s rise to a leading analytics solution, one that sometimes sees over 100,000 requests per second, we’ve been through as many evolutions in backend technology, not to mention constant tweaking and improvements to push our capacity ever higher. Here’s a quick look at Tapstream’s server-side past.

Humble Beginnings

Tapstream began with a simple idea: correlating interactions on the internet at large (“hits”) to app users (“events”). The initial implementation was straightforward: we would store everything we could about the user when they clicked on a special Tapstream link, usually on their way to the App Store. Then, they would install an app containing Tapstream’s SDK, which would phone home to our servers and tell us what it could about this user. With a little doing, we could pick out which hits went with which events and report on anything interesting we found about where those hits came from and what they were doing.

Tapstream 1.0 was a 100% Django affair – both the dashboard and, more importantly, the apparatus for storing and correlating hits and events used regular Django models (“Hit” and “Event”) and relied on Django’s ORM to shove them into Postgres. Here’s some hit model code, as dredged up from our repository:

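The original listing hasn’t survived here, so what follows is a minimal sketch of its general shape rather than the real thing; the related models and field names are illustrative assumptions (the real model had six normalized foreign keys):

    from django.db import models

    class Hit(models.Model):
        # 'App', 'Tracker', and 'Referrer' stand in for the other normalized
        # models; names and fields are illustrative, not the actual schema.
        app = models.ForeignKey('App', on_delete=models.CASCADE)
        tracker = models.ForeignKey('Tracker', on_delete=models.CASCADE)
        referrer = models.ForeignKey('Referrer', null=True, on_delete=models.SET_NULL)
        ip_address = models.GenericIPAddressField()
        user_agent = models.CharField(max_length=512)
        created = models.DateTimeField(auto_now_add=True)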

This was a simpler time in Tapstream’s life. Want a count of hits broken down by referrer? As simple as a little bit of SQL.
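
In ORM terms, that query looked something like this (a sketch against the Hit model above; the equivalent SQL is a plain GROUP BY over the hits table):

    from django.db.models import Count

    # Count of hits broken down by referrer, computed straight off the raw Hit rows.
    # Roughly: SELECT referrer_id, COUNT(id) FROM hits GROUP BY referrer_id;
    hits_by_referrer = (
        Hit.objects
        .values('referrer')
        .annotate(total=Count('id'))
        .order_by('-total')
    )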

It worked great, for a while. For one, it allowed very detailed reporting queries, and it kept the code small and simple. But, as anyone who has ever tried to assemble their own analytics platform will know, this is a heavyweight way to go about storing incoming event data – especially with six normalized foreign key fields that have to be managed.

Just the facts, please

The first thing to go was the reporting dashboard. Doing joins across that many foreign keys on huge tables meant that reporting queries became very slow, very quickly, as sales picked up. To address this, we flattened some of those reporting queries into new “fact” tables; instead of storing all the hits as they came in and producing reporting data by counting hit rows grouped by day, we stored a single row containing the day, the user (app), and the count, plus a few more fields powering other pages. When new hits came in, we simply updated the count for the relevant row.
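
The pattern looked roughly like this, assuming a hypothetical DailyHitFact model keyed by (day, app):

    from django.db.models import F

    def record_hit(app, day):
        # DailyHitFact is a hypothetical fact table keyed by (day, app).
        # Create the row the first time we see the key, then bump the counter
        # in place rather than inserting a raw Hit row.
        fact, created = DailyHitFact.objects.get_or_create(
            app=app, day=day, defaults={'count': 1})
        if not created:
            DailyHitFact.objects.filter(pk=fact.pk).update(count=F('count') + 1)

(That UPDATE takes a row lock on the fact row, which becomes important shortly.)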

Of course, with all those fields, one table wasn’t enough. We also needed a table grouped by referrers, one for page hits, etc., and then each of those needed a new table storing conversions. We repeated this pattern for other dashboard pages until we had about a dozen fact tables in total. Now, our dashboard queries were nice and flat and fast again.

Unfortunately, we had a new problem. Instead of storing hit objects directly, each incoming request would update some subset of those dozen tables, locking those rows for the duration of the update. Meanwhile, other requests on the same keys (day, user) would come in, creating a very contentious row-locking situation in Postgres. This problem only got worse as we landed bigger and bigger users, causing traffic to grow much faster than expected – a consistent theme throughout Tapstream engineering’s existence.

Also in typical fashion, a new feature requirement emerged: segmenting incoming data based on pre-defined “funnels”. As with the Hit and Event models that preceded the fact tables, funnels would operate on all-time raw data. However, they would also need to work retroactively – funnels defined by the customer tomorrow would have to take into account today’s data, and all without taking down our service. It was already apparent that the Hit and Event model system would not last much longer under the current load, and so work began on a new Hadoop-based system.

When all you have is a Hadoop

The idea was to store all-time data in Hadoop sequence files. The entire dashboard would be based on fact tables only, and the fact tables would be re-populated in their entirety by a Hadoop job running across our data for all time. This meant the funnel feature could be implemented as requested without too much difficulty, and that we could safely collect all-time data without data loss – even if the Hadoop job failed, we’d still be collecting all the data to be reported whenever we got the job running again. As a bonus, since we were just storing hits and events as binary blobs written to HDFS from the web nodes, we could collect them at incredible speed.

The Hadoop job was written using Cascading, a pretty great set of APIs that add a lot of nice utilities for working with Hadoop data, the most significant of which is the ability to arrange data by generic structured fields and easily manipulate and/or group by certain fields. Cascading was very easy to work with, and did very well by us. It automatically translates your tasks into Hadoop map and reduce jobs, and collapses sequential map jobs into one for extra efficiency.

On the other hand, perhaps it was too easy; the final topology of our data processing job contained a huge number of reduces, most of them group-by operations that could probably have been simplified had we taken the effort to write the reduce jobs ourselves. When the Hadoop solution was deployed, it only required 2x the space of the working set to run, but as features were added, it ballooned to 5x the space of the still-rapidly-expanding raw data. Still, even if we had made the whole thing maximally efficient, it would only have delayed the inevitable.

When you have a lot of data and are collecting a lot more, producing analytics across all-time data gets slower and slower – processing an unbounded amount of data requires an unbounded amount of time. Using Hadoop allowed us to throw more nodes at the problem, at significant expense, but there were still a few reduces that would gum up the works pretty good. At first, we ran the job once per hour. Then, every two hours. Slowly, processing time went up and up as more data came in, and eventually we were forced to move back into the world of real-time.

The Third Age: Don’t knock it ‘til you’ve Trident

We weren’t blind to the problems of re-processing a growing data set forever, but, as always, pressure to upgrade came faster than expected. We started moving parts of our infrastructure from Hadoop to Trident within weeks of deploying the former.

First to go was the non-converting data – facts about raw hits and events and their numbers and nature. These tables can be trivially maintained in real time: just increment the count as the data comes in. The real-time cluster was deployed alongside the all-time Hadoop job, the former counting hits and the latter producing the more complex datasets used in the dashboard, and everything was happy for a while.

The experience emboldened us to take further steps with Trident. We started thinking about how we could perform conversions between hits and events in real-time again without breaking the bank, and eventually determined that random access would be required. We needed a way of storing data such that we could store a huge amount, but still look up individual pieces quickly. The datastore we landed on was Cassandra.

In retrospect, how the real-time conversions would work is simple: we store the id of each incoming hit and event keyed by its fingerprint data, and search that table for a matching fingerprint whenever an event or hit comes in. What we weren’t totally sure of was whether Cassandra could offer the performance required, but we wrote a Trident topology backed by Cassandra to create conversions on-the-fly anyhow.
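
The matching itself ran inside the Trident topology, but the lookup is easy to sketch with the Python Cassandra driver; the keyspace, table, column, and host names here are illustrative, not our actual schema:

    from cassandra.cluster import Cluster

    # Assumed schema, keyed by fingerprint so each lookup is a single-partition read:
    #   CREATE TABLE hits_by_fingerprint (
    #       fingerprint text PRIMARY KEY,
    #       hit_id      uuid,
    #       created     timestamp
    #   );
    session = Cluster(['cassandra-node-1']).connect('tapstream')
    lookup = session.prepare(
        "SELECT hit_id FROM hits_by_fingerprint WHERE fingerprint = ?")

    def find_matching_hit(fingerprint):
        # When an event arrives, look for a hit with the same fingerprint;
        # a match means we can record a conversion on the spot.
        row = session.execute(lookup, [fingerprint]).one()
        return row.hit_id if row else None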

This was the beginning of the third major architecture backing a mostly-unchanged dashboard. All hits and events would be collected by the web nodes, sent via Kafka to the Trident topology, and stored in Cassandra. Gradually, all of our major features were re-implemented in Trident with Cassandra, until all that remained of the Hadoop architecture was the occasional one-off job and the funnel system.
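
On the web-node side, that amounts to serializing each hit and handing it to a Kafka producer; here is a sketch using the kafka-python client, with illustrative broker, topic, and field names rather than our actual producer code:

    import json
    from kafka import KafkaProducer

    # Illustrative broker address and topic name.
    producer = KafkaProducer(
        bootstrap_servers=['kafka-1:9092'],
        value_serializer=lambda d: json.dumps(d).encode('utf-8'),
    )

    def publish_hit(hit):
        # Ship the raw hit off to the Trident topology; the Cassandra writes happen there.
        producer.send('hits', {
            'fingerprint': hit.fingerprint,
            'referrer': hit.referrer,
            'timestamp': hit.created.isoformat(),
        })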

Advertisers just wanna have funnels

Oh, the funnel system! The single overarching requirement demanding all-time data reprocessing. For a while, we just pointed the funnel Hadoop job at Cassandra, but this had many of the same problems we’d had with all-time data in our HDFS sequence files, made worse by the fact that sequentially and exhaustively reading all the data in a table is a use case Cassandra isn’t really designed to handle. We (I) made a few attempts to write a faster Cassandra-backed funnelizer, first with Spark (because I read on Hacker News that it was 100x faster than Hadoop!), then with Akka (because actor concurrency is cool), and finally just a plain old Java fork-join job, which turned out to be the fastest and most appropriate of all of these. But it was still the component of our architecture most imminently in danger, although less imminently than before.

Then, in probably the badass-est feat of software engineering in Tapstream history, Nick built a new funnelizer on an insane library he created: a Hadoop input source reading directly from Cassandra’s data files (SSTables). It worked like a charm, moving the funnelizer off the list of things-about-to-break for a solid year and a half.

The end of eternity

Until a few months ago, we were still storing our all-time raw hit and event data in Cassandra. Recently, however, we enabled TTLs on that raw data, expiring rows out of our working set after 60 days. No longer must we scan through an eternity of data to pick out the relevant bits we need. Don’t worry though, sales team and customers: we still store conversion session pairs and metadata forever, so you can win this fight yet. You’ll just have to send a whole lot more traffic our way.
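
In Cassandra terms, that amounts to a default TTL on the raw tables; a sketch, with illustrative table names, and keeping in mind that the setting only applies to data written after the change:

    from cassandra.cluster import Cluster

    session = Cluster(['cassandra-node-1']).connect('tapstream')
    # 60 days = 60 * 24 * 3600 = 5,184,000 seconds.
    for table in ('hits_by_fingerprint', 'events_by_fingerprint'):
        session.execute(
            "ALTER TABLE %s WITH default_time_to_live = 5184000" % table)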

Our architecture has been very stable since then – possibly due to our frantic scaling efforts, but probably because of our dedicated dev-ops hire Alain, who among other things has greatly improved our ability to recognize and resolve potential scaling issues before they become an impending catastrophe.

Oh, and the dashboard? It’s still backed by more-or-less the same Postgres database we started with, fact tables and all.