Interactive Analytics with RADStack
Fangjin Yang, Gian Merlino, Nelson Ray, Xavier Léauté, Himanshu Gupta, Eric Tschetter
{fangjinyang, gianmerlino, ncray86, xavier.leaute, g.himanshu, echeddar}@gmail.com
model for unifying real-time and historical workflows.

The structure of the paper is as follows: Section 2 describes the problems and use cases that led to the creation of the RADStack. Section 3 describes Druid, the serving layer of the stack, and how Druid is built for real-time and batch data ingestion, as well as exploratory analytics. Section 4 covers the role of Samza and Hadoop for data processing, and Section 5 describes the role of Kafka for event delivery. In Section 6, we present our production metrics. Section 7 presents our experiences with running the RADStack in production, and in Section 8 we discuss related solutions.

2. BACKGROUND

The RADStack was first developed to address problems in online advertising. In online advertising, automated systems from different organizations place bids against one another to show users ads in the milliseconds before a webpage loads. These actions generate a tremendous volume of data. The data shown in Table 1 is an example of such data. Each event is comprised of three components: a timestamp indicating when the event occurred; a set of dimensions indicating various attributes about the event; and a set of metrics concerning the event. Organizations frequently serve insights on this data to ad publishers through visualizations and data applications. These applications must rapidly compute drill-downs and aggregates over this data, and answer questions such as "How many clicks occurred over the span of one week for publisher google.com?" or "How many impressions were seen over the last quarter in San Francisco?". Queries over any arbitrary number of dimensions should return in a few hundred milliseconds.

As an additional requirement, user-facing applications often face highly concurrent workloads, and good applications need to provide relatively consistent performance to all users. Of course, backend infrastructure also needs to be highly available. Downtime is costly, and many businesses cannot afford to wait if a system is unavailable in the face of software upgrades or network failure.

To address these requirements of scale, stability, and performance, we created Druid. Druid was designed from the ground up to provide arbitrary data exploration, low latency aggregations, and fast data ingestion. Druid was also designed to accept fully denormalized data, and moves away from the traditional relational model. Since most raw data is not denormalized, it must be processed before it can be ingested and queried. Multiple streams of data had to be joined, cleaned up, and transformed before they were usable in Druid, but that was the trade-off we were willing to make in order to get the performance necessary to power an interactive data application. We introduced stream processing to our stack to provide the processing required before raw data could be loaded into Druid. Our stream processing jobs range from simple data transformations, such as id to name lookups, up to complex operations such as multi-stream joins. Pairing Druid with a stream processor enabled flexible data processing and querying, but we still had problems with event delivery. Our events were delivered from many different locations and sources, and peaked at several million events per second. We required a high throughput message bus that could hold these events for consumption by our stream processor. To simplify data transmission for our clients, we wanted the message bus to be the single delivery endpoint for events entering our cluster.

Our stack would be complete here if real-time processing were perfect, but the open source stream processing space is still young. Processing jobs can go down for extended periods of time, and events may be delivered more than once. These are realities of any production data pipeline. To overcome these issues, we included Hadoop in our stack to periodically clean up any data generated by the real-time pipeline. We stored a copy of the raw events we received in a distributed file system, and periodically ran batch processing jobs over this data. The high level architecture of our setup is shown in Figure 1. Each component is designed to do a specific set of things well, and there is isolation in terms of functionality. Individual components can entirely fail without impacting the services of the other components.

Figure 1: The components of the RADStack. Kafka acts as the event delivery endpoint. Samza and Hadoop process data to load it into Druid. Druid acts as the endpoint for queries.

3. THE SERVING LAYER

Druid is a column-oriented data store designed for exploratory analytics and is the serving layer in the RADStack. A Druid cluster consists of different types of nodes and, similar to the overall design of the RADStack, each node type is instrumented to perform a specific set of things well. We believe this design separates concerns and simplifies the complexity of the overall system. To solve complex data analysis problems, the different node types come together to form a fully working system. The composition of and flow of data in a Druid cluster are shown in Figure 2.

3.1 Segments

Data tables in Druid (called "data sources") are collections of timestamped events partitioned into a set of segments, where each segment is typically 5–10 million rows. Segments represent the fundamental storage unit in Druid, and Druid queries only understand how to scan segments.

Druid always requires a timestamp column as a method of simplifying data distribution policies, data retention policies, and first-level query pruning. Druid partitions its data sources into well defined time intervals, typically an hour or a day, and may further partition on values from other columns to achieve the desired segment size. The time granularity to partition segments is a function of data volume and time range. A data set with timestamps spread over a year is better partitioned by day, and a data set with timestamps spread over a day is better partitioned by hour.

Segments are uniquely identified by a data source identifier, the time interval of the data, and a version string
Timestamp             Publisher          Advertiser  Gender  City           Click  Price
2011-01-01T01:01:35Z  bieberfever.com    google.com  Male    San Francisco  0      0.65
2011-01-01T01:03:63Z  bieberfever.com    google.com  Male    Waterloo       0      0.62
2011-01-01T01:04:51Z  bieberfever.com    google.com  Male    Calgary        1      0.45
2011-01-01T01:00:00Z  ultratrimfast.com  google.com  Female  Taiyuan        0      0.87
2011-01-01T02:00:00Z  ultratrimfast.com  google.com  Female  New York       0      0.99
2011-01-01T02:00:00Z  ultratrimfast.com  google.com  Female  Vancouver      1      1.53

Table 1: Sample ad data. These events are created when users view or click on ads.
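The drill-down questions above amount to filtered aggregations over events shaped like Table 1. A minimal sketch of such an aggregation in Python, over a hypothetical in-memory row set rather than an actual Druid query:

```python
from collections import defaultdict

# Hypothetical in-memory rows shaped like Table 1; in the RADStack this
# aggregation would be expressed as a Druid query instead.
events = [
    {"timestamp": "2011-01-01T01:01:35Z", "publisher": "bieberfever.com", "click": 0, "price": 0.65},
    {"timestamp": "2011-01-01T01:04:51Z", "publisher": "bieberfever.com", "click": 1, "price": 0.45},
    {"timestamp": "2011-01-01T02:00:00Z", "publisher": "ultratrimfast.com", "click": 1, "price": 1.53},
]

def clicks_by_publisher(rows, start, end):
    """Count clicks per publisher for events with start <= timestamp < end.

    ISO-8601 UTC timestamps of equal length compare correctly as strings.
    """
    totals = defaultdict(int)
    for row in rows:
        if start <= row["timestamp"] < end:
            totals[row["publisher"]] += row["click"]
    return dict(totals)
```

A question like "How many clicks occurred over the span of one week for publisher google.com?" is then a lookup into `clicks_by_publisher(events, week_start, week_end)`.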
[Figure 2 diagram: streaming data flows into real-time nodes and batch data into deep storage; historical nodes load segments from deep storage; broker nodes route client queries to real-time and historical nodes; coordinator nodes, metadata storage, and distributed coordination are external dependencies.]

Figure 2: An overview of a Druid cluster and the flow of data through the cluster.
that increases whenever a new segment is created. The version string indicates the freshness of segment data; segments with later versions have newer views of data (over some time range) than segments with older versions. This segment metadata is used by the system for concurrency control; read operations always access data in a particular time range from the segments with the latest version identifiers for that time range.

Druid segments are stored in a column orientation. Given that Druid is best used for aggregating event streams, the advantages of storing aggregate information as columns rather than rows are well documented [2]. Column storage allows for more efficient CPU usage, as only what is needed is actually loaded and scanned. In a row oriented data store, all columns associated with a row must be scanned as part of an aggregation. The additional scan time can introduce significant performance degradations [2].

Druid nodes use one thread to scan one segment at a time, and the amount of data that can be scanned in parallel is directly correlated to the number of available cores in the cluster. Segments are immutable, and hence there is no contention between reads and writes in a segment.

A single query may scan thousands of segments concurrently, and many queries may run at the same time. We want to ensure that the entire cluster is not starved out while a single expensive query is executing. Thus, segments have an upper limit on how much data they can hold, and are sized to be scanned in a few milliseconds. By keeping segment computation very fast, cores and other resources are constantly being yielded. This ensures segments from different queries are always being scanned.

Druid segments are very self-contained for the time interval of data that they hold. Column data is stored directly in the segment. Druid has multiple column types to represent various data formats. Timestamps are stored in long columns, dimensions are stored in string columns, and metrics are stored in int, float, long, or double columns. Depending on the column type, different compression methods may be used. Metric columns are compressed using LZ4 [3] compression. String columns are dictionary encoded, similar to other data stores such as PowerDrill [8]. Additional indexes may be created for particular columns. For example, Druid will by default create inverted indexes for string columns.

3.2 Streaming Data Ingestion

Druid real-time nodes encapsulate the functionality to ingest, query, and create segments from event streams. Events indexed via these nodes are immediately available for querying. The nodes are only concerned with events for a relatively small time range (e.g. hours) and periodically hand off immutable batches of events they have collected over this small time range to other nodes in the Druid cluster that are specialized in dealing with batches of immutable events. The nodes announce their online state and the data they serve using a distributed coordination service (currently Zookeeper [10]).

Real-time nodes employ a log structured merge tree [14] for recently ingested data. Incoming events are first stored in an in-memory buffer. The in-memory buffer is directly queryable, and Druid behaves as a key/value store for queries on events that exist in this JVM heap-based store. The in-memory buffer is heavily write optimized, and given that Druid is really designed for heavy concurrent reads, events do not remain in the in-memory buffer for very long. Real-time nodes persist their in-memory indexes to disk either periodically or after some maximum row limit is reached. This persist process converts data stored in the in-memory buffer to the column oriented storage format described in Section 3.1.
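The persist behavior described above can be sketched as follows; the class, parameter names, and thresholds are illustrative and are not Druid's actual configuration keys:

```python
import time

class RealtimeBuffer:
    """Sketch of a write-optimized in-memory buffer that is flushed to an
    immutable persisted index either periodically or at a max row count."""

    def __init__(self, persist_period_s=600, max_rows=500_000, clock=time.monotonic):
        self.rows = []               # heap-resident, directly queryable
        self.persisted = []          # stand-in for immutable persisted indexes
        self.persist_period_s = persist_period_s
        self.max_rows = max_rows
        self.clock = clock
        self.last_persist = clock()

    def add(self, event):
        self.rows.append(event)
        # Persist when either the row limit or the persist period is reached.
        if len(self.rows) >= self.max_rows or \
           self.clock() - self.last_persist >= self.persist_period_s:
            self.persist()

    def persist(self):
        if self.rows:
            self.persisted.append(tuple(self.rows))  # immutable snapshot
            self.rows = []
        self.last_persist = self.clock()
```

Because persisted snapshots are immutable, queries can read them without coordinating with concurrent writes, mirroring the read/write isolation the text describes.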
[Figure 3 diagram: queries are served from both the heap-resident in-memory index and the off-heap persisted indexes; the persist operation moves events from the former to the latter.]

For further clarification, consider Figure 4. Figure 4 illustrates the operations of a real-time node. The node starts at 13:37 and, with a 10 minute window period, will only accept events for a window between 13:27 and 13:47. When the first events are ingested, the node announces that it is serving a segment of data for an interval from 13:00 to 14:00. Every 10 minutes (the persist period is configurable), the node will flush and persist its in-memory buffer to disk. Near the end of the hour, the node will likely begin to see events for the next hour. The node then announces that it is also serving a segment from 14:00 to 15:00. At 14:10, which is the end of the hour plus the window period, the node begins the hand off process.
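The window-period check in this walkthrough can be sketched as below, with times taken from the Figure 4 example; the function name is illustrative:

```python
from datetime import datetime, timedelta

def accepts(event_time, now, window=timedelta(minutes=10)):
    """A real-time node with a window period only accepts events whose
    timestamps fall within `window` of the current time (illustrative)."""
    return now - window <= event_time <= now + window

# Node time is 13:37, so the acceptance window is 13:27 to 13:47.
now = datetime(2011, 1, 1, 13, 37)
assert accepts(datetime(2011, 1, 1, 13, 30), now)      # inside the window
assert not accepts(datetime(2011, 1, 1, 13, 20), now)  # older than 13:27
```

Events outside the window are left for the batch pipeline to pick up later, which is one reason the stack keeps raw events in deep storage.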
[Figure 4 timeline: persist data for 13:00-14:00 at 13:47, 13:57, and 14:07; announce segment for data 14:00-15:00 at ~14:00; unannounce segment for data 13:00-14:00 at ~14:11.]

Figure 4: The node starts, ingests data, persists, and periodically hands data off. This process repeats indefinitely. The time periods between different real-time node operations are configurable.
…compare the result with what is actually loaded on those nodes […] across nodes.

Druid historical nodes are very simple in operation. They know how to load, drop, and serve segments. Historical nodes typically store all the data that is no longer recent enough to be served by real-time nodes. A historical node must first load and begin serving queries for a segment before that segment can be dropped from the real-time node. Since segments are immutable, the same copy of a segment can exist on multiple historical nodes and real-time nodes. Most nodes in typical production Druid clusters are historical nodes.

[Figure 5 diagram: segments across Day 1, Day 2, and Day 3, with Segment_v3 overshadowing Segment_v1 for an overlapping interval.]

Figure 5: Druid utilizes multi-version concurrency control and reads data from segments with the latest version for a given interval. Segments that are completely overshadowed are ignored and eventually automatically dropped from the cluster.

To consolidate results from historical and real-time nodes, Druid has a set of broker nodes which act as the client query endpoint. Broker nodes in part function as query routers to historical and real-time nodes. Broker nodes understand the metadata published in the distributed coordination service (Zookeeper) about what segments are queryable and where those segments are located. Broker nodes route incoming queries such that the queries hit the right historical or real-time nodes. Broker nodes also merge partial results from historical and real-time nodes before returning a final consolidated result to the caller.

Broker nodes maintain a segment timeline containing information about what segments exist in the cluster and the version of those segments. Druid uses multi-version concurrency control to manage how data is extracted from segments. Segments with higher version identifiers have precedence over segments with lower version identifiers. If two segments exactly overlap for an interval, Druid only considers the data from the segment with the higher version. This is illustrated in Figure 5.

Segments are inserted into the timeline as they are announced. The timeline sorts the segments based on their data interval in a data structure similar to an interval tree. Lookups in the timeline will return all segments with intervals that overlap the lookup interval, along with interval ranges for which the data in a segment is valid.

Brokers extract the interval of a query and use it for lookups into the timeline. The result of the timeline is used to remap the original query into a set of specific queries for the actual historical and real-time nodes that hold the pertinent query data. The results from the historical and real-time nodes are finally merged by the broker, which returns the final result to the caller.

The coordinator node also builds a segment timeline for the segments in the cluster. If a segment is completely overshadowed by one or more segments, it will be flagged in this timeline. When the coordinator notices overshadowed segments, it tells historical nodes to drop these segments from the cluster.

4. THE PROCESSING LAYER

Although Druid can ingest events that are streamed in one at a time, data must be denormalized beforehand, as Druid cannot yet support join queries. Furthermore, real world data must often be transformed before it is usable by an application.
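A minimal sketch of the simplest kind of transformation involved, the id-to-name lookups mentioned in Section 2, which in our stack run as Samza jobs; the field names and lookup table here are hypothetical:

```python
# Hypothetical id-to-name lookup applied to each event before loading
# into Druid, which expects fully denormalized rows.
ADVERTISER_NAMES = {1: "google.com", 2: "ultratrimfast.com"}

def denormalize(event, lookup=ADVERTISER_NAMES):
    """Replace a foreign-key style id with its display name."""
    out = dict(event)
    out["advertiser"] = lookup.get(out.pop("advertiser_id"), "unknown")
    return out

assert denormalize({"advertiser_id": 1, "click": 0}) == {"click": 0, "advertiser": "google.com"}
```

Multi-stream joins, the complex end of the spectrum, require the co-partitioning shuffle discussed below rather than a per-event map like this one.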
Figure 6: Ad impressions and clicks are recorded in two separate streams. An event we want to join is located in two different Kafka partitions on two different topics.

Figure 7: A shuffle operation ensures events to be joined are stored in the same Kafka partition.
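The shuffle in Figure 7 amounts to re-keying both streams by a shared join key so that hash partitioning sends matching events to the same partition. A sketch, with illustrative key names and partition count:

```python
# Sketch of the Figure 7 shuffle: both the impression stream and the
# click stream are re-keyed by the join key, so matching events hash to
# the same partition and can be joined there. Names are illustrative.
NUM_PARTITIONS = 8

def partition_for(join_key, num_partitions=NUM_PARTITIONS):
    """Deterministically map a join key to a partition."""
    return hash(join_key) % num_partitions

impression = {"impression_id": "abc123", "type": "impression"}
click = {"impression_id": "abc123", "type": "click"}

# After the shuffle, both events land in the same partition.
assert partition_for(impression["impression_id"]) == partition_for(click["impression_id"])
```

Because partitioning is deterministic in the key, a single stream-processing task owns all events for a given impression id, regardless of which topic they arrived on.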
The final job of our processing pipeline is to deliver data to Druid. For high availability, processed events from Samza are transmitted concurrently to two real-time nodes. Both nodes receive the same copy of the data, and effectively act as replicas of each other. The Druid broker can query either copy of the data. When handoff occurs, both real-time nodes race to hand off the segments they've created. The segment that is pushed into deep storage first will be the one that is used for historical querying, and once that segment is loaded on the historical nodes, both real-time nodes will drop their versions of the same segment.

Data Source   Dimensions   Metrics
a             25           21
b             30           26
c             71           35
d             60           19
e             29            8
f             30           16
g             26           18
h             78           14

Table 2: Characteristics of production data sources.
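The handoff race between the two replica real-time nodes is safe because pushing a segment into deep storage is effectively first-write-wins for a given segment identifier. A sketch, with illustrative identifiers:

```python
# Sketch of the handoff race: two replica real-time nodes push the same
# segment; the first copy into deep storage wins and the second push is
# a no-op. Segment identifiers and payloads are illustrative.
deep_storage = {}

def hand_off(segment_id, payload):
    """Return True if this node's copy won the race."""
    if segment_id in deep_storage:
        return False          # the other replica already pushed this segment
    deep_storage[segment_id] = payload
    return True

won_a = hand_off("events_2011-01-01T01/v1", b"replica-a-bytes")
won_b = hand_off("events_2011-01-01T01/v1", b"replica-b-bytes")
assert (won_a, won_b) == (True, False)
assert deep_storage["events_2011-01-01T01/v1"] == b"replica-a-bytes"
```

Since both replicas built the segment from the same input stream, either copy is acceptable; the race only decides which identical copy historical nodes will load.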
4.2 Batch Processing

Our batch processing pipeline is composed of a multi-stage MapReduce [4] pipeline. The first set of jobs mirrors our stream processing pipeline in that it transforms data and joins relevant events in preparation for the data to be loaded into Druid. The second set of jobs is responsible for directly creating immutable Druid segments. The indexing code for both streaming and batch ingestion in Druid is shared between the two modes of ingestion. These segments are then uploaded to deep storage and registered with the metadata store. Druid will proceed to load the batch generated segments.

The batch process typically runs much less frequently than the real-time process, and may run many hours or even days after raw events have been delivered. The wait is necessary for severely delayed events, and to ensure that the raw data is indeed complete.

Segments generated by the batch process are versioned by the start time of the process. Hence, segments created by batch processing will have a version identifier greater than that of segments created by real-time processing. When these batch created segments are loaded in the cluster, they atomically replace segments created by real-time processing for their processed interval. Hence, soon after batch processing completes, Druid queries begin reflecting batch-originated data rather than real-time-originated data.

We use the streaming data pipeline described in Section 4.1 to deliver immediate insights on events as they are occurring, and the batch data pipeline described in this section to provide an accurate copy of the data.

5. THE DELIVERY LAYER

In our stack, events are delivered over HTTP to Kafka. Events are transmitted via POST requests to a receiver that acts as a front for a Kafka producer. Kafka is a distributed messaging system with a publish and subscribe model. At a high level, Kafka maintains events or messages in categories called topics. A distributed Kafka cluster consists of numerous brokers, which store messages in a replicated commit log. Kafka consumers subscribe to topics and process feeds of published messages.

Kafka provides functionality isolation between producers of data and consumers of data. The publish/subscribe model works well for our use case, as multiple consumers can subscribe to the same topic and process the same set of events. We have two primary Kafka consumers. The first is a Samza job that reads messages from Kafka for stream processing as described in Section 4.1. Topics in Kafka map to pipelines in Samza, and pipelines in Samza map to data sources in Druid. The second consumer reads messages from Kafka and stores them in a distributed file system. This file system is the same as the one used for Druid's deep storage, and also acts as a backup for raw events. The purpose of storing raw events in deep storage is so that we can run batch processing jobs over them at any given time. For example, our stream processing layer may choose to not include certain columns when it first processes a new pipeline. If we want to include these columns in the future, we can reprocess the raw data to generate new Druid segments.

Kafka is the single point of delivery for events entering our system, and must have the highest availability. We replicate our Kafka producers across multiple datacenters. In the event that Kafka brokers and consumers become unresponsive, as long as our HTTP endpoints are still available, we can buffer events on the producer side while recovering the system. Similarly, if our processing and serving layers completely fail, we can recover by replaying events from Kafka.

6. PERFORMANCE

Druid runs in production at several organizations, and to briefly demonstrate its performance, we have chosen to share some real world numbers for one of the larger production clusters. We also include results from synthetic workloads on TPC-H data.

6.1 Query Performance in Production

Druid query performance can vary significantly depending on the query being issued. For example, sorting the values of a high cardinality dimension based on a given metric is much more expensive than a simple count over a time range. To showcase the average query latencies in a production Druid cluster, we selected 8 frequently queried data sources, described in Table 2. The queries range from standard aggregates involving different types of metrics and filters, to ordered group bys over one or more dimensions with aggregates, to search queries and metadata retrieval queries. Queries involving a single column are very frequent, and queries involving all columns are very rare.

• There were approximately 50 total data sources in this particular cluster and several hundred users issuing queries.

• There was approximately 10.5TB of RAM available in this cluster and approximately 10TB of segments loaded. Collectively, there are about 50 billion Druid rows in this cluster. Results for every data source are not shown.
[Figure 10: mean, 90%ile, and 99%ile query latencies by data source (a, b, d, e, f, h), plotted over time from Feb 03 to Feb 24.]

Figure 10: Query latencies of production data sources.

d4   40   17   94064
d5   41   23   68104
d6   31   31   64222
d7   29    8   30048

Table 3: Characteristics of production data sources.

…time interval and 36,246,530 rows/second/core for a select sum(float) type query.
…down a view of data until an interesting observation is made. Users tend to explore short intervals of recent data. In the reporting use case, users query for much longer data intervals, and the volume of queries is generally much less. The insights that users are looking for are often pre-determined.

…query speed degradations, less than optimally tuned hardware, and various other system bottlenecks.

8. RELATED WORK
11. REFERENCES

[1] Apache samza. http://samza.apache.org/, 2014.
[2] D. J. Abadi, S. R. Madden, and N. Hachem. Column-stores vs. row-stores: How different are they really? In Proceedings of the 2008 ACM SIGMOD international conference on Management of data, pages 967–980. ACM, 2008.
[3] Y. Collet. Lz4: Extremely fast compression algorithm. code.google.com, 2013.
[4] J. Dean and S. Ghemawat. Mapreduce: simplified data processing on large clusters. Communications of the ACM, 51(1):107–113, 2008.
[5] G. DeCandia, D. Hastorun, M. Jampani, G. Kakulapati, A. Lakshman, A. Pilchin, S. Sivasubramanian, P. Vosshall, and W. Vogels. Dynamo: amazon's highly available key-value store. In ACM SIGOPS Operating Systems Review, volume 41, pages 205–220. ACM, 2007.
[6] F. Färber, S. K. Cha, J. Primsch, C. Bornhövd, S. Sigg, and W. Lehner. Sap hana database: data management for modern business applications. ACM Sigmod Record, 40(4):45–51, 2012.
[7] B. Fink. Distributed computation on dynamo-style distributed storage: riak pipe. In Proceedings of the eleventh ACM SIGPLAN workshop on Erlang workshop, pages 43–50. ACM, 2012.
[8] A. Hall, O. Bachmann, R. Büssow, S. Gănceanu, and M. Nunkesser. Processing a trillion cells per mouse click. Proceedings of the VLDB Endowment, 5(11):1436–1446, 2012.
[9] M. Hausenblas and N. Bijnens. Lambda architecture. http://lambda-architecture.net/, 2014.
[10] P. Hunt, M. Konar, F. P. Junqueira, and B. Reed. Zookeeper: Wait-free coordination for internet-scale systems. In USENIX ATC, volume 10, 2010.
[11] J. Kreps, N. Narkhede, and J. Rao. Kafka: A distributed messaging system for log processing. In Proceedings of 6th International Workshop on Networking Meets Databases (NetDB), Athens, Greece, 2011.
[12] N. Marz. Storm: Distributed and fault-tolerant realtime computation. http://storm-project.net/, February 2013.
[13] S. Melnik, A. Gubarev, J. J. Long, G. Romer, S. Shivakumar, M. Tolton, and T. Vassilakis. Dremel: interactive analysis of web-scale datasets. Proceedings of the VLDB Endowment, 3(1-2):330–339, 2010.
[14] P. O'Neil, E. Cheng, D. Gawlick, and E. O'Neil. The log-structured merge-tree (lsm-tree). Acta Informatica, 33(4):351–385, 1996.
[15] Paraccel analytic database. http://www.paraccel.com/resources/Datasheets/ParAccel-Core-Analytic-Database.pdf, March 2013.
[16] K. Shvachko, H. Kuang, S. Radia, and R. Chansler. The hadoop distributed file system. In Mass Storage Systems and Technologies (MSST), 2010 IEEE 26th Symposium on, pages 1–10. IEEE, 2010.
[17] M. Stonebraker, D. Abadi, D. J. DeWitt, S. Madden, E. Paulson, A. Pavlo, and A. Rasin. Mapreduce and parallel dbmss: friends or foes? Communications of the ACM, 53(1):64–71, 2010.
[18] M. Stonebraker, D. J. Abadi, A. Batkin, X. Chen, M. Cherniack, M. Ferreira, E. Lau, A. Lin, S. Madden, E. O'Neil, et al. C-store: a column-oriented dbms. In Proceedings of the 31st international conference on Very large data bases, pages 553–564. VLDB Endowment, 2005.
[19] M. Stonebraker, J. Becla, D. J. DeWitt, K.-T. Lim, D. Maier, O. Ratzesberger, and S. B. Zdonik. Requirements for science data bases and scidb. In CIDR, volume 7, pages 173–184, 2009.
[20] E. Tschetter. Introducing druid: Real-time analytics at a billion rows per second. http://druid.io/blog/2011/04/30/introducing-druid.html, April 2011.
[21] V. K. Vavilapalli, A. C. Murthy, C. Douglas, S. Agarwal, M. Konar, R. Evans, T. Graves, J. Lowe, H. Shah, S. Seth, et al. Apache hadoop yarn: Yet another resource negotiator. In Proceedings of the 4th annual Symposium on Cloud Computing, page 5. ACM, 2013.
[22] L. VoltDB. Voltdb technical overview. https://voltdb.com/, 2010.
[23] F. Yang, E. Tschetter, X. Léauté, N. Ray, G. Merlino, and D. Ganguli. Druid: a real-time analytical data store. In Proceedings of the 2014 ACM SIGMOD international conference on Management of data, pages 157–168. ACM, 2014.
[24] M. Zaharia, M. Chowdhury, T. Das, A. Dave, J. Ma, M. McCauley, M. J. Franklin, S. Shenker, and I. Stoica. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing. In Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation, pages 2–2. USENIX Association, 2012.
[25] M. Zaharia, T. Das, H. Li, S. Shenker, and I. Stoica. Discretized streams: an efficient and fault-tolerant model for stream processing on large clusters. In Proceedings of the 4th USENIX conference on Hot Topics in Cloud Computing, pages 10–10. USENIX Association, 2012.