How we solved RevenueCat’s biggest challenges on data ingestion into Snowflake
Challenges, solutions, and insights from optimizing our data ingestion pipeline.
At RevenueCat, we replicate our production data to Snowflake (a cloud-based data warehouse) for data exports, reports, charts and other needs.
Our infrastructure is pretty standard:
- Primary production DB: Aurora Postgres.
- Debezium to capture the change events from the WAL.
- A schema registry (Apicurio) keeps track of schema changes, versioning, etc.
- Kafka to process the change events (Avro) and write them into S3. We chose Parquet as the file format in S3 for its good compression and performance.
- A system to load the changes from S3 into Snowflake and consolidate the create/update/delete events into DML. We don’t keep the original change stream in Snowflake, since it already lives in S3: S3 is our data lake, and Snowflake is the lakehouse.
- We use Fivetran for third-party API-based sources; that data is pushed directly to the consolidated layer, skipping the data lake layer for convenience.
- Once the data is consolidated, we use dbt to transform and enrich data between layers.
- Airflow mainly orchestrates dbt runs, as we don’t process data in Airflow.
- Data is queried in Snowflake (with properly configured clustering columns) or consumed as data exports.
The diagram (simplified) looks more or less like this:
In this blog, I’ll take you through the intricacies of our data management practices, specifically focusing on the journey of our data from its origins to its final destination in Snowflake. We’ll explore the challenges we faced, the solutions we devised, and the insights we gained through the process of optimizing our data ingestion pipeline.
Ingestion process: from S3 to Snowflake
For a while, we used commercial solutions to replicate the Aurora Postgres WAL to Snowflake, but we ran into many problems. TOAST fields were not handled properly, schema evolution was flaky, our volume was too large, inconsistencies were hard to diagnose, and the general opaqueness of the system was a big problem for us.
We decided to experiment with replacing it with a more standard Debezium + Kafka + S3 setup to capture the changes, and to find our own way to load from S3 into Snowflake. Why didn’t we choose the Snowflake Connector for Kafka? We wanted to build a data lake of tool-agnostic Parquet files that can be used for other purposes or tools, such as Spark.
After testing multiple alternatives, we ended up developing a fairly novel model leveraging Snowflake external tables and streams.
Snowflake has the concept of an external table: a virtual table pointing to files in S3. Snowflake provides the compute, but the storage is provided by AWS (or any other supported cloud provider). It’s a “read-only table” (not even a table; it’s just a way of querying Parquet), so you can’t perform DML operations on it; Snowflake only holds file metadata and gives you access to the data using SQL as an interface.
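To make this concrete, here is a minimal sketch of what such an external table can look like. All names (stage, integration, columns) are illustrative rather than our actual objects, and it assumes a storage integration to the S3 bucket already exists.

```sql
-- Illustrative sketch: expose the Debezium/Kafka Parquet files in S3 as a
-- queryable external table. Names and columns are made up for the example.
CREATE STAGE lake_stage
  URL = 's3://example-data-lake/cdc/public/users/'
  STORAGE_INTEGRATION = s3_lake_integration;  -- assumed to exist already

CREATE EXTERNAL TABLE users_ext (
  id         NUMBER        AS (value:id::NUMBER),
  email      VARCHAR       AS (value:email::VARCHAR),
  op         VARCHAR       AS (value:op::VARCHAR),  -- Debezium operation: c / u / d
  changed_at TIMESTAMP_NTZ AS (TO_TIMESTAMP(value:source_ts_ms::NUMBER, 3))
)
LOCATION = @lake_stage
FILE_FORMAT = (TYPE = PARQUET)
AUTO_REFRESH = TRUE;  -- new files are registered via S3 event notifications
```

Querying `users_ext` is then plain SQL, even though the data itself never leaves S3.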
You can create a Snowflake stream on an external table; streams are how Snowflake names its CDC feature. The fancy thing here is that Snowflake supports “insert-only streams on external tables,” which basically track all the new rows coming from any new file uploaded to S3. This was huge for us, as it saved many engineering cycles we would otherwise have spent building an ingestion system that keeps track of which files have (or haven’t) been ingested into our lakehouse.
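Continuing the illustrative sketch above, the stream itself is a one-liner; `INSERT_ONLY = TRUE` is required for streams on external tables.

```sql
-- Illustrative sketch: the stream surfaces the rows of every newly registered
-- Parquet file exactly once, so nothing downstream has to track files manually.
CREATE STREAM users_ext_stream
  ON EXTERNAL TABLE users_ext
  INSERT_ONLY = TRUE;
```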
As the incoming rows from the files in S3 already carry the Debezium operation column (insert/update/delete), a Snowflake task just picks up each batch and applies the corresponding merge to the consolidated table.
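Below is a sketch of such a task, again with illustrative names and a deliberately tiny column list; the real merge logic (TOAST handling, deduplication rules, generated column lists) is more involved.

```sql
-- Illustrative sketch: consume the stream on a schedule and merge the latest
-- change per key into the consolidated table, honoring the Debezium operation.
CREATE TASK consolidate_users
  WAREHOUSE = ingest_wh
  SCHEDULE  = '60 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('users_ext_stream')
AS
MERGE INTO users AS t
USING (
  -- keep only the most recent event per logical row in this batch
  SELECT *
  FROM users_ext_stream
  QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY changed_at DESC) = 1
) AS s
ON t.id = s.id
WHEN MATCHED AND s.op = 'd' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.email = s.email, t.changed_at = s.changed_at
WHEN NOT MATCHED AND s.op <> 'd' THEN INSERT (id, email, changed_at)
  VALUES (s.id, s.email, s.changed_at);

ALTER TASK consolidate_users RESUME;  -- tasks are created suspended
```

Because the MERGE reads from the stream, each run consumes the pending batch and the stream offset advances automatically.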
As a result, our ingestion pipeline only uses Snowflake to collect, track, and consolidate changes coming from S3 into our consolidated layer.
With hundreds of tables, configuring the external tables, streams, and merge queries is quite complex and error-prone, so we created a tool to configure and manage all these. This tool proved critical in allowing us to evolve and experiment with alternative pipelines in an agile and flexible way.
The historical data in S3, queryable through the external tables, is a great tool for inspecting previous row versions when debugging or troubleshooting data consistency issues. It is like having git history for your full database or, in classic database terms, an easy-to-query copy of the entire WAL since the day we rolled out the tool 🙂
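For instance, pulling the full change history of a single logical row is just a query against the external table (using the illustrative names from the sketches above):

```sql
-- Every version captured by Debezium for this row, straight from the Parquet files in S3.
SELECT op, changed_at, email
FROM users_ext
WHERE id = 42
ORDER BY changed_at;
```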
This is a simplified version of the ingestion pipeline:
The problem of continuous consolidation
One unique aspect of RevenueCat’s data is that many tables are heavily updated; we don’t have much immutable, event-like data.
When we started tracking ingestion execution times, we realized that multi-billion row tables often took multiple hours to consolidate (merge) in Snowflake, even using a large warehouse. This was not acceptable in terms of cost and latency.
Most OLAP engines typically struggle with updates. To understand why, you can simply look at some of the most common data lake file formats, such as Parquet. The data is split into file chunks and heavily compressed. To change just one row, you must read the whole chunk, modify that single row, write it back in full, and recompress it. Other file formats, such as Avro, are row-based and better suited to reading or appending individual records, but modifying a row still requires reading and writing back the whole file. If you have a huge table with many chunks and want to perform updates that are scattered across the table, you will end up rewriting most of the chunks. Inserts, on the other hand, just mean creating a single new chunk file with many rows. Deletes could be costly if actually performed, but OLAP engines typically implement them as “inserted” tombstone marks or look-aside tables.
While we don’t know what storage mechanism Snowflake uses internally, empirical evidence shows that it follows the same pattern. Inserts and deletes (with low concurrency) are fast, but the cost of scattered updates grows linearly with the table size.
There were no easy solutions:
- Changing how we use our data and moving to a more immutable, event-based model was not a viable short-term option.
- Changing all derived pipelines to use the replication stream data without merging would speed up ingestion but slow down every query against the massive unmerged data.
Fortunately, we came up with an idea for a hybrid solution that could fix both problems, and we set out to test whether it would work for us.
Consolidate once a day while having fresh data
Let’s say that we have a table named “users.” This table contains ~15B rows, and we process ~35M changes daily (inserts, updates, and deletes). Half of them are updates, which works out to roughly 700k updates per hour. You can try to consolidate changes hourly, but in our tests it could take up to 2 hours to finish, making the pipeline fall behind.
The ingestion warehouse was running for the whole day, representing more than ⅓ of our total Snowflake cost. While using an even larger warehouse helped, we clearly had a problem: the resources it took to consolidate the table scaled with its size. The time to update 70k, 200k, or 2M rows was virtually the same and was mostly driven by the full table size.
The only solution was to consolidate less often, but how could that be done while maintaining a low replication lag?
We started researching, implementing, and testing what we named a low-latency view. The idea is simple: if the number of changes is orders of magnitude lower than the actual table size (0.1% for the example table), we can create a view that yields an on-the-fly consolidated version of the data without actually consolidating the last small batch of changes in the storage.
Conceptually, we split the table into two subsets:
- The consolidated table has one single physical row for each logical row and contains just the last version of the row that the consolidation process has seen.
- The ingestion table holds the set of incoming changes as insert, update, and delete events, and might contain several physical occurrences of each logical row.
The view abstracts these sources, allowing users of the view to see the last version for each row. Most often, the logical row comes from the consolidated table. But if it has been modified recently, it will come from the last version seen in the ingestion table.
In practice, it is just a UNION, but as always, the devil is in the details, and the UNION of these two subsets is more complex than it seems. For example, TOAST fields have to be special-cased, and it is hard to ensure that the execution plan will be optimal. It doesn’t matter that the small ingestion table is fully scanned on every query. However, the consolidated tables have billions of rows, so the query predicates need to be properly pushed down to that part of the query plan to minimize the partitions scanned and the number of bytes sent over the network to the Snowflake compute.
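A simplified sketch of such a view, with the same illustrative names as above and none of the TOAST special-casing: recently changed rows are served from the ingestion table, everything else from the consolidated table.

```sql
-- Illustrative sketch of a low-latency view.
CREATE OR REPLACE VIEW users_low_latency AS
WITH latest_changes AS (
  -- last event seen for every row touched since the previous consolidation
  SELECT *
  FROM users_ingest
  QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY changed_at DESC) = 1
)
SELECT id, email, changed_at
FROM latest_changes
WHERE op <> 'd'                    -- a row whose latest event is a delete is gone
UNION ALL
SELECT c.id, c.email, c.changed_at
FROM users AS c
WHERE NOT EXISTS (                 -- only rows not touched since the last consolidation
  SELECT 1 FROM latest_changes AS lc WHERE lc.id = c.id
);
```

Verifying that filters on the view actually get pushed down into the `users` branch of the UNION is exactly the hard part described above.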
The new ingestion process is as follows:
- Changes coming from S3 are tracked using an external table and a stream (same as before).
- A Snowflake task runs every X minutes, consumes the Snowflake stream, and inserts the data into the ingestion table. This is what we call the “continuous ingestion process.” This ingestion is blazing fast: it inserts a small number of rows into an equally small table, which takes seconds on an extra-small warehouse.
- The consolidation process runs once a day (the cadence can be tuned depending on how fast the ingestion table grows). It merges the changes from the ingestion table into the consolidated table and then empties the ingestion table, ensuring that it does not balloon in size. This consolidation rewrites the underlying table and requires a large warehouse (as before), but now it runs only once daily; both cadences are sketched after this list.
- As an additional benefit, auto-reclustering can be enabled in the consolidated tables, improving query efficiency. Before, this wasn’t practical, as updates constantly modified the consolidated tables, and the clustering process was always running, never finishing, and substantially increasing the bill.
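Here is a sketch of the two cadences, with the same illustrative names as before. The continuous task replaces the hourly merge sketched earlier; the real orchestration, described below, also coordinates the two tasks so they don’t step on each other.

```sql
-- Illustrative sketch. Continuous: append raw change events to the small
-- ingestion table every 30 minutes (seconds of work on an extra-small warehouse).
CREATE TASK ingest_users_continuous
  WAREHOUSE = ingest_xs_wh
  SCHEDULE  = '30 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('users_ext_stream')
AS
INSERT INTO users_ingest (id, email, op, changed_at)
SELECT id, email, op, changed_at
FROM users_ext_stream;

-- Daily: merge the accumulated changes into the consolidated table, then empty
-- the ingestion table so it never balloons in size.
CREATE TASK consolidate_users_daily
  WAREHOUSE = ingest_large_wh
  SCHEDULE  = 'USING CRON 0 4 * * * UTC'
AS
BEGIN
  MERGE INTO users AS t
  USING (
    SELECT *
    FROM users_ingest
    QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY changed_at DESC) = 1
  ) AS s
  ON t.id = s.id
  WHEN MATCHED AND s.op = 'd' THEN DELETE
  WHEN MATCHED THEN UPDATE SET t.email = s.email, t.changed_at = s.changed_at
  WHEN NOT MATCHED AND s.op <> 'd' THEN INSERT (id, email, changed_at)
    VALUES (s.id, s.email, s.changed_at);
  TRUNCATE TABLE users_ingest;
END;
```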
During the process, the low-latency view always shows an up-to-date data version. Visually, both processes look like this:
So, now we have the view and a mechanism that orchestrates the continuous ingestion and consolidation processes.
The next challenge was coordinating this work for the more than 100 tables being ingested in the lakehouse. Luckily, as mentioned already, we spent engineering time building good tooling to configure and manage the different pieces of the ingestion pipeline. Thanks to this tool, generating all the views, tasks, and SQL code related to the continuous and consolidation processes takes a few minutes. It allowed us to test, iterate, and refine at great speed. In contrast, a manual approach would have taken us days, if not weeks, to assemble.
Conclusion and takeaways
The team is happy with the results as we achieved the goals we aimed for:
- Reduce the ingestion latency: We went from 2-3 hours to (potentially) a few minutes. In reality, we are consuming data from the stream every 30 minutes. We could run a task every minute to get the freshest data, but it does not make sense as our extract, transform, and load (ETL) processes only run every 1-4 hours.
- Reduce cost: We reduced the cost by approximately 75%. Previously, our large warehouse was operational nearly all day. Now, we’ve streamlined it to activate just once daily for the consolidation process. This image speaks for itself:
Key takeaways from this work include:
- Consolidating change events (inserts, deletes, updates) into tables can become very slow and costly if you have a lot of updates; insert/delete events are easier to consolidate. Hybrid approaches like the one described here can help ingest update-heavy change streams with low latency and cost.
- Invest in tooling that helps you move faster in the long run. We would not have been able to complete this project if we hadn’t invested early in the tooling and had instead taken the easy path of configuring things manually or in a declarative (but manual) format.
- Whenever possible, try to design your data to be immutable, denormalized, and event-like, as it simplifies data operations, replication, storage, and consolidation.
- Debezium and Kafka are a solid tandem for replication needs; they outperformed commercial solutions in cost, reliability, and flexibility.
- Snowflake external tables and streams can simplify data ingestion from S3, hiding many complexities by providing a reliable, high-level stream of changes.
- Having a data lake of the replication stream provides massive value for investigating both product issues and data replication issues. We chose to keep it in S3 but (for convenience) queryable as Snowflake external tables. Remember to compact and partition the files into larger ones so that the data compresses better and is accessed faster with fewer S3 requests. Doing so allows indefinite retention and lowers the bill when accessing historical data. We used Spark to compact the small files created by Kafka into ~250MB Parquet files (as per Snowflake’s documentation recommendations).
- Dealing with Postgres TOASTed values from Debezium introduced an unexpected amount of complexity. If you are using Postgres and logical replication, this post is worth a read.