Context
Aampe ingests structured clickstream data on the order of hundreds of million rows on a daily basis. Because we believe in making the lives of our customers as easy as possible, we allow them to share this data with Aampe in the format of their preference (as long as it contains 3 main data points, see docs to learn about the Aampe data model). Some customers chose to share text formats (CSV, JSON, etc.) others prefer binary (parquet, Avro, etc.).
One of our very first tasks is then to normalise this event stream and transform it to the Aampe data model. We will share our methodology for doing that in a follow up post. For now, what’s essential to know is that we maintain data granularity to be able to support all the different use cases needed for the platform to make decisions about which users are going to receive which push notifications.
Problem
The problem we set up to solve was the creation of ML features from this clickstream data at scale. Those features will feed into what we call the User Landscape, and consequently into the baseline user behaviour models that Aampe fits as part of the preference inference pipeline that results in scores capturing the propensity of an individual user to react to the different dimensions of push notification they receive. There’s a lot of fun details we cannot include in this post because of space considerations, stay tuned for a post about that soon.
Naive solution
At Aampe we try hard not to over-engineer so we can avoid premature optimizations that will no longer be relevant a few months down the road. In a fast growing startup that’s focusing on nailing its product-market fit, there’s nothing worse than spending more time than the bare minimum needed to prove value. Several short iterations that adapt quickly to evolving product needs outweigh a long iteration aiming for engineering perfection.
That’s why we took the naive route for the first iteration of generating features. This meant a daily scan of all the clickstream data a customer shared. This was easy to create and even easier to maintain because we could simply update the queries when we needed a new feature. As we grew, and more customers (bigger customers) went live, the costs associated with this process increased linearly with the ever growing size of their data. At one point, we were scanning 150 TBs of data on a daily basis. So it was clear that we needed a redesign that can accommodate all the needs of the platform without having to pay exorbitant amounts in BigQuery on-demand analysis costs (yes, we use BigQuery as our computation backend, we’re excited to share more about that in a future post!).
Second iteration
Beyond the need to reduce the high cost, we also learned a few lessons from the lightweight implementation of the first iteration and the first few customers using Aampe.
- All-time features are not enough
Our data science team wanted to test the hypothesis that our learning models will achieve better baseline performance if we include features aggregating over different time scales (for example: all-time, 90 days and 30 days).
- We can use these features for campaign audience definition in the Aampe Composer
We provide customers with the ability to define constraints on who receives what messages. These could be based on slow changing facts such as the country a user is connecting from or based on recent events performed on the app such as adding a few items to a wishlist.
You can imagine that the number of use cases and feature combinations can be endless.
For example, in a campaign focussed on providing mental health services a customer may only want to look at users in a specific country they are licensed to operate in, while in another, more general, campaign they may want to connect with users that speak a particular language. These kinds of user attributes don’t change as often and don’t need to be computed everyday, but can definitely change from time to time and cannot be static (a user that used to live in Hong Kong, now moved to Singapore).
Some examples based on user events include an e-commerce customer looking to run a campaign for users that have forgotten something in their cart may want to look at data from the past few days while wanting to run another campaign for inactive customers that may not have had any activity in the past few months. A fintech customer looking to run a campaign for users with good payment history may want to look at a data slice that spans a year while at the same time they may want to run another campaign for users that have their payments due in the next few days.
We therefore set to design a system that can accomodate all of that, with a reasonable operational piecetag.
Deep-dive
If you’ve read this far, it means that you’re interested in the details. To make those details easier to digest, we’ll use a toy dataset of a stock transaction log to illustrate the process.
Building blocks
We started by designing two granular tables that would form the basis for the feature generation process:
1. daily_aggregations which holds counts and accumulations of each stock for every day (partitioned by date).
2. snapshot_facts gives the facts about each stock over all time which would be information like lowest price ever, highest price ever, lowest order value ever, highest order value ever for each stocks
The key is that to compute these two tables, we only need to scan only one partition of raw data every day. Using this scan, we can aggregate and insert a partition to the daily_aggregations table and then update snapshot_facts table with any new information where needed (for instance a stock breached its highest ever price then replace the highest_price fact with the new value).
Let's look an example of these tables:
daily_aggregations (Partitioned by date)
snapshot_facts
Rollups
Now that we have the building blocks in place, features can be generated daily by aggregating over the daily_aggregations table for the timescale of choice.
For example, to generate the all time features as before, we could aggregate over the entirety of the daily_aggregations table. But that would still involve scanning large amounts of data everyday. That’s because we will need all partitions of the daily_aggregations table every day. This wouldn’t not be significantly better than the naive and expensive solution we started out with and would still incur high costs.
That’s why we created the all_time_aggregations table that is updated in the form of a rolling window. We start out with a one-time aggregation of all historic information (which does involve scanning all partitions, but that’s okay because it will only be a one time action). Then, for every day going forward we simply update the all_time_aggregations table only based on partitions where data in daily_aggregations would have changed (the latest partition for most customers). This way the all_time_aggregations table is kept up to date without having to scan unnecessary data.
At this point it is worth noting one shortcoming of this approach (at least in its simple form presented here): it only works for features where the aggregation function is linear. So for example, we cannot define a feature that captures the median stock price. Doing that isn’t impossible, but that’s something we don’t need right now so we decided not to engineer for it (trying hard not to over engineer, remember?).
Alright, so far we’ve discussed features based on all-time aggregations. What about supporting aggregations over different timescales that satisfy the needs of our production and data science teams?
All this becomes very easy with the daily_aggregations table that we built. We take the last N days of partitions as a parameter in our daily workflows and aggregate according to that for each workload. So for a workflow needing 7 days of data we load the last 7 partitions from the daily_aggregations table and aggregate up each corresponding column (feature) to get the 7 day aggregates. We then repeat that for any number of days, depending on the use case.
To wrap everything up we’re only left to join the rollups formed from the daily_aggregations with snapshot_facts and feed this into the model as features.
The (maybe) next step
You probably realised that our N-day rollups aren’t as optimised as our all-time rollup. Those use the entirety of the pre-aggregated data every day. We decided that it was okay for now because their computational cost wasn’t high enough to justify more engineering time. But here’s an improvement we could implement, if need arises.
Let's take an example of 7 days partitions. For each new day of data we can retire the oldest day of the rollup and add the latest day of rollup from daily_aggregations
Lets visualise this detail:
Cost analysis
Let’s simulate the cost reduction based on a 40 TB clickstream table size. This table used to be queried 3-4 times every day so this translated to 120-160 TB of scanning everyday ~$15-$20 of BigQuery cost per day. That’s between $450 and $600 a month, per clickstream.
After this revamp, we read about 30GB of daily raw data every day to calculate the building block tables (daily aggregations of event-based attributes and snapshots of slow changing attributes).
We queried these tables multiple times daily for 1 day rollups, 7 day rollups, all time rollups (rolling sum) and the overall query consumption came down to ~80 GB everyday which translated to ~$0.3 every day. That’s less than $10 per month per clickstream.
Summary
- Instead of naive full-table scans, we now store daily clickstream aggregates and generate desired features by aggregating over those
- Those aggregations can easily be set up to any arbitrary number of days, providing for both data science and product needs
- We were able to reduce cost by ~95% from the first native to this second iteration
- This solution can scale to very large amounts of data