My name is Vadim. I work as an engineer at Datadog.
I’ll be talking about a service that we worked on for a year and a half and that replaced an older service.
So a little bit about Datadog itself. We are a monitoring company, we provide monitoring for different things from hardware servers to databases, web servers…we collect application performance monitoring, metrics, logs, etc. And obviously, we alert.
But no worries, the talk is not gonna be about Datadog at all. This is a little bit of content that I’m gonna go through.
And first of all, for those who arrived to New York, welcome to New York. It’s been waiting for you.
Back to serious problems. This is a description of the data that we have.
How Datadog Used To Capture Historical Metrics
We capture org IDs, metric IDs, some timestamps, and values, and this goes through a certain map that distributes the payloads to Kafka topics and partitions. So a certain metric will always go to a certain topic and partition.
And from there, different topics and partitions are consumed by a host or a single consumer, and then this consumer writes data to different files, one file per metric ID. And after that, it encodes and compresses and writes custom binary files to S3 every X hours.
This whole system powers historical metrics at Datadog, so once you go on dashboards older than one day, the system is gonna use files produced by that system.
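To make the fixed routing concrete, here is a minimal sketch in Python. The talk only says "a certain map"; the CRC32 hash, the topic names, and the partition count below are assumptions for illustration, not Datadog's actual scheme.

```python
import zlib

# Hypothetical layout: real topic names and partition counts are not given in the talk.
TOPICS = ["metrics-a", "metrics-b"]
PARTITIONS_PER_TOPIC = 8


def route(org_id, metric_id):
    """Map a metric to a fixed (topic, partition) pair.

    A stable hash stands in for the "certain map" from the talk, so the
    same metric always lands in the same topic and partition.
    """
    key = f"{org_id}:{metric_id}".encode()
    slot = zlib.crc32(key) % (len(TOPICS) * PARTITIONS_PER_TOPIC)
    return TOPICS[slot // PARTITIONS_PER_TOPIC], slot % PARTITIONS_PER_TOPIC
```

Because the mapping is static, one large org or metric keeps hitting the same partition, which is how hot partitions can form in a scheme like this.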
The problem with that original system is that one host has a hard limit on the number of file descriptors, which is one million.
So once we hit one million file descriptors, we have to split topics and partitions across new hosts and new consumers, and for that we had to set timestamps for when the new consumers must start consuming, when the old ones stop, and so on. This manual process is really prone to mistakes.
The other problem is that once a host has high CPU usage, you split it and end up with a couple of hosts at 50% utilization, so you’re basically paying extra money for resources that you won’t use until your topics and partitions ramp up.
And the other problem is that once you get down to one partition per host and you hit one million file descriptors, you basically have no room to scale further. And if your node dies, you have to start a new instance, reset Kafka offsets, and replay data for the past X hours, which is a huge manual process.
The other difficult part is that it’s hard to predict which org and metric is gonna be big, so this system creates some hot, big topics and partitions, which creates problems downstream.
Requirements for a new system
So we decided to create a new service that automatically balances orgs and metrics across different topics and partitions, so each Kafka topic partition is equally sized.
But that means that we now have to consume all the data and we don’t know in which exact partition our metric is stored.
So out of this, we got the requirements for the new system.
Conceptually, it must work with the new partitioning schema. It must be able to handle 10X growth: if we double our data size every year, it must work for the next three years.
We have to keep the cost at the same level as the existing system, which was a little bit tricky.
And it must be as fast as the existing system which is also tricky.
But besides conceptual problems or conceptual requirements, we had operational requirements.
So the system must be easily scalable without much manual intervention. We should minimize impact on Kafka. We should reduce data retention time. It should still be able to replay data easily in case we have some bugs.
So at Datadog, before starting a new service, overall we always create a document outlining everything that we are gonna implement and then we request comments.
Decoupling state and compute
And as you can see, this document was started in June, a year and a half ago, pretty much, and we got some feedback and we discussed before we even started working on anything.
Here is a quote from Taylor Swift, sorry. We need to load all topics/partitions to compose a single timeseries. Why not offload Kafka to somewhere and then load the whole dataset with Spark?
This is a really insightful quote and we did exactly that.
So going back to the previous model that we had.
So as you can see, file descriptors, encoding and compressing and writing.
In this part, we have state and we have compute separate.
So what we want to do is we want to store Kafka to some storage (which will be state), and then we want to load storage and do all the computations (which is compute).
And the technologies that we decided to use are Kafka-Connect and Spark.
So Kafka-Connect would just get data from Kafka and store it to S3, and then Spark would load it as if we were reading Kafka.
And out of this was born the name of the project, Tailor-S. So it tailors secondary resolution data in our files. So that’s why we call it Tailor-S.
State: Kafka Connect
I’m not gonna talk a lot about Kafka-Connect but basically, it’s a simple consumer that writes data to S3 as is, without any modifications, every 10 minutes. And the goal is to deliver data to S3 as fast as possible.
On the graph, you can see that we write lots of files every 10 minutes.
Kafka-Connect is really easy to operate. We just need to update a single config if we want to add or remove topics. If we change the number of partitions, it will handle everything automatically. Removing and adding workers is super easy.
Kafka-Connect rebalances itself.
And because we write files and commit offsets every 10 minutes, stopping the system only pushes us back 10 minutes, so we can reduce Kafka retention.
The other part that was tricky about Kafka-Connect is that we had to fine-tune memory overall.
So here, you can see the process itself uses about 27 gigabytes and the heap stays about 16, so about 10 gigs are used for other off-heap operations such as network I/O, compression, decompression, and other parts, which you normally don’t see if you just look at the heap.
We also make sure that our nodes are loaded. We don’t have idle workers, but overall, balancing workers is still an unsolved problem.
We write—every 10 minutes—a lot of data. I actually was told to remove the y-axis here but it’s in terabytes basically.
So that’s a lot of data, and because of that we had to do a lot of optimizations. So we had to randomize key prefixes to avoid having hot underlying S3 partitions.
We know that AWS recently announced, like a year ago, that they solved this problem and we actually thought that they solved it, but turns out they did not and we had to implement randomization.
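A minimal sketch of what prefix randomization can look like. The talk does not say how the prefixes were generated; deriving the prefix from a hash of the natural key, the key layout, and the prefix length are all assumptions here.

```python
import hashlib


def prefixed_key(topic, partition, start_offset, prefix_len=4):
    """Build an object key whose leading characters are spread uniformly.

    The prefix is derived from a hash of the natural key, so it looks random
    but can be recomputed by any reader that knows topic, partition and offset.
    """
    natural = f"{topic}/{partition}/{start_offset}"
    prefix = hashlib.sha256(natural.encode()).hexdigest()[:prefix_len]
    return f"{prefix}/{natural}"
```

A derived prefix, rather than a truly random one, keeps the keys discoverable: a reader can rebuild the full key without listing the bucket.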
We implemented parallelized multipart uploads because our CPUs were mostly idle when we were using single-threaded uploads. We submitted a pull request to Confluent, but it was not accepted.
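The idea of uploading parts in parallel can be sketched as follows. This is not the connector code from the talk: `upload_part` is a hypothetical caller-supplied function, and the part size and worker count are assumed values.

```python
from concurrent.futures import ThreadPoolExecutor

PART_SIZE = 8 * 1024 * 1024  # assumed part size for illustration


def upload_in_parts(data, upload_part, part_size=PART_SIZE, workers=4):
    """Split `data` into numbered parts and upload them concurrently.

    `upload_part(part_number, chunk)` is a hypothetical function that sends
    one part and returns its receipt. Receipts come back ordered by part
    number, ready for a final "complete upload" call.
    """
    chunks = [data[i:i + part_size] for i in range(0, len(data), part_size)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() preserves input order even though the uploads run in parallel.
        return list(pool.map(upload_part, range(1, len(chunks) + 1), chunks))
```

Part numbers start at 1 here because that is how S3 multipart uploads number their parts.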
We figured out the optimal size of buffers to avoid out of memory issues that we had before…we discovered them by monitoring the system.
And we still have lots of slowdown errors from S3 so we have exponential backoff for that and we monitor retries.
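Exponential backoff with a retry counter can be sketched like this. The `SlowDown` exception, the delay values, and the jitter strategy are assumptions for illustration; the talk only says that retries back off exponentially and are monitored.

```python
import random
import time


class SlowDown(Exception):
    """Stand-in for a throttling error returned by the object store."""


def with_backoff(operation, max_attempts=6, base_delay=0.1, max_delay=10.0, sleep=time.sleep):
    """Call operation(), retrying on SlowDown with exponential backoff and jitter.

    Returns (result, retries) so the retry count can be reported as a metric.
    """
    for attempt in range(max_attempts):
        try:
            return operation(), attempt
        except SlowDown:
            if attempt == max_attempts - 1:
                raise
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Random jitter keeps many workers from retrying in lockstep.
            sleep(random.uniform(0, delay))
```

Returning the number of retries alongside the result makes it easy to emit it as a metric and alert when throttling gets worse.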
And here are, like, Taylor Swift points. This shows the number of points we process.
The other part: after we solved the storage part, saving the state, we had to load the whole of Kafka, basically, for eight hours or even more, into Spark.
And there were lots of unknowns.
Reading 10 trillion points is very difficult. You’re gonna have lots of objects so you need to somehow minimize garbage collection.
You need to figure out how to utilize internal APIs of Spark to squeeze all the juices out of Spark. Is it even possible with Spark?
I remember one of the first conversations with my team lead. I was like, “I don’t know…maybe it’s not possible.”
Make it cost efficient. That’s another part. You can read like 10 trillion points but at what cost, right?
So minimizing garbage collection. We had to reuse lots of objects.
This is one of the examples of what we’ve done. We allocate a one-megabyte ByteBuffer once, when we open a file. We keep reading payloads and decompress them using ZSTD into the allocated memory, so into that one-megabyte block. And then we get data from this same ByteBuffer, so we only allocate the ByteBuffer once and keep reusing it.
The other part is we use the DataFrame API extensively because Spark allows you to bypass completely creating objects, and Spark knows how to handle its own internal memory representations.
But for that to be as close as possible to the DataFrames, we wanted to create a DataFrame-compatible file format.
So we created it. It provides a reader of InternalRows, and each InternalRow points to regions of memory in the allocated buffer from the previous slide.
So this is an example of what it actually does. This whole thing is code that Spark generated: for some stages, it generates Java code and executes it.
So here you get an InternalRow, and then from this InternalRow it gets integers from specific positions. Inside our reader, we have just a single row, the same object. And when somebody calls get integer, we know the field position, so we just slide within the ByteBuffer and get the value.
So here we avoided creating lots of objects and we directly deliver primitives to the Spark internal memory.
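The single-row trick can be illustrated outside Spark. The sketch below is in Python rather than the Java/Scala of the real reader, and the fixed-width field layout is an assumption; it shows only the pattern of one reusable row object that reads primitives from known positions in a shared buffer.

```python
import struct

# Assumed layout for illustration: org_id (int32), metric_id (int32),
# timestamp (int64), value (float64), little-endian, 24 bytes per row.
FIELD_FORMATS = ["<i", "<i", "<q", "<d"]
FIELD_OFFSETS = [0, 4, 8, 16]
ROW_SIZE = 24


class RowView:
    """A single reusable row that points into a shared buffer."""

    def __init__(self, buffer):
        self._buffer = buffer
        self._base = 0

    def point_to(self, row_index):
        # Moving the cursor is the only per-row work; no new row object is created.
        self._base = row_index * ROW_SIZE
        return self

    def get(self, ordinal):
        # Read one primitive straight from the buffer at a known field position.
        offset = self._base + FIELD_OFFSETS[ordinal]
        return struct.unpack_from(FIELD_FORMATS[ordinal], self._buffer, offset)[0]


def iter_rows(buffer, row_count):
    """Yield the same RowView object for every row; callers must not keep it."""
    row = RowView(buffer)
    for i in range(row_count):
        yield row.point_to(i)
```

The price of this pattern is that a consumer must copy out any value it needs before advancing, because the row it holds changes underneath it.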
We did some profiling which didn’t help us much.
To be honest, I don’t actually remember what I was profiling on these screenshots but I just threw them like, “We did it.”
Here, after we created the file format and replaced the previous methods with datasets, as you can see, the overall task time went down from 700 hours to 400. And the most important part is that garbage collection dropped by 3 times.
The other problem we had is that Kafka-Connect sometimes creates really big files, bigger than two gigabytes, and in Spark, or in Java overall, you cannot have byte arrays bigger than two gigabytes.
And overall, like if you try to read like 30 files of 2 gigabytes in memory, you’re gonna use like 60 gigabytes which is not great.
So instead, what we do is we copy a file locally and then we map it into virtual memory using a library from Indeed.
And then we allocate an empty ByteBuffer using Java reflection and point it to that region of memory inside the memory-mapped file. Then we give this ByteBuffer to ZSTD to decompress, and everything thinks that it’s just a ByteBuffer, but it actually points to the memory-mapped file.
So this is an example of some unsafe memory manipulations that we had to do in order to completely bypass creation of unnecessary objects and reading like two gigabyte files.
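The memory-mapping idea can be shown with Python's standard `mmap` module. This is an analogy, not the Java approach from the talk (Indeed's library plus a reflection-built ByteBuffer); the function name is hypothetical.

```python
import mmap


def read_slice_via_mmap(path, offset, length):
    """Return `length` bytes starting at `offset` without loading the whole file."""
    with open(path, "rb") as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mapped:
            # Slicing copies only the requested range; the rest of the file
            # stays on disk and is paged in by the OS on demand.
            return mapped[offset:offset + length]
```

The mapping lives outside the language heap, so a multi-gigabyte file does not have to fit into one array or be held in process memory all at once.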
The other part is that some files get really big. So when you have like multiple tasks running, some of them will be completely skewed.
So instead of that, we wanted our file format to be splittable as well, so we set a split size of one gigabyte. Multiple readers then read different parts of the file.
And inside the file we write length, payload, length, payload, length, payload. So when a reader starts reading it, it has like start position and end position and it keeps skipping payloads until it reaches the startByte.
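Here is a minimal sketch of reading one split of a length-prefixed file. The 4-byte big-endian length and the rule that a record belongs to the split in which its length prefix starts are assumptions; the talk only describes the length, payload layout and the skipping.

```python
import io
import struct

LENGTH_PREFIX = struct.Struct(">I")  # assumed 4-byte big-endian length


def write_records(stream, payloads):
    """Write each payload preceded by its length."""
    for payload in payloads:
        stream.write(LENGTH_PREFIX.pack(len(payload)))
        stream.write(payload)


def read_split(stream, start, end):
    """Yield payloads of records whose length prefix begins in [start, end).

    Record boundaries are only known by walking the file from the beginning,
    so the reader seeks past earlier payloads without reading their bytes.
    """
    stream.seek(0)
    position = 0
    while position < end:
        header = stream.read(LENGTH_PREFIX.size)
        if len(header) < LENGTH_PREFIX.size:
            return  # end of file
        (length,) = LENGTH_PREFIX.unpack(header)
        if position >= start:
            yield stream.read(length)
        else:
            stream.seek(length, io.SEEK_CUR)  # skip a record owned by an earlier split
        position += LENGTH_PREFIX.size + length
```

With this ownership rule every record is read by exactly one split, even when a payload crosses a split boundary.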
And because of lots of tricks with allocation/deallocation of memory, we had to implement fine-grained monitoring around that. So we tracked how much memory we allocated, how much memory we deallocated.
And as you can see here, our code is pretty efficient so we only use like four gigabytes max for each executor.
The other part that I was talking about is using Spark internal APIs.
So you have Dataset.map(). It must create objects. It copies primitives from Spark memory. It has a schema, which is great. It is type-safe, which is also great. But it is slow because of the first two parts.
What we do instead is use queryExecution.toRdd(), which returns internal rows but doesn’t create objects. It’s just a single internal row, the same object. So it’s pretty much the same trick we did in the beginning, but now we’re using it from the read side.
It doesn’t copy primitives. It has no schema which is not great at all, and it’s not type-safe so you can shoot yourself in the leg.
And internal row has direct access to Spark memory.
So here is how we do it.
So here we have our DataFrame.
We do some operations, and then once we have created this DataFrame, we call queryExecution.toRdd(), we get an InternalRow, and then we have to get fields using field indexes.
So we had to do also lots of testing around that.
We changed some settings to enable offHeap. So here, as you can see, at max we use about 180 gigabytes, but the heap itself is about 60. So 120 gigabytes is used for lots of offHeap operations.
And we did testing.
So here we only compare garbage collection to task time so these screenshots were taken at a different time.
But basically, in the first one you can see almost half of the execution time is spent on garbage collection. And once we enabled offHeap, it’s under maybe a fourth or a fifth.
So everything is great. And this is a real production job, so here we load like 22 terabytes and task time is like 1,000 and garbage collection only like 70 or 60 hours, so only 6%.
And in the real cluster, we run multiple jobs and the garbage collection is even lower: below 1%.
So all those optimizations led to removing garbage collection basically.
I’m sorry. Coffee break. Water break. Thanks.
Testing. Unit tests are boring.
We have them. There’s lots of references to Taylor Swift as well.
Integration tests, we have them. Also like 1989.
Staging environment, we have it so I’m not gonna talk about it.
We did load-testing, slowest parts, checking data correctness, and game days, so I’m gonna be talking about those parts.
So load testing. Once we had a working prototype, we started throwing more data at our job. So we threw like 10X more data and we checked like what parts are breaking, what are the slowest parts, etc., etc.
And we were estimating cost. So as our company grows, how expensive is it gonna be: linear, exponential, or something else?
Slowest parts. You always have to have a good understanding of the slowest parts of your job so you know when it’s gonna break, when you have to start optimizing it, and so on. So knowing limits is really important.
And we report lots of metrics about parts that we think are slow or fast.
And we also have historical data, so we compare how stages become slower or faster. This report also prints an ASCII portrait of Taylor Swift every time we run it.
So after we ran the new system using all of the data that we have, we did a one-to-one join with the previous system. So you have like 10 trillion points on one side and 10 trillion points on the other side, which was another problem to join.
But this allowed us to find some edge cases that we were able to eliminate.
So out of trillions of points, you see less than 1% had a difference and we were like, “Why is that?”
And it turns out when we were reading, we had to recreate the same positions of points. So you have a point with the same timestamp, but within the Kafka topic, they were scrambled.
After eliminating this issue, we were able to completely eliminate any differences, pretty much.
Game days. Game days are, for those that are not familiar, created to test your system, how resilient it is, what are the limits and fault tolerance, and so on.
So you come up with scenarios: you kill a node, you kill the whole service, you make a node slow, and other scenarios like that. You list expected results.
You then run scenarios and you write down what happened. And then you summarize key lessons.
So for example, here we were killing the old nodes.
We expected some things to happen. Some things happened, some things didn’t.
Here, we were slowing down a node artificially.
And out of this, it all allowed us to figure out like, do we need monitoring of certain parts? Do we need to invest in new tooling? And so on.
And also allowed us…allowed other people to get familiar with the system overall.
Once we confirmed that our prototype works using the whole volume of data, we decided to split the job into shards.
So there were multiple reasons.
So we use spot instances, so losing a single job for a shard will not result in losing the whole progress.
And if, for some reason, there is an edge case, it will only affect a single shard: the other jobs will pass but this shard will be affected, so you can limit the scope of your problem.
And we get the ability to process shards on completely separate clusters.
So sometimes in your availability zone you will not have available instances, so you can start in a separate availability zone and process data from S3 because it’s multi-AZ replicated.
So before implementing sharding, we had to identify independent blocks of data. In our case it was simply orgs. Because one org doesn’t depend on another org, we were able to split them completely.
And on the Kafka-Connect side we just have a config file that spreads them across 64 shared shards, or if an org is really big, we can move it to a separate shard.
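A sketch of what such a shard assignment could look like. The talk mentions a config file with 64 shared shards and dedicated shards for very big orgs; the hash-based spread, the shard names, and the example org ID below are assumptions.

```python
import zlib

SHARED_SHARDS = 64

# Hypothetical overrides: a very large org gets a dedicated shard.
DEDICATED = {1001: "dedicated-1001"}


def shard_for(org_id):
    """Return the shard that owns every metric of this org."""
    if org_id in DEDICATED:
        return DEDICATED[org_id]
    # crc32 is stable across processes, unlike Python's built-in hash() for strings.
    index = zlib.crc32(str(org_id).encode()) % SHARED_SHARDS
    return f"shared-{index:02d}"
```

Since an org never spans two shards, each shard can be processed by an independent job, which is what makes the per-shard isolation described above possible.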
And because we know a single job can process all the data that we have, and now we have a single shard that’s 64 times smaller, it means that our job overall, if we continue doubling data, it will work for the next like six years, after which we can increase the number of shards.
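The arithmetic behind the six-year estimate is a base-2 logarithm. The helper below is a hypothetical illustration that assumes a constant yearly growth factor and that a job's capacity is fixed.

```python
import math


def years_of_headroom(capacity_multiple, yearly_growth=2.0):
    """Years until data grows by `capacity_multiple` at a fixed yearly growth factor."""
    return math.log(capacity_multiple) / math.log(yearly_growth)
```

With data doubling every year, a shard that is 64 times smaller than what one job can handle gives log2(64) = 6 years, and the original 10X requirement corresponds to log2(10), roughly 3.3 years.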
After we did all the testing and all the preparations, we had to start migrations.
Migrations are really difficult. So in order to replace the existing system, we had to do multiple things.
First of all, we wanted to run both systems alongside. We needed a release plan and a rollback plan.
We needed to make sure that systems that depend on our data can work with both new and old. And we wanted to also do partial migrations of customers, so if the new system is not producing the correct data, only a subset of customers will be affected.
We needed to check everything: underlying pipelines and everything else.
And finally, we had to do a final migration.
So to run both systems alongside, we wanted to run them as close as possible to production, same volume of data.
We wanted to output to a completely separate location so nobody uses this data—but the whole system runs as if in production.
We made sure that there were no discrepancies with existing data and we treated every incident as a real production incident so there was an on-call rotation and everything and we were writing postmortems as usual.
But nobody was using our data, we were just getting familiar with the system.
This approach allowed us to find bottlenecks that we previously didn’t see or know about. We figured out what kind of monitoring we were missing, and we got people familiar with this whole system and how to operate it without affecting production.
And yeah, we figured out some additional tooling that we needed.
Release plans and rollback plans. For pretty much every piece of the whole system and underlying system, we had really detailed plans on how to deploy and how to roll back.
As you can see, on deploy here, we checked all the boxes and we didn’t need our rollback plan, luckily.
For dependent systems, we wanted to be able to switch customers to the new system and back, in case of problems. For dependent pipelines, we wanted them to still be able to load both datasets and see them as if nothing had happened.
Yeah, that’s what I said.
Oh, and we wanted to make sure that if you have dependent pipelines running using new data, it will have the same output as if you were running it using old data.
Partial migration of customers. It’s very expensive to run both systems alongside each other: when you have a system that burns millions of dollars and you want to run it for at least six months, you’re gonna burn a lot of money.
So we wanted to just migrate some customers. So we wanted to migrate our org.
So at Datadog, we use Datadog to monitor Datadog. We are one of the biggest customers of Datadog so it was natural for us to like move us and see like, “Oh, what’s up?”
So we wanted to do it for a month and then after a month, we wanted to move certain customers.
But the problem was that lots of parts were not ready for that; lots of parts didn’t know that there would be partial migrations.
Usually, when you do a migration, you do it like a swap, right?
No. We had to come up with a system that, after a certain timestamp, stops writing in the old system and starts writing in the new system, and so on.
It was difficult to implement and maintain migration timestamps for each org. Certain things didn’t have versioning so we had to add it. And for downstream pipelines everything must look like nothing happened.
So you’re not gonna touch every pipeline, you somehow have to create some mechanism that’s gonna work for that.
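One way to picture per-org migration timestamps is a single lookup that every pipeline goes through. The function name, the cutover table, and the "old"/"new" labels are hypothetical; the talk does not describe the actual mechanism.

```python
# Hypothetical per-org cutover times (Unix seconds); orgs not listed are still on the old system.
MIGRATED_AT = {42: 1_700_000_000}


def sources_for(org_id, start, end):
    """Split the half-open range [start, end) into (system, start, end) pieces.

    Pipelines call this instead of knowing about the migration, so they read
    "old" files before the cutover and "new" files after it.
    """
    cutover = MIGRATED_AT.get(org_id)
    if cutover is None or end <= cutover:
        return [("old", start, end)]
    if start >= cutover:
        return [("new", start, end)]
    return [("old", start, cutover), ("new", cutover, end)]
```

Rolling an org back is then a matter of removing its entry from the table, which fits the requirement of switching customers to the new system and back.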
And we had to do lots of integration tests with migration timestamps. So the integration tests that we had helped us.
And overall, for all the pipelines that we have, we have integration tests so it was pretty easy to check if migration timestamps are gonna work.
And after that, it was pretty easy. We picked a date, added additional integration tests, tested on staging, rolled in production, we let the old system run for a week, we killed the old system, and we cleaned up old code.
Actually, we haven’t cleaned up old code but that’s just future.
So here’s a cost comparison with the old system, where everything, storage and compute, was inside one system.
You can see the Kafka-Connect compute cost is 13%, S3 is 39% (so basically storing Kafka), and Spark is 77%. If you sum those numbers, you actually get 129%, so it’s more expensive, which is understandable.
Once you offload a state and load it, you will have to spend more compute resources, right. You have to deserialize, serialize, send it to work, etc., etc.
But because we were able to remove a lot of data from Kafka, in total we are way cheaper than the old system. So we actually saved lots of money.
And the speed. Here, you can see that we prepare data within 10 hours, so we write files within 10 hours after a point appeared.
And here we had some outage. Life happens.
So results in high level.
The new system works with a new partitioning schema. It handles 10X growth.
We keep the costs at the same level as the existing system, and it’s actually cheaper. And it’s as fast as the existing system.
Operational. It scales without much manual intervention.
So Kafka-Connect, we just add additional workers.
On the Spark side, we just add additional executors, and S3 is just elastic cloud storage, right.
We minimize impact on Kafka. We reduce data retention in Kafka a lot and we actually store Kafka data in S3 longer, two times longer. So we actually increase retention, in a sense.
We’re still able to replay data easily. We actually had to replay Kafka-Connect and Spark jobs many times, and we actually had game days where we were replaying some Kafka-Connect data so we were sure that everything works.
And here is a graph of PagerDuty incidents in hours. So the green one is the old system, the blue one is Kafka-Connect, and the purple one is a Spark job. And we completely switched from the old system in October, and as you can see, the old system had lots and lots of troubles.
So overall, in terms of ops, we also won.
What we learned
And in conclusion, what I want to reiterate is that documents and plans and RFCs are really helpful.
We document a lot of things and we discuss lots of things before doing the work and spending the time.
So that’s definitely something that everybody should be doing.
We did lots of testing which helped us a lot. We did load tests, we did like integration tests, everything.
Migrations are difficult. Unfortunately, I don’t have like the silver bullet for that. Migrations will always be difficult, I think.
There were many engineering obstacles that we were able to solve, although some of the methods that we used were really unconventional.
And we were constantly monitoring…we were constantly checking our forecasts about the cost and speed of the new system because if you build an expensive new system, you’re probably not going in the right direction.
So we actually were doing forecasts, estimates every month while we were working, for like a year and a half.
This is pretty much it.
This is my work email, personal, Twitter, and Venmo if somebody wants to send me… Thank you for listening and…