So, I’m gonna talk today about our metrics infrastructure and architecture, and how we’ve scaled this system to process trillions of points a day.
Visualizing the scale of Datadog’s metrics database
So, I’m gonna start just by sort of putting it to you what does a trillion mean to you? Like, if you try to visualize a trillion of something, what does that look like?
So, you know, maybe you have some ideas.
One way to think about it is 31,000 years is a trillion seconds, so that’s a pretty long time.
A trillion pennies go up to the moon and back about three times.
So, we’re talking about a lot of data and that sort of begs the question: where is all this data coming from?
If you went to our keynote this morning, you heard from our CTO, Alexis, about some of the sources of this data and this explosion that we’re seeing, he referred to it as the Cambrian explosion of data.
And as we add these numbers up, as we think about having hundreds of apps, thousands of hosts, tens of containers, you know, a point per second, adding that all together, we actually come up with a trillion points, even just for a single medium to large customer.
And some of what’s underlying this growth is the decreasing life cycle of our infrastructure.
We’re moving on from this world where we had the sort of classical analogy of pets versus cattle.
We moved into the cattle world where we stopped giving our infrastructure cute nicknames, and we thought of it more as a herd.
And now we’re moving into this world where things are living not just for less than weeks or days, but in the realm of minutes or seconds, or milliseconds in the case of things like cloud functions.
At the same time, we have this incredible explosion in the granularity of the data that we’re looking at.
Our expectation of the level of detail that we want to get out of our observability systems, our monitoring platforms, and our metrics has really just exploded as we’ve thought more and more about what kind of insight we can gain from this data.
So, we’re seeing this decreasing lifecycle coupled with this increase in the granularity, leading to this world of trillions of metrics per day.
So, these are some performance mantras.
These are things that came out of the performance engineering community, credited to Craig Hanson and Pat Crain.
I sort of came across them via Brendan Gregg, an incredibly smart engineer and author currently working at Netflix but came through Sun.
I highly recommend looking into his work.
But I’ll be using these to sort of frame some of the decisions that we’ve made and some of the patterns that we’ve used to scale our system to that volume of trillions of points a day.
So, I’ll talk a little bit about, you know, our architecture sort of at a high level.
I’ll do a bit of a deeper dive on some of our data storage choices.
We’ll talk about how we handle synchronization across all of those data stores, and then we’ll dive into how we leverage approximation to get sort of even deeper insights than we get from our traditional metrics.
And finally, we’ll talk about how that approximation and aggregation combine to enable us to build an even more flexible architecture.
So, diving into the architecture overview, so this is sort of the high-level view.
We have, you know, in the upper left…I have to orient myself where you’re seeing that.
The upper left, where our metrics sources are coming in, we have, you know, things like the agent, our API, our various Cloud and web integrations coming into our intake system.
They’re getting stored in a variety of different data stores, then the query system is pulling the data out of those different data stores, pulling it together, feeding it back to you, and feeding it back to you in a few different ways through APIs, through dashboards, and then through monitors and alerts.
Caching timeseries data
So, one of the first sort of iterations you can sort of add to this model is adding the query caching aspects.
So, this is sort of a…
The first thing that you might think of in this system is well, time-series data is highly cacheable.
It’s data that is very volatile at the most recent end, but as you look back historically, that data isn’t changing as much anymore.
So, it’s very easy to keep that data in a cache.
So, this is sort of a necessary condition for improving the performance of our system, but we find that it’s not sufficient in one part, because, you know, often what you care about is the most recent data, so we can’t cache the data as it’s just coming in.
And we also see that as you’re exploring the data, as you’re debugging different issues, you’re going to be diving into different resolutions and different filters and things like that, and that quickly sort of busts through the cache, and we have to go fetch different segments of that data for the first time.
But mapping this to our performance mantras, we think about this from the perspective of do it but don’t do it again, so leverage caching as much as you can.
Like I said, it’s a necessary but sort of not sufficient approach.
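As a rough illustration of why caching helps but is not enough, here is a minimal sketch. The `fetch_from_store` function, the one-hour segment size, and the five-minute "still changing" window are all assumptions made up for this example, not details from the talk.

```python
import time

CACHE = {}              # (query, resolution, segment_start) -> points
SEGMENT = 3600          # seconds of data per cached segment (assumption)
VOLATILE_WINDOW = 300   # seconds; hypothetical cutoff for "still changing" data


def fetch_from_store(query, resolution, segment_start):
    """Hypothetical stand-in for a real data store read."""
    return [(segment_start, 0.0)]


def get_segment(query, resolution, segment_start, now=None):
    now = time.time() if now is None else now
    segment_end = segment_start + SEGMENT
    # Segments that reach into the volatile window are never cached.
    if segment_end > now - VOLATILE_WINDOW:
        return fetch_from_store(query, resolution, segment_start)
    key = (query, resolution, segment_start)
    if key not in CACHE:
        # A new filter or resolution is a new key, so it misses the cache.
        CACHE[key] = fetch_from_store(query, resolution, segment_start)
    return CACHE[key]
```

Historical segments are served from the cache on repeat queries, while the most recent segment and any new filter or resolution still go to the data stores.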
Kafka for independent data storage
So, we’re gonna zoom in a bit on the intake and the data store part of this diagram and see a little bit more detail about what’s happening there in order to answer these kinds of high volume, real-time queries quickly.
So, here we’ve zoomed in: the intake system fans out data into a couple of different Kafka queues, and then from there, it goes across many different data stores.
The query system then pulls that all back together and gives you your results, whether it’s through a monitor or a dashboard.
And the important thing to notice here is that there’s this wide variety of sort of heterogeneous data store types that are all reading from a unified topic, in that, you know, we really want this intake part to be as simple and as low latency as possible, so that we can make sure that we’re capturing data with minimal risk of loss.
And that brings us, again, back to performance mantras thinking about doing it later.
We want to maximize that uptime and minimize the upfront processing that we’re doing as we get the data into Kafka where we can work with it much more easily.
Scaling through Kafka
So, how do we leverage Kafka at Datadog?
You know, we make heavy use of the built-in partitioning system in a couple of different ways.
So, we started simply by partitioning by customer.
So, that gave us a really nice isolation where, you know, if a certain customer was scaling up, we could scale up the volume of readers off of that partition without it necessarily spilling into other partitions.
That became sort of not sufficient.
We needed to take another level of partitioning that data, so we went to metric.
That is also actually not sufficient for us in the long-term.
What we’re seeing is that individual metrics are actually at a volume where it’s not reasonable to scale an individual partition to handle some of the largest metrics in our system.
And that’s where we’re actually in the process right now of building something even more dynamic to balance the workload across our systems and make it much more evenly distributed and avoid hotspotting by metric.
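To make the two partitioning schemes concrete, here is a small sketch. The hash function, the partition count, and the key format are assumptions for illustration only; this is not Kafka’s own partitioner and not necessarily how Datadog builds its keys.

```python
import hashlib

NUM_PARTITIONS = 64  # hypothetical partition count


def partition_for(key: str) -> int:
    """Map a string key to a partition with a stable hash."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS


def by_customer(customer_id: str, metric: str) -> int:
    # Every metric of a customer lands on the same partition.
    return partition_for(customer_id)


def by_metric(customer_id: str, metric: str) -> int:
    # A customer's metrics spread out, but one very large metric
    # still lands on a single partition (the hotspot case).
    return partition_for(f"{customer_id}:{metric}")
```

Partitioning by customer isolates customers from each other; partitioning by metric spreads one customer’s load, but a single very high-volume metric can still overload its partition.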
So, doing it concurrently is a thing that we’re getting out of our use of Kafka.
We’re able to use independently, horizontally scalable data stores, all reading off of the same Kafka topics, whether that’s points data or tags data, and we’re able to really customize those data stores to the specific use case and the specific function that each of them is trying to perform.
So, we’re gonna dive now into exactly what those functions are and how we sort of map those functions on to storage and database technologies.
Quantity is a (challenge of) quality in its own right
Coming back to this model of trillions of points per day, we want to think about how we translate this into a volume of data, and how we use that to inform how we think about the storage layer and the databases that we’re using.
So, if we think about this, what we’re storing essentially is an 8-byte float for each of these things and then we want to add a little bit of overhead for storing the tags that kind of gets amortized over all of the data, and we end up with an estimate of something like 10 terabytes a day to store these trillions of points a day.
That brings us to what are sort of the technology options that we have for storing this data?
These are some really common options.
We’re using AWS as an example, as a reference point as it’s widely used.
People are highly familiar with it.
And you can sort of think about the different options based on these different tiers that AWS provides, but this sort of goes across different Cloud providers or even data center kinds of architectures.
These are listed in, you see, in ascending order of latency and descending order of price.
These things are inversely related.
So, what if we wanted to store all of this data in RAM, for instance?
This is sort of like the upper bound in terms of how we could answer these queries.
We could do it really, really quickly if we just put everything in RAM and said, “This will be great.
It’ll be super-fast." There’s nothing faster, I mean, you know, with some margin of reasonableness.
In order to do that, we’d need to use an instance type like the AWS x1e.32xlarge, and we’d need to use 80 of them.
That comes out to something like $300,000 a month, which is a lot.
But it’d be really fast.
And then this is also not accounting for indexes, overhead.
This is not accounting for redundancy.
You know, if one of those 80 nodes goes down, maybe you lose one-eightieth of your data.
That’s not acceptable.
And, of course, we have even more data than a single month.
This is all just modeling a single month’s worth of data.
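The back-of-the-envelope numbers can be reproduced roughly as follows. The figure of about 4 TB of RAM per x1e.32xlarge is an assumption used here for illustration (check current AWS documentation), and the 10 TB/day figure is the talk’s own estimate including tag overhead.

```python
POINTS_PER_DAY = 1_000_000_000_000   # one trillion points
BYTES_PER_POINT = 8                  # one 8-byte float per point
TB = 10 ** 12

# Raw floats only, before any tag overhead.
raw_tb_per_day = POINTS_PER_DAY * BYTES_PER_POINT / TB   # 8.0

# The talk's estimate once amortized tag overhead is added.
est_tb_per_day = 10
month_tb = est_tb_per_day * 30                            # 300 TB for one month

# Assumption: roughly 4 TB of RAM per node.
RAM_PER_NODE_TB = 4
nodes = -(-month_tb // RAM_PER_NODE_TB)                   # ceiling division

print(raw_tb_per_day, month_tb, nodes)
```

Under these assumptions the result is 75 nodes, the same ballpark as the 80 quoted in the talk, and that is before indexes, overhead, or any redundancy.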
So, this brings us to the idea that we need to take a hybrid approach.
We need to leverage different kinds of storage in order to meet our performance and our efficiency goals.
So, in order to think about this, we want to think about what kinds of queries we need to answer in order to tailor our data storage to the use case and get the most efficiency out of it.
What queries did we need to support?
So, these are the four sort of common queries that we need to answer from our system, our data stores.
So, the first one being DESCRIBE TAGS.
This is essentially what is driving the drop downs in your Datadog UI.
If you’re selecting a filter or a group, a list of metrics, etc., that’s the first thing that you’re doing to sort of craft your query.
We need to have that data available relatively quickly so that you can sort of explore your data quickly.
We have the TAG INDEX, and we have a TAG INVERTED INDEX.
So, you can think of this as two sides of the same coin.
We have the TAG INDEX, which is mapping an ID to a unique set of tags in our system.
And on the flip side, we have an INVERTED INDEX.
If you’re familiar with that term, you might sort of have an intuition about what that means, but to spell it out, it’s really taking all of the different individual tags and groups and saying, “This tag maps to this list of time series IDs.”
So, if I say, “I want to do a filter on, you know, availability zone us-east-1a,” it gives me all of the time series IDs that match that specific filter.
And then the last one here is our core POINT STORE.
This is the mapping of a time series ID to the point, to an 8-byte float.
And that’s, you know, an incredibly high volume of data, and we’ve really distilled it to sort of its simplest possible form.
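The four query types can be pictured with plain in-memory dictionaries. This is only a sketch of the shapes of the mappings; the names and tag strings are hypothetical, and the real stores are separate systems rather than Python dicts.

```python
from collections import defaultdict

tag_index = {}                      # TAG INDEX: series ID -> set of tags
inverted_index = defaultdict(set)   # TAG INVERTED INDEX: tag -> series IDs
point_store = defaultdict(list)     # POINT STORE: series ID -> [(timestamp, float)]


def register_series(series_id, tags):
    tag_index[series_id] = frozenset(tags)
    for tag in tags:
        inverted_index[tag].add(series_id)


def write_point(series_id, timestamp, value):
    point_store[series_id].append((timestamp, float(value)))


def describe_tags():
    """DESCRIBE TAGS: what drives the drop-downs when building a query."""
    return sorted(inverted_index)


def series_matching(*tags):
    """Filter: series IDs that carry every one of the given tags."""
    sets = [inverted_index.get(tag, set()) for tag in tags]
    return set.intersection(*sets) if sets else set()
```

A filter query first resolves tags to series IDs through the inverted index and only then reads floats from the point store.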
Only index what you need
So, it’s really important for us to think about only indexing exactly what we need in order to be able to answer these queries efficiently.
It’s really easy to over-index data.
You want to start trading your write volume to get better read performance.
That’s what indexing is for, but then you start…
You know, if you’re over-indexing the data, you’re gonna end up…
You know, a float is a really small value.
If you have too many indexes for the same float value, it’s gonna quickly dominate the storage volume of your system.
Cloud storage options we considered
So coming back to this table, we’re gonna look at these three choices as the ones that we’ve sort of focused on in terms of their price and performance effectiveness.
Obviously, RAM, super-fast, super useful, really great for things like caches, really recent data, things like that.
We found that local SSDs really provide a much better price-performance point for us than things like EBS.
So, you see, like, EBS is an order of magnitude more expensive than local SSDs, and it has an order of magnitude higher latency than local SSDs.
But, of course, EBS comes with a ton of amazing tooling and features around snapshotting.
So, we’ve had to rebuild a lot of that in order to leverage the performance of local SSDs in AWS, but it’s really been worth the investment for us.
That’s sort of a trade-off that you have to make for yourself, but we’ve found a lot of value in being able to do that.
And then, of course, we make use of S3 because it is incredibly durable and just offers really cheap, effective storage.
You know, we know that data will be there effectively forever.
How our hybrid data storage system works
So, we’re gonna map these storage technologies back to the kinds of queries and the data stores that we need to answer.
And what you’ll see is that we’ve split up the kinds of queries that we need to do based on the time ranges that we expect them to cover, and mapped those time ranges onto different storage choices.
So, you see the persistence on the right and the type of storage that we’re choosing on the left. When we’re looking at RAM, we’re looking at the last few hours.
We’re looking at SSDs on the order of days, and we’re looking at S3 on the order of months and years.
And then we can add another layer on that, and we can look at the technology that we’re putting on top of our storage on these storage platforms.
On the higher levels of persistence, and for common things like Redis for caching, we’re leveraging open-source technologies. And then you see the things that are right in the critical path, with the highest volume of data and really the most time-sensitive kinds of queries.
There, we’re storing things in RAM, and we’re actually using data stores built in-house.
This is sort of the core of the platform in that, you know, we’ve really found that we need to own that part in order to be able to answer the volume of queries that we need to at the volume of data efficiently.
So, what we’re taking away from this is…
You know, in order to do this efficiently, to do it cheaper, we’re really mixing the open-source and in-house.
We’re mixing the different kinds of storage technologies to put it all together in a way that meets the performance requirements at each sort of level of persistence.
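A toy version of that mapping from data age to storage tier might look like the following. The exact cutoffs (six hours, seven days) are invented for the example; the talk only gives orders of magnitude.

```python
HOUR, DAY = 3600, 86400

# (maximum age in seconds, tier name), ordered from freshest to oldest.
# Hypothetical boundaries; only the ordering reflects the talk.
TIERS = [
    (6 * HOUR, "ram"),         # last few hours
    (7 * DAY, "local_ssd"),    # on the order of days
    (float("inf"), "s3"),      # months and years
]


def tier_for_age(age_seconds):
    """Pick the storage tier that holds data of the given age."""
    for max_age, tier in TIERS:
        if age_seconds <= max_age:
            return tier


def tiers_for_query(newest_age, oldest_age):
    """List every tier a query touches, from freshest to oldest data."""
    names = [tier for _, tier in TIERS]
    first = names.index(tier_for_age(newest_age))
    last = names.index(tier_for_age(oldest_age))
    return names[first:last + 1]
```

A dashboard looking at the last two days would touch RAM and local SSD, while a year-over-year query would reach all the way into S3.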
So, of course, thinking about using all of these different data stores and spreading this data across systems this way, it leads you to the question of how do you handle synchronization in a world that’s so widely distributed?
And the really critical place to think about this is actually in the area of monitors and alerts.
So, in a human latency sense, the eventual consistency that we get out of our data stores and Kafka tends to, you know, meet human latency requirements.
Monitor your environment and alert on anomalies with confidence
It doesn’t always meet the requirements of our monitoring system.
And in particular, it’s important for us to ensure that we’re not going to trigger false positive alerts.
It’s really critical that we know that we have all of the data that we need in order to have confidence that when we send you all a page, that we have everything that we need to know 100% that that is all the data and that there’s not data lagging in our pipeline somewhere, etc.
So, it’s really thinking about what that evaluation period is and how do we track all of the data through the system to know that we have it all to be able to monitor it and tell you something is definitely wrong.
And the way that we do that is using what you can think of as a heartbeat pattern.
This is a common pattern in systems.
You might be familiar with it from tracking replication lag in your database if you’ve used pt-heartbeat from Percona Toolkit, things like that.
And what we do is we do this as far outside of our system as possible, even going so far as to generate this data from a different data center, trying to mimic the experience of your agents or your services calling our APIs, etc., in order to track the full round-trip latency through the system.
And then we look for that data that we injected synthetically on the other side to know, you know, that the data has made it all the way through, and that the last point has made it through each of the different systems that I’ve talked about and that it’s all come together at the end before we say, “Okay, now we can go ahead and monitor this alert.”
And this happens every single second we’re looking to see, “Okay, do we have everything up to this second? Do we have everything up to this second?”
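One way to picture the heartbeat check is as a watermark over all the stores. This is a minimal sketch under two assumptions that are not from the talk: the store names are hypothetical, and data is assumed to arrive in order within each store’s pipeline, so seeing a heartbeat implies everything sent before it has arrived.

```python
class HeartbeatTracker:
    def __init__(self, stores):
        # Latest synthetic heartbeat timestamp seen on the far side of each store.
        self.latest = {store: None for store in stores}

    def observe(self, store, heartbeat_ts):
        current = self.latest[store]
        if current is None or heartbeat_ts > current:
            self.latest[store] = heartbeat_ts

    def watermark(self):
        """Oldest 'latest heartbeat' across stores, or None if any store is silent."""
        if any(ts is None for ts in self.latest.values()):
            return None
        return min(self.latest.values())

    def can_evaluate(self, window_end_ts):
        """Only evaluate a monitor once every store has caught up to the window."""
        mark = self.watermark()
        return mark is not None and mark >= window_end_ts
```

The monitor for a given second is only evaluated once the slowest store has seen a heartbeat at or past that second.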
So, really what we’re focused on is, you know, we’re building synchronization in the system, but we’re building the minimal amount of synchronization that we need.
We want to do as little synchronization as possible and avoid that because as soon as we get deeply into this area, you know, you start to get into really hairy coordination problems, and we want to avoid that as much as possible.
So, now let’s talk a little bit about how we actually have leveraged approximation in our architecture in order to get even deeper insights, moving a little bit beyond some of the traditional metrics that you might be thinking of and into sort of a new kind of metric, and how that’s actually enabled us to scale to even more data.
Types of metrics
So, think about your traditional types of metrics in StatsD and whatnot.
Everything sort of boils down to counters and gauges, and we talk about them in terms of the aggregation because that’s sort of what defines them.
They’re either counters, which are aggregated by sum over time, or gauges, which are aggregated by the last value you’ve seen.
Gauges are continuous functions.
They’re aggregated by last seen or by average over time.
And some examples of counters: requests, errors, total time spent.
You can think about total time spent like a stopwatch where you’re just continually accruing time. Gauges, the continuous functions, are things like system metrics or queue length, where there’s always a value, and you’re just taking readings from that value.
Aggregating counters and gauges
So, how we aggregate this, you know, you actually derive the meaning from the data based on how you aggregate it.
You get very different kinds of meanings.
These are four different time series over 10 time intervals.
If you think about this data as counters, you get one set of outputs.
If you think about it as a gauge and you average that data over time, you get another set of outputs, and if you aggregate it by the last value seen, you get a third set of outputs.
So, you’re actually giving meaning to the data by how you choose to think about the data and the aggregation that you apply to it based on that type.
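The same raw numbers give different answers depending on which aggregation you apply. The values below are hypothetical, not the ones from the slide.

```python
def as_counter(values):
    """Counter: aggregate by sum over the interval."""
    return sum(values)


def as_gauge_average(values):
    """Gauge: aggregate by average over the interval."""
    return sum(values) / len(values)


def as_gauge_last(values):
    """Gauge: aggregate by the last value seen."""
    return values[-1]


readings = [1, 0, 3, 2]  # hypothetical readings from one time series
print(as_counter(readings), as_gauge_average(readings), as_gauge_last(readings))
```

For these readings the three interpretations give 6, 1.5, and 2 respectively.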
So how does that apply to approximation?
Distributions: one step beyond aggregations
Well, we’re thinking about this in terms of aggregation and what if we want even more sophisticated kinds of aggregation?
And we want to think about being able to answer questions, not just about the average latency or the max latency, but we actually want to aggregate by percentile.
We want to know what the 95th percentile latency or the 99th percentile request size is.
We actually need to take a different approach to how we aggregate our data.
And going back to this example, again, we’re gonna take another pass at this same data set and derive a completely different set of meaning out of it.
And what we are looking at here is we’re looking at either the median or the 90th percentile value across both time and space of this data, and in order to do that, we have to take every single data point in this set and we have to sort it.
And then we have to examine the rank value out of the set for the median.
The median rank here is between the 20th and 21st value, so that’s a 5, averaging two 5s is a 5.
If you look at the 90th percentile value, that’s the 36th ranked value, in that case, that’s a 32.
So, you know, if this is a latency dataset, our 90th percentile latency is 32 seconds or something.
So, you can imagine that aggregating this data can be really expensive.
You’re looking at every single data point, and you’re sorting it, and then you’re looking at a particular rank.
And a single new value in that series can actually dramatically change what that ranked value ends up looking like.
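The exact approach described here, sort everything and read off a rank, looks like this. The nearest-rank convention is an assumption chosen because it matches the "36th ranked value of 40" example; other percentile conventions interpolate differently.

```python
import math


def nearest_rank_percentile(values, q):
    """Exact percentile: sort every raw point, then pick the value at the rank."""
    ordered = sorted(values)                      # O(n log n) over all points
    # round() guards against floating-point noise in q * n before taking the ceiling.
    rank = max(1, math.ceil(round(q * len(ordered), 9)))   # 1-based rank
    return ordered[rank - 1]


# Hypothetical data: 40 points, so the 90th percentile is the 36th ranked value.
points = list(range(40, 0, -1))
print(nearest_rank_percentile(points, 0.90))
```

Every query has to touch and sort every point, which is what makes this expensive at one-second resolution.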
So, how do we do this sort of efficiently and apply sort of our…
Can we do this more cheaply than storing every single point, particularly in a world where our maximum resolution is one second?
Managing tradeoffs with Sketches
So, thinking about the traditional engineering triangle, fast, good or cheap, we’re gonna think about what are some of the valid values of the data that we’re ingesting and what are some of the queries that we’re gonna be asking of our data in order to be able to balance these three factors effectively and come up with a good answer?
And that brings us to a data structure called Sketches.
This is a data structure that comes to us from the stream processing literature and it has a couple of properties that are important to…
And these properties are related, you can sort of imagine, but essentially what you’re thinking about is you want to examine each point in the stream that you’re examining once.
You don’t want to go back and revisit that point every time you’re doing a query.
You actually want to only examine that point once, and you want to have a limited amount of memory usage, in sort of the upper bound case, maybe logarithmic in the size of the data stream.
But you might actually even be able to do better than that, and you might actually be able to have absolute constant max upper bound.
And we’ll talk a little bit about how we’re able to achieve that.
So, you might be familiar with some sketches.
The most famous one is called a HyperLogLog.
You might use this today if you use the PF* commands in Redis, like PFADD and PFCOUNT.
This is just an implementation of HyperLogLog packaged up in Redis.
This is a cardinality estimator, so it tells you how many unique values are in a set.
So, if you wanted to know the unique number of visitors on your site, it would give you some answer, you know, plus or minus 2% or something like that.
There’s some other ones, Bloom filters, frequency sketches, etc.
But we’re going to think about this in terms of that distribution use case that I talked about at the beginning, the percentile latency, percentile request size, SLOs, etc.
Applying the classic engineering triangle to distribution metrics
So, what do we mean by fast, good and cheap in approximating distribution metrics?
We need them to be accurate, of course.
We want them to be fast.
We need them to be quick on insertion, first of all.
So, that means that we’re minimizing the impact on our customers' systems, where they’re running our agent.
And then we also need them to be quick at query time, of course, because we talked about how we’ve done all of this work to make queries really fast.
And we want them to be cheap.
We want them to have a fixed size in memory, again, answering that sort of efficiency challenge and making sure that we’re not doing anything more than necessary.
Approximating a distribution
So, how do we approximate a distribution?
A really common pattern is a bucketed histogram, which is sort of the technical term that’s used.
But essentially you can think of a distribution as this curve, and we’re splitting the curve into discrete parts, and we’re measuring the height at each point, and then we’re storing the height and the count as a bucket.
So, OpenMetrics and Prometheus do this.
This is an example from their documentation.
You can see here this is a histogram of request duration and we have some buckets and there is, on the right, some counts.
I’m gonna sort of clean this up and make this a little bit easier to see.
Hopefully, my laptop is not blocking things too much, but what we have here on the left is the time spent, and what we’re saying is that at each level, there is a count of the number of requests that were within the limit here.
So 50 milliseconds, 100, 200, 500 milliseconds, a second, and then greater than a second.
So if we want to know what our median latency is, given this kind of data structure, we need to look at the rank. And the rank here is out of about 144,000 values, and we want the middle one.
So, we want roughly the 72,000th value, and that falls somewhere between 100 milliseconds and 200 milliseconds.
And what do we do then?
We use a technique called linear interpolation.
We draw a line between those two buckets and then we see where the 72,000th value would fall on that line, assuming that all of the values in between those buckets are evenly distributed.
And that gives us 158 milliseconds.
What we don’t actually know is what the real distribution of the values inside that bucket is.
Are they close to 200 milliseconds?
Are they close to 100 milliseconds?
Maybe you don’t care.
Maybe knowing it’s between 100 milliseconds and 200 milliseconds is good enough.
I would point out also in this example if I wanted to know the 99th percentile latency, I actually don’t have a good way of doing that because it falls into the last bucket.
The last bucket is defined as anywhere between one second and infinity, and there actually isn’t a way to linearly interpolate that value.
So, I sort of have to throw up my hands and say, “It was more than a second.”
Was it 10 seconds or 100,000 seconds?
I don’t really know.
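The interpolation can be sketched as follows. The cumulative counts are assumed to be the ones from the Prometheus exposition-format documentation example (they total about 144,000, consistent with the numbers above), so treat them as illustrative; the function is a simplified sketch, not Prometheus’s own implementation.

```python
import math

# (upper bound in seconds, cumulative count of requests at or under that bound)
BUCKETS = [
    (0.05, 24054),
    (0.1, 33444),
    (0.2, 100392),
    (0.5, 129389),
    (1.0, 133988),
    (math.inf, 144320),
]


def histogram_quantile(q, buckets):
    """Estimate a quantile by linear interpolation inside the matching bucket."""
    total = buckets[-1][1]
    rank = q * total
    lower_bound, lower_count = 0.0, 0
    for upper_bound, cumulative in buckets:
        if cumulative >= rank:
            if math.isinf(upper_bound):
                return None  # open-ended bucket: nothing to interpolate toward
            # Assume values are evenly spread between the two bucket bounds.
            fraction = (rank - lower_count) / (cumulative - lower_count)
            return lower_bound + (upper_bound - lower_bound) * fraction
        lower_bound, lower_count = upper_bound, cumulative
    return None


print(histogram_quantile(0.5, BUCKETS))    # about 0.158 seconds
print(histogram_quantile(0.99, BUCKETS))   # None: falls in the last bucket
```

With these counts the median interpolates to roughly 158 milliseconds, while the 99th percentile lands in the open-ended bucket and cannot be estimated.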
Rank, relative errors, and the GK sketch
So, there’s another approach to answering this kind of query through a sketch data structure called the GK sketch that is named for Greenwald and Khanna.
So they are authors of a paper.
And this sketch is designed around a concept called rank error, and what rank error means is that it can give you a bounded error, in terms of rank, on the response to a query of this data.
And what they’re gonna say is you can choose what granularity you want, but if I say, “I want it to be plus or minus 1% in rank,” when I query for the 99th percentile, I will be guaranteed to get a value between the 98th percentile and the 100th percentile, or the max.
And that’s fine for certain distributions, but it’s actually a challenge for tailed distributions, and what we’ve seen is that the vast majority of datasets that people want to look at distributions across are really long-tail distributions.
Your request latency…
The alternative is there’s a big spike over on the left-hand side, and that’s usually if you have a timeout, and so there’s a big spike at the end that’s like, “Oh, we hit 60 seconds.”
So, everything falls in that bucket.
But if you don’t have a timeout like that, you know, your latency may just keep going all the way out there, and what you’ll see is the GK sketch will give you values that sort of bounce around between the 98th percentile and the max, and that doesn’t really give you a sense of where that real 99th percentile value is.
What we found that we want, and what our customers are interested in, is something with a relative error guarantee.
And this gives us a guarantee that the value that we return to you is within one percent of the actual value of the 99th percentile latency.
What we’ve seen is that this meets our customers' use cases much better and provides you sort of a nice, clean distribution reporting line and a metric that you can alert on much more effectively.
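To see why the two guarantees differ so much on a long tail, consider a deterministic, made-up dataset where the value at quantile q is 1 / (1 - q). Nothing here is a GK sketch implementation; it only compares the range of answers each guarantee allows for the 99th percentile.

```python
n = 10_000
# Hypothetical long-tailed data: value at quantile q is 1 / (1 - q).
values = sorted(1.0 / (1.0 - i / n) for i in range(n))


def at_quantile(q):
    """Exact value at quantile q, clamped to the last element."""
    return values[min(n - 1, round(q * n))]


p99 = at_quantile(0.99)

# Rank error of +/- 1%: any value between the 98th percentile and the max is allowed.
rank_lo, rank_hi = at_quantile(0.98), at_quantile(1.0)

# Relative error of +/- 1%: the answer must be within 1% of the true value.
rel_lo, rel_hi = p99 * 0.99, p99 * 1.01

print(f"true p99 ~ {p99:.1f}")
print(f"rank-error answers may range over [{rank_lo:.1f}, {rank_hi:.1f}]")
print(f"relative-error answers stay within [{rel_lo:.1f}, {rel_hi:.1f}]")
```

On this data the true 99th percentile is about 100, a rank-error answer could be anywhere from about 50 to about 10,000, and a relative-error answer stays between about 99 and 101.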
So what does good mean? Relative error
So, coming back to our engineering triangle, thinking about what good means: good to us means this relative error. If you gave us an SLO of something like, you know, “Are 99% of my requests less than 500 milliseconds?”, we’re able to say that 99% of requests are guaranteed to be less than 505 milliseconds.
We can say, “Well, we’re not sure exactly if they’re less than 500 milliseconds.”
But we can say with 100% certainty that they are less than 505 milliseconds, and that’s pretty close.
What does cheap mean? Fixed storage size
This is a really inexpensive data structure, and we have a max bound.
And the way that we’re able to achieve that is we use 4,000 buckets, which is a lot more than the six that the OpenMetrics example used, and OpenMetrics could be extended.
You could use as many buckets as you want, but you have to define them in advance.
There’s a bunch of limitations around that.
And we create these dynamically, so we only use exactly as many buckets as we need.
If we ever ran into a case where we needed to use all 4,000 buckets, and we needed a 4,001st bucket, what we would do is we would roll up those lower buckets, because what we found again is that in the kinds of queries that our customers are asking, they’re asking for that tail latency, that high-end, 99th percentile, four-nines latency.
They’re not as interested in, let’s say, 10th percentile latency.
That’s generally not something that people are looking very closely at.
So we can start to roll those buckets up.
Now, it’s worth noting that with 4,000 buckets, we’ve never actually needed to do this, again, just because of the shape of most customers’ data.
But in theory, you know, we have that max bound.
Fast: insertion and query operations
And then this is really fast.
So each insertion is two operations.
You find the bucket, you increase the count.
It’s really a simple operation to count these things this way.
And then queries just need to look at the range of buckets.
And again, it could be up to 4,000.
It’s a constant factor, and it tends to be many, many fewer.
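The properties described here (a relative error guarantee, dynamically created buckets, a cap of 4,000 with the lowest buckets rolled up, and two-step insertion) can be sketched as below. The logarithmic bucket mapping and the `gamma` formula are assumptions borrowed from the general relative-error-sketch technique; the talk does not spell out the bucket layout, and this is not Datadog’s implementation. It only handles positive values.

```python
import math
from collections import Counter


class RelativeErrorSketch:
    def __init__(self, alpha=0.01, max_buckets=4000):
        self.gamma = (1 + alpha) / (1 - alpha)   # ratio between bucket bounds
        self.log_gamma = math.log(self.gamma)
        self.max_buckets = max_buckets
        self.buckets = Counter()                 # bucket index -> count
        self.count = 0

    def add(self, value):
        # Two steps: find the bucket, increase the count.
        index = math.ceil(math.log(value) / self.log_gamma)
        self.buckets[index] += 1
        self.count += 1
        if len(self.buckets) > self.max_buckets:
            self._roll_up_lowest()

    def _roll_up_lowest(self):
        # Fold the lowest bucket into its neighbor; only low quantiles lose accuracy.
        lowest, second = sorted(self.buckets)[:2]
        self.buckets[second] += self.buckets.pop(lowest)

    def quantile(self, q):
        rank = q * (self.count - 1)
        seen = 0
        for index in sorted(self.buckets):
            seen += self.buckets[index]
            if seen > rank:
                # Representative value within alpha of everything in the bucket.
                return 2 * self.gamma ** index / (self.gamma + 1)
        return None


sketch = RelativeErrorSketch(alpha=0.01)
for latency_ms in range(1, 1001):   # hypothetical latencies
    sketch.add(float(latency_ms))
print(sketch.quantile(0.99), len(sketch.buckets))
```

Because bucket bounds grow geometrically, a few hundred buckets cover this whole range, and the estimate for the 99th percentile stays within 1% of the exact value.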
So doing it cheaper, we’re leveraging approximation to, you know, answer these complex queries in a very efficient way.
So, the last part we’ll talk about here is, how do we actually use this data structure to get an even more flexible architecture?
And that brings us to this concept of commutativity.
So if something is commutative, we have this sort of the Wikipedia definition up here, but you think about, multiplication is commutative.
So, you say, two times four equals four times two, right?
And this is an important characteristic for aggregation.
If you’re averaging a series of values, say I have one value that is the average of some number of values, and I have a new value, I don’t have a way to combine that average and a new value into a new average because I don’t know what the weight of the number of values in that average is.
So I don’t know if I should count the new value as one-tenth of the total amount or one-one-hundredth, etc.
So what we’re looking for actually in this data structure as well as all of the other properties is this concept of commutativity where I can have a new sketch, a smaller sketch, and a large sketch somewhere else in my pipeline and I can put those together, and I can still get the same answer with the same kind of error guarantees that I’m looking for.
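A small example of the difference: a bare average cannot absorb new data, but bucket counts can be merged in any order and still match what you would get from all the raw data. The fixed-width bucketing here is a simplification chosen to keep the example short; it is not the sketch’s real bucket layout.

```python
from collections import Counter


def bucket_of(value, width=10):
    """Hypothetical fixed-width bucketing, just for illustration."""
    return int(value // width)


def summarize(values):
    """Build a partial histogram: bucket index -> count."""
    return Counter(bucket_of(v) for v in values)


def merge(a, b):
    """Merging partial histograms is just adding counts per bucket."""
    return a + b


agent_side = [1, 12, 25]        # hypothetical values seen in one place
aggregator_side = [3, 14, 99]   # hypothetical values seen somewhere else

merged = merge(summarize(agent_side), summarize(aggregator_side))
print(merged == summarize(agent_side + aggregator_side))

# By contrast, two averages alone cannot be combined without their counts.
avg_a = sum(agent_side) / len(agent_side)
avg_b = sum(aggregator_side) / len(aggregator_side)
true_avg = sum(agent_side + aggregator_side) / 6
print((avg_a + avg_b) / 2 == true_avg)   # only true here because both sides have 3 values
```

The merged histogram is identical regardless of where or in what order the partial merges happen, which is what lets the work move around the pipeline.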
Distributing work throughout the pipeline
And this allows us to distribute work throughout our entire pipeline.
So, we can save work at different points and continue to merge things partially throughout the process.
So, we can actually merge things in the agent.
We can merge things in a new component, our streaming aggregator.
We can merge things in the data store itself.
We can even cache partially merged data in the query cache as part of the query system.
We can really distribute the work throughout the pipeline and do it, you know at the last possible moment or do it as far in advance as possible depending on what the most efficient choice is.
So, this allows us to sort of fill in our last performance mantra, so doing it while the user is not looking.
We have that streaming aggregator that I mentioned that’s collecting the data and doing a bunch of back-end processing so that these queries are super fast.
So, we actually have our own implementation of this sketch data structure that, you know, builds on a lot of past literature, but it does have some novel aspects that I talked about.
Our data science team will actually be talking about it at VLDB in August, and we’re very excited about that.
And we will be releasing open-source standalone versions in several languages.
It already exists in Go today as part of the Datadog agent, which is, of course, Apache-licensed.
So, really what I want you to take away from this talk is doing exactly the amount of work needed at exactly the right time and nothing more really gets you a lot in terms of meeting your performance goals and building a highly scalable distributed data system.