Grokking the System Design Interview
3. Summary
Solving system design questions can be broken down into three steps:
• Scoping the problem: Don’t make assumptions; ask clarifying questions to understand the
constraints and use cases.
• Sketching out an abstract design: illustrate the building blocks of the system and the
relationships between them.
• Identifying and addressing the bottlenecks: use the fundamental principles of
scalable system design.
4. Conclusion
Design interviews are formidable, open-ended problems that cannot be solved in the allotted time.
Therefore, you should try to understand what your interviewer intends to focus on and spend
sufficient time on it. Be aware that the discussion on a system design problem can go in different
directions depending on the preferences of the interviewer. The interviewer might want to see how
you create a high-level architecture covering all aspects of the system, or they might prefer to pick
specific areas and dive deep into them. This means you must handle the situation strategically:
even good candidates can fail the interview, not because they lack knowledge, but because they do
not focus on the right things while discussing the problem.
If you have no idea how to solve these kinds of problems, you can familiarize yourself with the
common patterns of system design by reading widely from engineering blogs and watching videos of
tech talks from conferences. It is also advisable to arrange discussions, and even mock interviews, with
experienced engineers at big tech companies.
Remember, there is no ONE right answer to the question, because any system can be built in
different ways. What the interviewer evaluates is your ability to reason about your ideas and justify
your design decisions.
System Design Basics
Whenever we are designing a large system, we need to consider a few things:
1. What are the different architectural pieces that can be used?
2. How do these pieces work with each other?
3. How can we best utilize these pieces, what are the right trade-offs?
Investing in scaling before it is needed is generally not a smart business proposition; however, some
forethought into the design can save valuable time and resources in the future. In the following
chapters, we will focus on some of the core building blocks of scalable systems. Familiarity with
these concepts will greatly help in understanding the distributed system design problems discussed
later. In the next sections, we will go through Consistent Hashing, CAP Theorem, Load Balancing,
Caching, Data Partitioning, Indexes, Proxies, Queues, Replication, and choosing between SQL vs.
NoSQL.
Let’s start with Consistent Hashing.
Consistent Hashing
A Distributed Hash Table (DHT) is one of the fundamental components used in distributed, scalable
systems. A hash table needs a key, a value, and a hash function that maps the key to the location
where the value is stored.
index = hash_function(key)
Suppose we are designing a distributed caching system. Given ‘n’ cache servers, an intuitive hash
function would be ‘key % n’. It is simple and commonly used. But it has two major drawbacks:
1. It is NOT horizontally scalable. Whenever a new cache host is added to the system, all
existing mappings are broken. This becomes a maintenance pain point if the caching system
contains a lot of data, because in practice it is difficult to schedule downtime to update all of the
cache mappings.
2. It may NOT be load balanced, especially for non-uniformly distributed data. In practice, it
can safely be assumed that the data will not be distributed uniformly. For the caching
system, this translates into some caches becoming hot and saturated while others sit idle and
almost empty.
In such situations, consistent hashing is a good way to improve the caching system.
How does it work?
Like a typical hash function, consistent hashing maps a key to an integer. Suppose the output of the
hash function is in the range of [0, 256). Imagine that the integers in the range are placed on a ring
such that the values are wrapped around.
Here’s how consistent hashing works:
1. Given a list of cache servers, hash them to integers in the range.
2. To map a key to a server:
• Hash the key to a single integer.
• Move clockwise on the ring until you encounter the first cache server.
• That cache server is the one that holds the key. For example, key1 maps to cache A and
key2 maps to cache C.
When a new server, say D, is added, the keys that originally resided on C are split: some of them
shift to D, while the rest remain untouched.
When a cache is removed or fails, say A, all keys that originally mapped to A fall to B. Only those
keys need to be moved to B; no other keys are affected.
For load balancing, as we discussed in the beginning, real data is essentially randomly distributed
and thus may not be uniform, which can leave the keys on the caches unbalanced.
To handle this issue, we add “virtual replicas” for caches. Instead of mapping each cache to a single
point on the ring, we map it to multiple points on the ring, i.e. replicas. This way, each cache is
associated with multiple portions of the ring.
If the hash function “mixes well,” then as the number of replicas increases, the keys will become
more balanced.
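To make this concrete, here is a minimal Python sketch of a consistent hash ring with virtual replicas. The server names, the replica count, and the use of MD5 as the ring hash are illustrative assumptions, not part of any particular implementation.

import bisect
import hashlib

def _hash(key: str) -> int:
    # Map any string onto the ring (the full MD5 space acts as our ring).
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

class ConsistentHashRing:
    def __init__(self, servers, replicas=100):
        # Each server is placed at `replicas` virtual points on the ring.
        self.replicas = replicas
        self._ring = []      # sorted list of virtual-node hashes
        self._nodes = {}     # virtual-node hash -> server name
        for server in servers:
            self.add_server(server)

    def add_server(self, server):
        for i in range(self.replicas):
            h = _hash(f"{server}#{i}")
            bisect.insort(self._ring, h)
            self._nodes[h] = server

    def remove_server(self, server):
        for i in range(self.replicas):
            h = _hash(f"{server}#{i}")
            self._ring.remove(h)
            del self._nodes[h]

    def get_server(self, key):
        # Move "clockwise" to the first virtual node at or after the key's hash.
        h = _hash(key)
        idx = bisect.bisect(self._ring, h) % len(self._ring)
        return self._nodes[self._ring[idx]]

ring = ConsistentHashRing(["cacheA", "cacheB", "cacheC"])
print(ring.get_server("key1"))   # e.g., 'cacheA'
ring.add_server("cacheD")        # only some keys move to cacheD

Adding or removing a server only remaps the keys that fall between its virtual nodes and their predecessors on the ring; all other keys stay where they are.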
CAP Theorem
CAP theorem states that it is impossible for a distributed software system to simultaneously provide
more than two out of three of the following guarantees (CAP): Consistency, Availability and
Partition tolerance. When we design a distributed system, trading off among these three is almost the
first thing we want to consider. The CAP theorem says that while designing a distributed system we
can pick only two of:
Consistency: All nodes see the same data at the same time. Consistency is achieved by updating
several nodes before allowing further reads.
Availability: Every request gets a response on success/failure. Availability is achieved by
replicating the data across different servers.
Partition tolerance: System continues to work despite message loss or partial failure. A system
that is partition-tolerant can sustain any amount of network failure that doesn’t result in a failure of
the entire network. Data is sufficiently replicated across combinations of nodes and networks to
keep the system up through intermittent outages.
We cannot build a general data store that is continually available, sequentially consistent and
tolerant to any partition failures. We can only build a system that has any two of these three
properties. This is because, to be consistent, all nodes must see the same set of updates in the same order.
But if the network suffers a partition, updates in one partition might not make it to the other
partitions before a client reads from the out-of-date partition after having read from the up-to-date
one. The only thing that can be done to cope with this possibility is to stop serving requests from
the out-of-date partition, but then the service is no longer 100% available.
Load Balancing
A load balancer (LB) is another critical piece of any distributed system. It helps to distribute load
across multiple resources according to some metric (random, round-robin, random with weighting
for memory or CPU utilization, etc.). LB also keeps track of the status of all the resources while
distributing requests. If a server is not available to take new requests, is not responding, or has an
elevated error rate, the LB will stop sending traffic to that server.
To utilize full scalability and redundancy, we can try to balance the load at each layer of the system.
We can add LBs at three places:
1. Smart Clients
A smart client takes a pool of service hosts and balances load across them. It also detects hosts
that are not responding so that it can avoid sending requests their way. Smart clients also have to
detect recovered hosts, deal with adding new hosts, and so on.
Adding load-balancing functionality into the database (cache, service, etc.) client is usually an
attractive solution for the developer. It looks easy to implement and manage, especially when the
system is not large. But as the system grows, LBs need to evolve into standalone servers.
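As an illustration, here is a minimal smart-client sketch in Python. The host addresses, the health-tracking scheme, and the 30-second retry window are assumptions for the example, not a production design.

import itertools
import time

class SmartClient:
    """Round-robin over a pool of hosts, skipping hosts marked unhealthy."""

    def __init__(self, hosts, retry_after=30):
        self.hosts = hosts
        self.retry_after = retry_after   # seconds before retrying a failed host
        self.failed = {}                 # host -> time it was marked down
        self._cycle = itertools.cycle(hosts)

    def _is_healthy(self, host):
        failed_at = self.failed.get(host)
        if failed_at is None:
            return True
        # Give a failed host another chance after `retry_after` seconds.
        if time.time() - failed_at > self.retry_after:
            del self.failed[host]
            return True
        return False

    def pick_host(self):
        for _ in range(len(self.hosts)):
            host = next(self._cycle)
            if self._is_healthy(host):
                return host
        raise RuntimeError("no healthy hosts available")

    def mark_failed(self, host):
        self.failed[host] = time.time()

client = SmartClient(["10.0.0.1:6379", "10.0.0.2:6379", "10.0.0.3:6379"])
host = client.pick_host()   # send the request to `host`; on error, call client.mark_failed(host)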
2. Distributed cache
In a distributed cache, each node owns part of the cached data. Typically, the cache is divided
up using a consistent hashing function, such that if a request node is looking for a certain piece of
data, it can quickly know where to look within the distributed cache to determine if that data is
available. In this case, each node has a small piece of the cache, and will then send a request to
another node for the data before going to the origin. Therefore, one of the advantages of a
distributed cache is the increased cache space that can be had just by adding nodes to the request
pool.
A disadvantage of distributed caching is remedying a missing node. Some distributed caches get
around this by storing multiple copies of the data on different nodes; however, you can imagine
how this logic can get complicated quickly, especially when you add or remove nodes from the
request layer. Although even if a node disappears and part of the cache is lost, the requests will just
pull from the origin—so it isn’t necessarily catastrophic!
3. Global Cache
A global cache is just as it sounds: all the nodes use the same single cache space. This involves
adding a server, or file store of some sort, faster than your original store and accessible by all the
request layer nodes. Each of the request nodes queries the cache in the same way it would a local
one. This kind of caching scheme can get a bit complicated because it is very easy to overwhelm a
single cache as the number of clients and requests increases, but it is very effective in some
architectures (particularly ones with specialized hardware that make this global cache very fast, or
that have a fixed dataset that needs to be cached).
There are two common forms of global caches depicted in the following diagram. First, when a
cached response is not found in the cache, the cache itself becomes responsible for retrieving the
missing piece of data from the underlying store. Second, it is the responsibility of request nodes to
retrieve any data that is not found in the cache.
Most applications leveraging global caches tend to use the first type, where the cache itself manages
eviction and fetching data to prevent a flood of requests for the same data from the clients.
However, there are some cases where the second implementation makes more sense. For example,
if the cache is being used for very large files, a low cache hit percentage would cause the cache
buffer to become overwhelmed with cache misses; in this situation, it helps to have a large
percentage of the total data set (or hot data set) in the cache. Another example is an architecture
where the files stored in the cache are static and shouldn’t be evicted. (This could be because of
application requirements around that data latency—certain pieces of data might need to be very fast
for large data sets—where the application logic understands the eviction strategy or hot spots better
than the cache.)
Cache Invalidation
While caching is fantastic, it does require some maintenance for keeping cache coherent with the
source of truth (e.g., the database). If the data is modified in the database, it should be invalidated in the
cache; otherwise, this can cause inconsistent application behavior.
Solving this problem is known as cache invalidation; there are three main schemes that are used:
Write-through cache: Under this scheme data is written into the cache and the corresponding
database at the same time. The cached data allows for fast retrieval, and since the same data gets
written in the permanent storage, we will have complete data consistency between cache and
storage. Also, this scheme ensures that nothing will get lost in case of a crash, power failure, or
other system disruptions.
Although write through minimizes the risk of data loss, since every write operation must be done
twice before returning success to the client, this scheme has the disadvantage of higher latency for
write operations.
Write-around cache: This technique is similar to write through cache, but data is written directly
to permanent storage, bypassing the cache. This can reduce the cache being flooded with write
operations that will not subsequently be re-read, but has the disadvantage that a read request for
recently written data will create a “cache miss” and must be read from slower back-end storage and
experience higher latency.
Write-back cache: Under this scheme, data is written to cache alone, and completion is
immediately confirmed to the client. The write to the permanent storage is done after specified
intervals or under certain conditions. This results in low latency and high throughput for write-
intensive applications, however, this speed comes with the risk of data loss in case of a crash or
other adverse event because the only copy of the written data is in the cache.
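The following minimal sketch contrasts the write-through and write-back policies, assuming plain in-memory dictionaries stand in for the cache and the permanent storage.

class WriteThroughCache:
    """Writes go to the cache and the database before acknowledging the client."""

    def __init__(self, db):
        self.db = db          # stands in for the permanent storage
        self.cache = {}

    def write(self, key, value):
        self.cache[key] = value
        self.db[key] = value  # higher write latency, but nothing is lost on a crash
        return "ok"

class WriteBackCache:
    """Writes are acknowledged after hitting only the cache; the DB is updated later."""

    def __init__(self, db):
        self.db = db
        self.cache = {}
        self.dirty = set()    # keys not yet persisted

    def write(self, key, value):
        self.cache[key] = value
        self.dirty.add(key)   # low latency, but data is lost if we crash before flushing
        return "ok"

    def flush(self):
        # Run periodically or under certain conditions to persist dirty entries.
        for key in self.dirty:
            self.db[key] = self.cache[key]
        self.dirty.clear()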
1. Partitioning Methods
There are many different schemes one could use to decide how to break up an application database
into multiple smaller DBs. Below are three of the most popular schemes used by various large scale
applications.
a. Horizontal partitioning: In this scheme, we put different rows into different tables. For
example, if we are storing different places in a table, we can decide that locations with ZIP codes
less than 10000 are stored in one table, and places with ZIP codes greater than 10000 are stored in a
separate table. This is also called range-based sharding, as we are storing different ranges of data
in separate tables.
The key problem with this approach is that if the value whose range is used for sharding isn’t
chosen carefully, then the partitioning scheme will lead to unbalanced servers. In the previous
example, splitting locations based on their ZIP codes assumes that places will be evenly distributed
across the different ZIP codes. This assumption is not valid, as there will be a lot more places in a
densely populated area like Manhattan than in its suburb cities.
b. Vertical Partitioning: In this scheme, we divide our data so that tables related to a specific
feature are stored on their own server. For example, if we are building an Instagram-like application,
where we need to store data related to users, all the photos they upload, and the people they follow,
we can decide to place user profile information on one DB server, friend lists on another, and photos
on a third server.
Vertical partitioning is straightforward to implement and has a low impact on the application. The
main problem with this approach is that if our application experiences additional growth, then it
may be necessary to further partition a feature specific DB across various servers (e.g. it would not
be possible for a single server to handle all the metadata queries for 10 billion photos by 140 million
users).
c. Directory-Based Partitioning: A loosely coupled approach to work around the issues mentioned in
the above schemes is to create a lookup service that knows your current partitioning scheme and
abstracts it away from the DB access code. To find out where a particular data entity resides, we
query a directory server that holds the mapping between each tuple key and its DB server. This
loosely coupled approach means we can perform tasks like adding servers to the DB pool or
changing our partitioning scheme without having to impact the application.
2. Partitioning Criteria
a. Key or Hash-based partitioning: Under this scheme, we apply a hash function to some key
attribute of the entity we are storing to yield the partition number. For example, suppose we have 100
DB servers and our ID is a numeric value that gets incremented by one each time a new record is
inserted. In this example, the hash function could be ‘ID % 100’, which gives us the server
number where we can store or read that record. This approach should ensure a uniform allocation of
data among servers. The fundamental problem with this approach is that it effectively fixes the total
number of DB servers, since adding new servers means changing the hash function which would
require redistribution of data and downtime for the service. A workaround for this problem is to use
Consistent Hashing.
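For illustration, a minimal sketch of the ‘ID % 100’ hash-based partitioning described above; the server count and the example record ID are assumptions.

NUM_DB_SERVERS = 100   # fixed by the hash function, which is this scheme's main drawback

def server_for(record_id: int) -> int:
    # 'ID % 100' maps every record to one of the 100 DB servers.
    return record_id % NUM_DB_SERVERS

print(server_for(1234567))   # 67 -> store/read this record on server 67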
b. List partitioning: In this scheme, each partition is assigned a list of values, so whenever we
want to insert a new record, we will see which partition contains our key and then store it there. For
example, we can decide all users living in Iceland, Norway, Sweden, Finland or Denmark will be
stored in a partition for the Nordic countries.
c. Round-robin partitioning: This is a very simple strategy that ensures uniform data distribution.
With ‘n’ partitions, the i-th tuple is assigned to partition (i mod n).
d. Composite partitioning: Under this scheme, we combine any of the above partitioning schemes to
devise a new scheme, for example, first applying list partitioning and then hash-based
partitioning. Consistent hashing could be considered a composite of hash and list partitioning where
the hash reduces the key space to a size that can be listed.
Proxies are also extremely helpful when coordinating requests from multiple servers and can be
used to optimize request traffic from a system-wide perspective. For example, we can collapse the
same (or similar) data access requests into one request and then return the single result to the user;
this scheme is called collapsed forwarding.
Imagine there is a request for the same data across several nodes, and that piece of data is not in the
cache. If these requests are routed through the proxy, then all of them can be collapsed into one, which
means we will read the required data from the disk only once.
Another great way to use the proxy is to collapse requests for data that is spatially close together in
the storage (consecutively on disk). This strategy will result in decreasing request latency. For
example, let’s say several servers request parts of a file: part1, part2, part3, etc. We can set up our
proxy in such a way that it recognizes the spatial locality of the individual requests, collapses them
into a single request, and reads the complete file, which will greatly minimize the reads from the
data origin. Such a scheme makes a big difference in request time when we are doing random
accesses across TBs of data. Proxies are particularly useful under high-load situations, or when we
have limited caching, since proxies can mostly batch several requests into one.
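Here is a minimal sketch of collapsed forwarding, where concurrent requests for the same key share a single origin read. The threading model and the fetch_from_origin callback are assumptions made for the example.

import threading

class CollapsingProxy:
    """Collapses concurrent requests for the same key into a single origin read."""

    def __init__(self, fetch_from_origin):
        self.fetch_from_origin = fetch_from_origin
        self.lock = threading.Lock()
        self.in_flight = {}   # key -> (threading.Event, result holder)

    def get(self, key):
        with self.lock:
            entry = self.in_flight.get(key)
            if entry is None:
                # First request for this key becomes the "leader" and does the real read.
                entry = (threading.Event(), {})
                self.in_flight[key] = entry
                leader = True
            else:
                leader = False
        event, result = entry
        if leader:
            try:
                result["value"] = self.fetch_from_origin(key)   # single disk/origin read
            finally:
                with self.lock:
                    del self.in_flight[key]
                event.set()
        else:
            event.wait()   # followers reuse the leader's result
        return result["value"]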
Queues
Queues are used to effectively manage requests in a large-scale distributed system. In small systems
with minimal processing loads and small databases, writes can be predictably fast; however, in
more complex and large systems writes can take an almost non-deterministically long time. For
example, data may have to be written in different places on different servers or indices, or the
system could simply be under high load. In such cases where individual writes (or tasks) may take a
long time, achieving high performance and availability requires different components of the system
to work in an asynchronous way; a common way to do that is with queues.
Let’s assume a system where each client is requesting a task to be processed on a remote server.
Each of these clients sends their requests to the server, and the server tries to finish the tasks as
quickly as possible to return the results to the respective clients. In small systems where one server
can handle incoming requests just as fast as they come, this kind of situation should work just fine.
However, when the server gets more requests than it can handle, then each client is forced to wait
for other clients’ requests to finish before a response can be generated.
This kind of synchronous behavior can severely degrade the client’s performance; the client is forced to
wait, effectively doing zero work, until its request is answered. Adding extra servers to
address high load does not solve the problem either; even with effective load balancing in place, it
is very difficult to ensure the fair and balanced distribution of work required to maximize client
performance. Further, if the server processing the requests is unavailable, or fails, then the clients
upstream will fail too. Solving this problem effectively requires building an abstraction between the
client’s request and the actual work performed to service it.
A processing queue is as simple as it sounds: all incoming tasks are added to the queue, and as soon
as any worker has the capacity to process, they can pick up a task from the queue. These tasks could
represent a simple write to a database, or something as complex as generating a thumbnail preview
image for a document.
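A minimal sketch of such a processing queue using Python’s standard library; the worker count and the example task are assumptions.

import queue
import threading

task_queue = queue.Queue()

def worker():
    while True:
        task = task_queue.get()   # blocks until a task is available
        try:
            task()                # e.g., write to a DB or render a thumbnail
        finally:
            task_queue.task_done()

# A few workers pick up tasks as soon as they have capacity.
for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

# Clients enqueue work and move on; they do not wait for the result.
task_queue.put(lambda: print("thumbnail generated"))
task_queue.join()                 # optionally wait for all queued tasks to finish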
Queues are built on an asynchronous communication protocol, meaning that when a client submits a
task to a queue, it is no longer required to wait for the result; instead, it needs only an
acknowledgment that the request was properly received. This acknowledgment can later serve as a
reference for the results of the work when the client requires it. Queues have implicit or explicit
limits on the size of data that may be transmitted in a single request and the number of requests that
may remain outstanding on the queue.
Queues are also used for fault tolerance as they can provide some protection from service outages
and failures. For example, we can create a highly robust queue that can retry service requests that
have failed due to transient system failures. It is preferable to use a queue to enforce quality-of-
service guarantees than to expose clients directly to intermittent service outages, requiring
complicated and often inconsistent client-side error handling.
Queues play a vital role in managing distributed communication between different parts of any
large-scale distributed system. There are a lot of ways to implement them and quite a few open
source implementations of queues available like RabbitMQ, ZeroMQ, ActiveMQ, and BeanstalkD.
Redundancy and Replication
Redundancy means the duplication of critical data or services with the intention of increasing the reliability
of the system. For example, if there is only one copy of a file stored on a single server, then losing
that server means losing the file. Since losing data is seldom a good thing, we can create duplicate
or redundant copies of the file to solve this problem.
This same principle applies to services too. If we have a critical service in our system, ensuring that
multiple copies or versions of it are running simultaneously can secure against the failure of a single
node.
Creating redundancy in a system can remove single points of failure and provide backups if needed
in a crisis. For example, if we have two instances of a service running in production, and if one fails
or degrades, the system can failover to the other one. These failovers can happen automatically or
can be done manually.
Another important part of service redundancy is to create a shared-nothing architecture, where each
node can operate independently of one another. There should not be any central service managing
state or orchestrating activities for the other nodes. This helps a lot with scalability since new
servers can be added without special conditions or knowledge and most importantly, such systems
are more resilient to failure as there is no single point of failure.
SQL vs. NoSQL
In the world of databases, there are two main types of solutions: SQL and NoSQL - or relational
databases and non-relational databases. Both of them differ in the way they were built, the kind of
information they store, and how they store it.
Relational databases are structured and have predefined schemas, like phone books that store phone
numbers and addresses. Non-relational databases are unstructured, distributed and have a dynamic
schema, like file folders that hold everything from a person’s address and phone number to their
Facebook ‘likes’ and online shopping preferences.
SQL
Relational databases store data in rows and columns. Each row contains all the information about
one entity, and columns are all the separate data points. Some of the most popular relational
databases are MySQL, Oracle, MS SQL Server, SQLite, Postgres, MariaDB, etc.
NoSQL
The following are the most common types of NoSQL databases:
Key-Value Stores: Data is stored in an array of key-value pairs. The ‘key’ is an attribute name,
which is linked to a ‘value’. Well-known key value stores include Redis, Voldemort and Dynamo.
Document Databases: In these databases data is stored in documents, instead of rows and columns
in a table, and these documents are grouped together in collections. Each document can have an
entirely different structure. Document databases include CouchDB and MongoDB.
Wide-Column Databases: Instead of ‘tables,’ in columnar databases we have column families,
which are containers for rows. Unlike relational databases, we don’t need to know all the columns
up front, and each row doesn’t have to have the same number of columns. Columnar databases are
best suited for analyzing large datasets - big names include Cassandra and HBase.
Graph Databases: These databases are used to store data whose relations are best represented in a
graph. Data is saved in graph structures with nodes (entities), properties (information about the
entities) and lines (connections between the entities). Examples of graph databases include Neo4J
and InfiniteGraph.
HTTP Protocol
Ajax Polling
Polling is a standard technique used by the vast majority of AJAX applications. The basic idea is
that the client repeatedly polls (or requests) a server for data. The client makes a request and waits
for the server to respond with data. If no data is available, an empty response is returned.
1. Client opens a connection and requests data from the server using regular HTTP.
2. The requested webpage sends requests to the server at regular intervals (e.g., 0.5 seconds).
3. The server calculates the response and sends it back, just like regular HTTP traffic.
4. Client repeats the above three steps periodically to get updates from the server.
The problem with polling is that the client has to keep asking the server for new data. As a result, a
lot of responses are empty, creating HTTP overhead.
Ajax Polling Protocol
HTTP Long-Polling
Long-Polling is a variation of the traditional polling technique that allows the server to push information to a client
whenever the data is available. With Long-Polling, the client requests information from the server
exactly as in normal polling, but with the expectation that the server may not respond immediately.
That’s why this technique is sometimes referred to as a “Hanging GET”.
• If the server does not have any data available for the client, instead of sending an empty
response, the server holds the request and waits until some data becomes available.
• Once the data becomes available, a full response is sent to the client. The client then
immediately re-requests information from the server so that the server will almost always
have an available waiting request that it can use to deliver data in response to an event.
The basic life cycle of an application using HTTP Long-Polling is as follows:
1. The client makes an initial request using regular HTTP and then waits for a response.
2. The server delays its response until an update is available, or until a timeout has occurred.
3. When an update is available, the server sends a full response to the client.
4. The client typically sends a new long-poll request, either immediately upon receiving a
response or after a pause to allow an acceptable latency period.
5. Each Long-Poll request has a timeout. The client has to reconnect periodically after the
connection is closed, due to timeouts.
Long Polling Protocol
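The lifecycle above can be sketched as a simple client loop; this example uses the third-party requests library, and the endpoint URL, the 30-second server hold time, and the response format are assumptions.

import requests

LONG_POLL_URL = "https://example.com/updates"   # hypothetical endpoint

def handle_update(update):
    print("got update:", update)                 # application-specific handler (assumed)

def long_poll_forever(last_event_id=None):
    while True:
        try:
            # The server holds this request open until data is available or it times out.
            resp = requests.get(
                LONG_POLL_URL,
                params={"since": last_event_id},
                timeout=35,                      # a bit longer than the server's hold time
            )
            if resp.status_code == 200 and resp.content:
                update = resp.json()
                last_event_id = update.get("id")
                handle_update(update)
        except requests.exceptions.Timeout:
            pass                                  # timed out with no data: just reconnect
        # Immediately issue the next long-poll request.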
WebSockets
WebSocket provides full-duplex communication channels over a single TCP connection. It
provides a persistent connection between a client and a server that both parties can use to start
sending data at any time. The client establishes a WebSocket connection through a process known
as the WebSocket handshake. If the process succeeds, then the server and client can exchange data
in both directions at any time. The WebSocket protocol enables communication between a client
and a server with lower overheads, facilitating real-time data transfer from and to the server. This is
made possible by providing a standardized way for the server to send content to the browser without
being asked by the client, and allowing for messages to be passed back and forth while keeping the
connection open. In this way, a two-way (bi-directional) ongoing conversation can take place
between a client and a server.
WebSockets Protocol
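As a sketch, here is a WebSocket client using the third-party websockets package (asyncio-based); the URL and the message contents are assumptions.

import asyncio
import websockets   # third-party package: pip install websockets

async def chat():
    # Either side can send at any time once the handshake completes.
    async with websockets.connect("wss://example.com/socket") as ws:   # hypothetical URL
        await ws.send("hello from the client")
        reply = await ws.recv()   # the server can also push without being asked
        print("server said:", reply)

asyncio.run(chat())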
Server-Sent Events (SSEs)
Under SSEs, the client establishes a persistent, long-term connection with the server. The server
uses this connection to send data to a client. If the client wants to send data to the server, it would
require the use of another technology/protocol to do so.
1. Client requests data from a server using regular HTTP.
2. The requested webpage opens a connection to the server.
3. The server sends the data to the client whenever there’s new information available.
SSEs are best when we need real-time traffic from the server to the client or if the server is
generating data in a loop and will be sending multiple events to the client.
• Will users of our service be able to post tweets and follow other people?
• Should we also design to create and display user’s timeline?
• Will tweets contain photos and videos?
• Are we focusing on backend only or are we developing front-end too?
• Will users be able to search tweets?
• Do we need to display hot trending topics?
• Would there be any push notification for new (or important) tweets?
All such questions will determine how our end design will look.
Step 2: System interface definition
Define what APIs are expected from the system. This would not only establish the exact contract
expected from the system but would also confirm that you haven’t gotten any requirements wrong.
Some examples for our Twitter-like service would be:
• What scale is expected from the system (e.g., number of new tweets, number of tweet views,
how many timeline generations per sec., etc.)?
• How much storage would we need? We’ll have different numbers if users can have photos
and videos in their tweets.
• What network bandwidth usage are we expecting? This would be crucial in deciding how
would we manage traffic and balance load between servers.
• Since we’ll be storing a huge amount of data, how should we partition our data to distribute
it to multiple databases? Should we try to store all the data of a user on the same database?
What issue can it cause?
• How would we handle hot users, who tweet a lot or follow lots of people?
• Since a user’s timeline will contain the most recent (and relevant) tweets, should we try to store
our data in a way that is optimized for scanning the latest tweets?
• How much and at which layer should we introduce cache to speed things up?
• What components need better load balancing?
• Is there any single point of failure in our system? What are we doing to mitigate it?
• Do we have enough replicas of the data so that if we lose a few servers, we can still serve our
users?
• Similarly, do we have enough copies of different services running, such that a few failures will
not cause total system shutdown?
• How are we monitoring the performance of our service? Do we get alerts whenever critical
components fail or their performance degrades?
In summary, preparation and being organized during the interview are the keys to be successful in
system design interviews.
Designing a URL Shortening service like
TinyURL
Let's design a URL shortening service like TinyURL. This service will provide short aliases
redirecting to long URLs.
Similar services: [Link], [Link], [Link] etc. Difficulty Level: Easy
We would get:
[Link]
The shortened URL is nearly one-third the size of the actual URL.
URL shortening is used for optimizing links across devices, tracking individual links to analyze
audience and campaign performance, and hiding affiliated original URLs, etc.
If you haven’t used [Link] before, please try creating a new shortened URL and spend some
time going through different options their service offers. This will help you a lot in understanding
this chapter better.
4. System APIs
💡 Once we've finalized the requirements, it's always a good idea to define the system APIs. This
would explicitly state what is expected from the system.
We can have SOAP or REST APIs to expose the functionality of our service. Following could be
the definitions of the APIs for creating and deleting URLs:
createURL(api_dev_key, original_url, custom_alias=None, user_name=None,
expire_date=None)
Parameters:
api_dev_key (string) : The API developer key of a registered account. This will be used to, among
other things, throttle users based on their allocated quota.
original_url (string) : Original URL to be shortened.
custom_alias (string) : Optional custom key for the URL.
user_name (string) : Optional user name to be used in encoding.
expire_date (string) : Optional expiration date for the shortened URL.
Returns : (string)
A successful insertion returns the shortened URL, otherwise, returns an error code.
deleteURL(api_dev_key, url_key)
Where “url_key” is a string representing the shortened URL to be deleted. A successful deletion
returns ‘URL Removed’.
How do we detect and prevent abuse? For instance, any service can put us out of business by
consuming all our keys in the current design. To prevent abuse, we can limit users through their
api_dev_key, restricting how many URLs they can create or access in a certain time period.
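A minimal sketch of such per-key throttling with a fixed time window; the quota of 100 creations per hour and the in-memory counters are assumptions for illustration.

import time
from collections import defaultdict

MAX_CREATES_PER_HOUR = 100    # assumed quota per api_dev_key
WINDOW_SECONDS = 3600

_counters = defaultdict(lambda: [0, 0.0])   # api_dev_key -> [count, window_start]

def allow_create(api_dev_key: str) -> bool:
    count, window_start = _counters[api_dev_key]
    now = time.time()
    if now - window_start > WINDOW_SECONDS:
        # Start a new fixed window for this developer key.
        _counters[api_dev_key] = [1, now]
        return True
    if count < MAX_CREATES_PER_HOUR:
        _counters[api_dev_key][0] += 1
        return True
    return False   # quota exhausted: reject the createURL call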
5. Database Design
💡 Defining the DB schema in the early stages of the interview would help to understand the
data flow among various components and later would guide towards the data partitioning.
A few observations about the nature of the data we are going to store:
1. We need to store billions of records.
2. Each object we are going to store is small (less than 1K).
3. There are no relationships between records, except if we want to store which user created
what URL.
4. Our service is read-heavy.
Database Schema:
We would need two tables, one for storing information about the URL mappings and the other for
users’ data.
What kind of database should we use? Since we are likely going to store billions of rows and we
don’t need to use relationships between objects – a NoSQL key-value store like Dynamo or
Cassandra is a better choice, which would also be easier to scale. Please see SQL vs NoSQL for
more details. If we choose NoSQL, we cannot store UserID in the URL table (as there are no
foreign keys in NoSQL), for that we would need a third table which will store the mapping between
URL and the user.
6. Basic System Design and Algorithm
The problem we are solving here is to generate a short and unique key for the given URL. In the
above-mentioned example, the shortened URL we got was “[Link]”. The last six characters of this
URL are the short key we want to generate. We’ll explore two solutions here:
8. Cache
We can cache URLs that are frequently accessed. We can use an off-the-shelf solution like
Memcache, which can store full URLs with their respective hashes. Before hitting backend storage,
the application servers can quickly check whether the cache has the desired URL.
How much cache should we have? We can start with 20% of daily traffic and, based on clients’
usage patterns, adjust how many cache servers we need. As estimated above, we need 170GB of
memory to cache 20% of daily traffic. Since a modern-day server can have 256GB of memory, we
can easily fit all the cache into one machine; alternatively, we can use a couple of smaller servers to
store all these hot URLs.
Which cache eviction policy would best fit our needs? When the cache is full, and we want to
replace a link with a newer/hotter URL, how would we choose? Least Recently Used (LRU) can be
a reasonable policy for our system. Under this policy, we discard the least recently used URL first.
We can use a Linked Hash Map or a similar data structure to store our URLs and Hashes, which
will also keep track of which URLs are accessed recently.
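A minimal LRU sketch using Python’s OrderedDict, which plays the role of the linked hash map mentioned above; the capacity is an assumption.

from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity=1000):
        self.capacity = capacity
        self.store = OrderedDict()   # keeps keys in access order

    def get(self, short_key):
        if short_key not in self.store:
            return None              # cache miss: fall back to the database
        self.store.move_to_end(short_key)   # mark as most recently used
        return self.store[short_key]

    def put(self, short_key, original_url):
        if short_key in self.store:
            self.store.move_to_end(short_key)
        self.store[short_key] = original_url
        if len(self.store) > self.capacity:
            self.store.popitem(last=False)  # evict the least recently used URL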
To further increase the efficiency, we can replicate our caching servers to distribute load between
them.
How can each cache replica be updated? Whenever there is a cache miss, our servers hit the
backend database. Whenever this happens, we can update the cache and pass the new entry
to all the cache replicas. Each replica can update their cache by adding the new entry. If a replica
already has that entry, it can simply ignore it.
• Whenever a user tries to access an expired link, we can delete the link and return an error to
the user.
• A separate Cleanup service can run periodically to remove expired links from our storage
and cache. This service should be very lightweight and can be scheduled to run only when
the user traffic is expected to be low.
• We can have a default expiration time for each link, e.g., two years.
• After removing an expired link, we can put the key back in the key-DB to be reused.
• Should we remove links that haven’t been visited in some length of time, say six months?
This could be tricky. Since storage is getting cheap, we can decide to keep links forever.
11. Telemetry
How many times a short URL has been used, what were user locations, etc.? How would we store
these statistics? If it is part of a DB row that gets updated on each view, what will happen when a
popular URL is slammed with a large number of concurrent requests?
We can have statistics about the country of the visitor, the date and time of access, the web page that
referred the click, the browser or platform from which the page was accessed, and more.
1. What is Pastebin?
Pastebin-like services enable users to store plain text or images over the network (typically the
Internet) and generate unique URLs to access the uploaded data. Such services are also used to
share data over the network quickly, as users would just need to pass the URL to let other users see
it.
If you haven’t used [Link] before, please try creating a new ‘Paste’ there and spend some
time going through different options their service offers. This will help you a lot in understanding
this chapter better.
5. System APIs
We can have SOAP or REST APIs to expose the functionality of our service. Following could be
the definitions of the APIs to create/retrieve/delete Pastes:
addPaste(api_dev_key, paste_data, custom_url=None, user_name=None,
paste_name=None, expire_date=None)
Parameters:
api_dev_key (string) : The API developer key of a registered account. This will be used to, among
other things, throttle users based on their allocated quota.
paste_data (string) : Textual data of the paste.
custom_url (string) : Optional custom URL.
user_name (string) : Optional user name to be used to generate URL.
paste_name (string) : Optional name of the paste.
expire_date (string) : Optional expiration date for the paste.
Returns : (string)
A successful insertion returns the URL through which the paste can be accessed, otherwise, returns
an error code.
Similarly, we can have retrieve and delete Paste APIs:
getPaste(api_dev_key, api_paste_key)
Where “api_paste_key” is a string representing the Paste Key of the paste to be retrieved. This API
will return the textual data of the paste.
deletePaste(api_dev_key, api_paste_key)
6. Database Design
A few observations about the nature of the data we are going to store:
1. We need to store billions of records.
2. Each metadata object we are going to store would be small (less than 100 bytes).
3. Each paste object we are storing can be of medium size (it can be a few MB).
4. There are no relationships between records, except if we want to store which user created
what Paste.
5. Our service is read heavy.
Database Schema:
We would need two tables, one for storing information about the Pastes and the other for users’
data.
High level design for Pastebin
8. Component Design
a. Application layer
Our application layer will process all incoming and outgoing requests. The application servers will
be talking to the backend data store components to serve the requests.
How to handle a write request? Upon receiving a write request, our application server will
generate a six-letter random string, which would serve as the key of the paste (if the user has not
provided a custom key). The application server will then store the contents of the paste and the
generated key in the database. After the successful insertion, the server can return the key to the
user. One possible problem here could be that the insertion fails because of a duplicate key. Since
we are generating a random key, there is a possibility that the newly generated key could match an
existing one. In that case, we should generate a new key and try again, retrying until the insertion
succeeds without a duplicate-key failure. We should return an error to the user if the
custom key they have provided is already present in our database.
Another solution of the above problem could be to run a standalone Key Generation Service
(KGS) that generates random six-letter strings beforehand and stores them in a database (let’s call
it key-DB). Whenever we want to store a new paste, we will just take one of the already generated
keys and use it. This approach will make things quite simple and fast since we will not be worrying
about duplications or collisions. KGS will make sure all the keys inserted in key-DB are unique.
KGS can use two tables to store keys: one for keys that are not used yet and one for all the used
keys. As soon as KGS gives some keys to an application server, it can move them to the used-keys
table. KGS can always keep some keys in memory so that whenever a server needs them, it can
quickly provide them. As soon as KGS loads some keys into memory, it can move them to the
used-keys table; this way we can make sure each server gets unique keys. If KGS dies before using
all the keys loaded in memory, we will waste those keys, which is acceptable given the huge
number of keys we have.
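A minimal in-memory sketch of such a Key Generation Service; the alphabet, the batch size, and the use of Python sets in place of the two database tables are assumptions.

import random
import string

class KeyGenerationService:
    ALPHABET = string.ascii_letters + string.digits   # base-62 characters for six-letter keys

    def __init__(self, prefill=10000):
        self.unused_keys = set()   # stands in for the "unused keys" table
        self.used_keys = set()     # stands in for the "used keys" table
        while len(self.unused_keys) < prefill:
            key = "".join(random.choices(self.ALPHABET, k=6))
            if key not in self.used_keys:
                self.unused_keys.add(key)

    def get_keys(self, n=100):
        """Hand a batch of keys to an app server and mark them used immediately."""
        batch = [self.unused_keys.pop() for _ in range(min(n, len(self.unused_keys)))]
        self.used_keys.update(batch)   # moved to 'used' even before they are consumed
        return batch

kgs = KeyGenerationService()
keys_for_app_server = kgs.get_keys(100)   # the app server caches these locally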
Isn’t KGS a single point of failure? Yes, it is. To solve this, we can have a standby replica of KGS,
and whenever the primary server dies, it can take over to generate and provide keys.
Can each app server cache some keys from key-DB? Yes, this can surely speed things up.
Although in this case, if the application server dies before consuming all the keys, we will end up
losing those keys. This could be acceptable since we have 68B unique six-letter keys, which are a
lot more than we require.
How to handle a paste read request? Upon receiving a read paste request, the application service
layer contacts the datastore. The datastore searches for the key, and if it is found, returns the paste’s
contents. Otherwise, an error code is returned.
b. Datastore layer
We can divide our datastore layer into two:
1. Metadata database: We can use a relational database like MySQL or a Distributed Key-
Value store like Dynamo or Cassandra.
2. Block storage: We can store our contents in a block storage that could be a distributed file
storage or an SQL-like database. Whenever we feel like hitting our full capacity on content
storage, we can easily increase it by adding more servers.
Detailed component design for Pastebin
9. Purging or DB Cleanup
Please see Designing a URL Shortening service.
Let's design a photo-sharing service like Instagram, where users can upload photos to share them
with other users. Similar Services: Flickr, Picasa Difficulty Level: Medium
1. Why Instagram?
Instagram is a social networking service, which enables its users to upload and share their pictures
and videos with other users. Users can share either publicly or privately, as well as through a
number of other social networking platforms, such as Facebook, Twitter, Flickr, and Tumblr.
For the sake of this exercise, we plan to design a simpler version of Instagram, where a user can
share photos and can also follow other users. Timeline for each user will consist of top photos from
all the people the user follows.
6. Database Schema
💡 Defining the DB schema in the early stages of the interview would help to understand the data
flow among various components and later would guide towards the data partitioning.
We need to store data about users, their uploaded photos, and the people they follow. The Photo
table will store all data related to a photo; we need an index on (PhotoID, CreationDate) since we
need to fetch recent photos first.
One simple approach for storing the above schema would be to use an RDBMS like MySQL since
we require joins. But relational databases come with their challenges, especially when we need to
scale them. For details, please take a look at SQL vs. NoSQL.
We can store photos in a distributed file storage like HDFS or S3.
We can store the above schema in a distributed key-value store to enjoy benefits offered by NoSQL.
All the metadata related to photos can go to a table, where the ‘key’ would be the ‘PhotoID’ and the
‘value’ would be an object containing PhotoLocation, UserLocation, CreationTimestamp, etc.
We also need to store relationships between users and photos, to know who owns which photo.
Another relationship we would need to store is the list of people a user follows. For both of these
tables, we can use a wide-column datastore like Cassandra. For the ‘UserPhoto’ table, the ‘key’
would be ‘UserID’ and the ‘value’ would be the list of ‘PhotoIDs’ the user owns, stored in different
columns. We will have a similar scheme for the ‘UserFollow’ table.
Cassandra, or key-value stores in general, always maintains a certain number of replicas to offer
reliability. Also, in such data stores, deletes are not applied instantly; data is retained for a certain
number of days (to support undeletion) before being removed from the system permanently.
7. Component Design
Writes or photo uploads could be slow as they have to go to the disk, whereas reads could be faster
if they are being served from cache.
Uploading users can consume all the connections, as uploading would be a slower process. This
means reads cannot be served if the system gets busy with all the write requests. To handle this
bottleneck we can split out read and writes into separate services.
Since most web servers have a connection limit, we should keep this in mind while designing our
system. Uploads use synchronous connections, whereas downloads can be asynchronous. Let’s
assume a web server can have a maximum of 500 connections at any time; this means it can’t handle
more than 500 concurrent uploads. Since reads can be asynchronous, the web server
can serve a lot more than 500 users at any time, as it can switch between users quickly. This guides
us to have separate dedicated servers for reads and writes so that uploads don’t hog the system.
Separating image read and write requests will also allow us to scale or optimize each of them
independently.
KeyGeneratingServer2:
auto-increment-increment = 2
auto-increment-offset = 2
We can put a load balancer in front of both of these databases to round-robin between them and to
deal with downtime. Both of these servers could be out of sync, with one generating more keys than
the other, but this will not cause any issue in our system. We can extend this design by defining
separate ID tables for Users, Photo-Comments or other objects present in our system.
Alternately, we can implement a key generation scheme similar to what we have discussed in
Designing a URL Shortening service like TinyURL.
How can we plan for future growth of our system? We can have a large number of logical
partitions to accommodate future data growth, such that, in the beginning, multiple logical partitions
reside on a single physical database server. Since each database server can have multiple database
instances on it, we can have separate databases for each logical partition on any server. So
whenever we feel that a certain database server has a lot of data, we can migrate some logical
partitions from it to another server. We can maintain a config file (or a separate database) that can
map our logical partitions to database servers; this will enable us to move partitions around easily.
Whenever we want to move a partition, we just have to update the config file to announce the
change.
• The system should support snapshotting of the data, so that users can go back to any version
of the files.
6. Component Design
Let’s go through the major components of our system one by one:
a. Client
The client application monitors the workspace folder on the user’s machine and syncs all files/folders
in it with the remote Cloud Storage. The client application will work with the storage servers to
upload, download and modify actual files to backend Cloud Storage. The client also interacts with
the remote Synchronization Service to handle any file metadata updates e.g. change in the file
name, size, modification date, etc.
Here are some of the essential operations of the client:
1. Upload and download files.
2. Detect file changes in the workspace folder.
3. Handle conflict due to offline or concurrent updates.
How do we handle file transfer efficiently? As mentioned above, we can break each file into
smaller chunks so that we transfer only those chunks that are modified and not the whole file. Let’s
say we divide each file into fixed-size 4MB chunks. We can statically calculate what could be an
optimal chunk size based on 1) Storage devices we use in the cloud to optimize space utilization
and Input/output operations per second (IOPS) 2) Network bandwidth 3) Average file size in the
storage etc. In our metadata, we should also keep a record of each file and the chunks that constitute
it.
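A minimal sketch of splitting a file into fixed-size 4MB chunks; the per-chunk SHA-256 hash used to identify chunks in the metadata is an assumption for the example.

import hashlib

CHUNK_SIZE = 4 * 1024 * 1024   # 4MB, as assumed above

def chunk_file(path):
    """Yield (index, sha256_hex, bytes) for each fixed-size chunk of the file."""
    with open(path, "rb") as f:
        index = 0
        while True:
            data = f.read(CHUNK_SIZE)
            if not data:
                break
            yield index, hashlib.sha256(data).hexdigest(), data
            index += 1

# Record each file's chunk list in the metadata, for example:
# metadata["report.docx"] = [chunk_hash for _, chunk_hash, _ in chunk_file("report.docx")]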
Should we keep a copy of metadata with Client? Keeping a local copy of metadata not only
enables us to do offline updates but also saves a lot of round trips to update the remote metadata.
How can clients efficiently listen to changes happening on other clients? One solution could be
that the clients periodically check with the server if there are any changes. The problem with this
approach is that changes will be reflected locally with a delay, since clients check for changes
periodically instead of the server notifying them whenever there is a change. If the client
frequently checks the server for changes, it will not only waste bandwidth, as the server has to
return an empty response most of the time, but will also keep the server busy. Pulling information
in this manner does not scale either.
A solution to the above problem could be to use HTTP long polling. With long polling, the client
requests information from the server with the expectation that the server may not respond
immediately. If the server has no new data for the client when the poll is received, instead of
sending an empty response, the server holds the request open and waits for response information to
become available. Once it does have new information, the server immediately sends an HTTP/S
response to the client, completing the open HTTP/S Request. Upon receipt of the server response,
the client can immediately issue another server request for future updates.
Based on the above considerations we can divide our client into following four parts:
I. Internal Metadata Database will keep track of all the files, chunks, their versions, and their
location in the file system.
II. Chunker will split the files into smaller pieces called chunks. It will also be responsible for
reconstructing a file from its chunks. Our chunking algorithm will detect the parts of the files that
have been modified by the user and only transfer those parts to the Cloud Storage; this will save us
bandwidth and synchronization time.
III. Watcher will monitor the local workspace folders and notify the Indexer (discussed below) of
any action performed by the users, e.g., when users create, delete, or update files or folders.
Watcher also listens to any changes happening on other clients that are broadcasted by
Synchronization service.
IV. Indexer will process the events received from the Watcher and update the internal metadata
database with information about the chunks of the modified files. Once the chunks are successfully
submitted/downloaded to the Cloud Storage, the Indexer will communicate with the remote
Synchronization Service to broadcast changes to other clients and update remote metadata database.
How should clients handle slow servers? Clients should exponentially back-off if the server is
busy/not-responding. Meaning, if a server is too slow to respond, clients should delay their retries,
and this delay should increase exponentially.
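A minimal exponential back-off sketch; the base delay, the cap, and the jitter are assumptions.

import random
import time

def call_with_backoff(request_fn, max_attempts=6, base_delay=1.0, max_delay=60.0):
    """Retry request_fn with exponentially increasing delays if the server is slow or down."""
    for attempt in range(max_attempts):
        try:
            return request_fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # 1s, 2s, 4s, 8s, ... capped at max_delay, with jitter to avoid thundering herds.
            delay = min(max_delay, base_delay * (2 ** attempt))
            time.sleep(delay + random.uniform(0, delay / 2))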
Should mobile clients sync remote changes immediately? Unlike desktop or web clients, that
check for file changes on a regular basis, mobile clients usually sync on demand to save user’s
bandwidth and space.
b. Metadata Database
The Metadata Database is responsible for maintaining the versioning and metadata information
about files/chunks, users, and workspaces. The Metadata Database can be a relational database such
as MySQL, or a NoSQL database service such as DynamoDB. Regardless of the type of the
database, the Synchronization Service should be able to provide a consistent view of the files using
a database, especially if more than one user works with the same file simultaneously. Since NoSQL
data stores do not support ACID properties in favor of scalability and performance, we need to
incorporate the support for ACID properties programmatically in the logic of our Synchronization
Service in case we opt for this kind of database. However, using a relational database can simplify
the implementation of the Synchronization Service as they natively support ACID properties.
The Metadata Database should store information about the following objects:
1. Chunks
2. Files
3. User
4. Devices
5. Workspace (sync folders)
c. Synchronization Service
The Synchronization Service is the component that processes file updates made by a client and
applies these changes to other subscribed clients. It also synchronizes clients’ local databases with
the information stored in the remote Metadata DB. The Synchronization Service is the most
important part of the system architecture due to its critical role in managing the metadata and
synchronizing users’ files. Desktop clients communicate with the Synchronization Service to either
obtain updates from the Cloud Storage or send files and updates to the Cloud Storage and
potentially other users. If a client was offline for a period, it polls the system for new updates as
soon as it becomes online. When the Synchronization Service receives an update request, it checks
with the Metadata Database for consistency and then proceeds with the update. Subsequently, a
notification is sent to all subscribed users or devices to report the file update.
The Synchronization Service should be designed in such a way to transmit less data between clients
and the Cloud Storage to achieve better response time. To meet this design goal, the
Synchronization Service can employ a differencing algorithm to reduce the amount of the data that
needs to be synchronized. Instead of transmitting entire files from clients to the server or vice versa,
we can just transmit the difference between two versions of a file. Therefore, only the part of the
file that has been changed is transmitted. This also decreases bandwidth consumption and cloud
data storage for the end user. As described above we will be dividing our files into 4MB chunks and
will be transferring modified chunks only. Server and clients can calculate a hash (e.g., SHA-256)
to see whether to update the local copy of a chunk or not. On the server, if we already have a chunk
with the same hash (even from another user), we don’t need to create another copy; we can reuse the
same chunk. This is discussed in detail later under Data Deduplication.
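A minimal sketch of this differencing step: comparing per-chunk hashes so that only modified chunks are transferred. The dictionary shape (chunk index mapped to a SHA-256 hex digest) is an assumption.

def modified_chunks(local_hashes: dict, remote_hashes: dict) -> list:
    """Return the chunk indexes whose local hash differs from (or is missing on) the server."""
    changed = []
    for index, local_hash in local_hashes.items():
        if remote_hashes.get(index) != local_hash:
            changed.append(index)   # only these chunks need to be uploaded
    return changed

# Example: only chunk 2 changed, so only chunk 2 is synchronized.
local = {0: "aa11", 1: "bb22", 2: "ff99"}
remote = {0: "aa11", 1: "bb22", 2: "cc33"}
print(modified_chunks(local, remote))   # [2]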
To be able to provide an efficient and scalable synchronization protocol we can consider using a
communication middleware between clients and the Synchronization Service. The messaging
middleware should provide scalable message queuing and change notification to support a high
number of clients using pull or push strategies. This way, multiple Synchronization Service
instances can receive requests from a global request Queue, and the communication middleware
will be able to balance their load.
e. Cloud/Block Storage
Cloud/Block Storage stores chunks of files uploaded by the users. Clients directly interact with the
storage to send and receive objects from it. Separation of the metadata from storage enables us to
use any storage either in cloud or in-house.
10. Caching
We can have two kinds of caches in our system. To deal with hot files/chunks, we can introduce a cache for Block storage. We can use an off-the-shelf solution like Memcache that stores whole chunks keyed by their IDs/hashes, so Block servers can quickly check the cache before hitting Block storage. Based on clients' usage patterns we can determine how many cache servers we need. A high-end commercial server can have up to 144GB of memory, so one such server can cache roughly 36K 4MB chunks.
Which cache replacement policy would best fit our needs? When the cache is full, and we want
to replace a chunk with a newer/hotter chunk, how would we choose? Least Recently Used (LRU)
can be a reasonable policy for our system. Under this policy, we discard the least recently used
chunk first.
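To make the eviction policy concrete, here is a minimal in-memory LRU sketch for chunk caching; in practice an off-the-shelf store like Memcache would handle eviction for us, so this is only illustrative.

from collections import OrderedDict

class LRUChunkCache:
    """Minimal LRU cache mapping chunk hash -> chunk bytes."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.chunks = OrderedDict()

    def get(self, chunk_hash):
        if chunk_hash not in self.chunks:
            return None
        self.chunks.move_to_end(chunk_hash)      # mark as most recently used
        return self.chunks[chunk_hash]

    def put(self, chunk_hash, data):
        self.chunks[chunk_hash] = data
        self.chunks.move_to_end(chunk_hash)
        if len(self.chunks) > self.capacity:
            self.chunks.popitem(last=False)      # evict the least recently used chunk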
Similarly, we can have a cache for Metadata DB.
• Group Chats: Messenger should support multiple people talking to each other in a group.
• Push notifications: Messenger should be able to notify users of new messages when they are
offline.
a. Messages Handling
How would we efficiently send/receive messages? To send messages, a user needs to connect to
the server and post messages for the other users. To get a message from the server, the user has two
options:
1. Pull model: Users can periodically ask the server if there are any new messages for them.
2. Push model: Users can keep a connection open with the server and can depend upon the
server to notify them whenever there are new messages.
If we go with our first approach, then the server needs to keep track of messages that are still
waiting to be delivered, and as soon as the receiving user connects to the server to ask for any new
message, the server can return all the pending messages. To minimize latency, the user has to check the server quite frequently, and most of the time will get an empty response when there are no pending messages. This wastes a lot of resources and does not look like an efficient solution.
If we go with our second approach, where all the active users keep a connection open with the
server, then as soon as the server receives a message it can immediately pass the message to the
intended user. This way, the server does not need to keep track of pending messages, and we will have minimum latency, as the messages are delivered instantly over the open connection.
How will clients maintain an open connection with the server? We can use HTTP Long Polling.
In long polling, clients can request information from the server with the expectation that the server
may not respond immediately. If the server has no new data for the client when the poll is received,
instead of sending an empty response, the server holds the request open and waits for response
information to become available. Once it does have new information, the server immediately sends
the response to the client, completing the open request. Upon receipt of the server response, the
client can immediately issue another request to the server for future updates. This significantly improves latency, throughput, and performance. A long polling request can time out or be disconnected by the server; in either case, the client has to open a new request.
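A minimal client-side long-polling loop might look like the sketch below; the /messages/poll endpoint, the 30-second timeout, and the use of the requests library are assumptions made for illustration, not a defined API of our service.

import requests  # third-party HTTP client, used here only for illustration

POLL_URL = "https://chat.example.com/messages/poll"  # hypothetical endpoint

def long_poll(user_id, handle_message):
    """Keep one long-poll request open at a time; reopen it after each response, timeout, or disconnect."""
    while True:
        try:
            # The server holds this request open until new messages arrive or it times out.
            resp = requests.get(POLL_URL, params={"user_id": user_id}, timeout=30)
            if resp.status_code == 200:
                for message in resp.json().get("messages", []):
                    handle_message(message)
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
            pass  # no new data or the server dropped the connection; immediately poll again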
How can the server keep track of all open connections to efficiently redirect messages to the users? The server can maintain a hash table, where the "key" would be the UserID and the "value" would
be the connection object. So whenever the server receives a message for a user, it looks up that user
in the hash table to find the connection object and sends the message on the open request.
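A simple sketch of this lookup structure on a single chat server; the connection object here is a stand-in for whatever handle the socket layer provides, and the send() method is assumed for illustration.

# Maps UserID -> open connection handle on this chat server.
active_connections = {}

def register(user_id, connection):
    active_connections[user_id] = connection

def deliver(user_id, message):
    connection = active_connections.get(user_id)
    if connection is None:
        return False          # user is offline or connected to another server
    connection.send(message)  # assumes the connection object exposes a send() method
    return True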
What will happen when the server receives a message for a user who has gone offline? If the
receiver has disconnected, the server can notify the sender about the delivery failure. If it is a
temporary disconnect, e.g., the receiver’s long-poll request just timed out, then we should expect a
reconnect from the user. In that case, we can ask the sender to retry sending the message. This retry
could be embedded in the client’s logic so that users don’t have to retype the message. The server
can also store the message for a while and retry sending it once the receiver reconnects.
How many chat servers do we need? Let's plan for 500 million connections at any time. Assuming a
modern server can handle 50K concurrent connections at any time, we would need 10K such
servers.
How do we know which server holds the connection to which user? We can introduce a software load balancer in front of our chat servers that can map each UserID to a server and redirect the request to it.
How should the server process a 'deliver message' request? The server needs to do the following things upon receiving a new message: 1) store the message in the database, 2) send the message to the receiver, and 3) send an acknowledgment to the sender.
The chat server will first find the server that holds the connection for the receiver and pass the
message to that server to send it to the receiver. The chat server can then send the acknowledgment
to the sender; we don’t need to wait for storing the message in the database; this can happen in the
background. Storing the message is discussed in the next section.
How does the messenger maintain the sequencing of the messages? We can store a timestamp
with each message, which would be the time when the message is received at the server. But this
will still not ensure correct ordering of messages for clients. The scenario where the server
timestamp cannot determine the exact ordering of messages would look like this:
1. User-1 sends a message M1 to the server for User-2.
2. The server receives M1 at T1.
3. Meanwhile, User-2 sends a message M2 to the server for User-1.
4. The server receives the message M2 at T2, such that T2 > T1.
5. The server sends message M1 to User-2 and M2 to User-1.
So User-1 will see M1 first and then M2, whereas User-2 will see M2 first and then M1.
To resolve this, we need to keep a sequence number with every message for each client. This
sequence number will determine the exact ordering of messages for EACH user. With this solution,
both clients will see a different view of message sequence, but this view will be consistent for them
on all devices.
6. Data partitioning
Since we will be storing a lot of data (3.6PB for five years), we need to distribute it onto multiple
database servers. What would be our partitioning scheme?
Partitioning based on UserID: Let’s assume we partition based on the hash of the UserID, so that
we can keep all messages of a user on the same database. If one DB shard is 4TB, we will have
“3.6PB/4TB ~= 900” shards for five years. For simplicity, let’s assume we keep 1K shards. So we
will find the shard number by “hash(UserID) % 1000”, and then store/retrieve the data from there.
This partitioning scheme will also make it very quick to fetch the chat history of any user.
In the beginning, we can start with fewer database servers with multiple shards residing on one
physical server. Since we can have multiple database instances on a server, we can easily store
multiple partitions on a single server. Our hash function needs to understand this logical partitioning
scheme so that it can map multiple logical partitions on one physical server.
Since we will store an infinite history of messages, we can start with a big number of logical
partitions, which would be mapped to fewer physical servers, and as our storage demand increases,
we can add more physical servers to distribute our logical partitions.
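A minimal sketch of this logical-to-physical mapping, assuming 1,000 logical shards and a shard-to-server table maintained by operations (the server names and the initial assignment are purely illustrative):

NUM_LOGICAL_SHARDS = 1000

# Hypothetical assignment of logical shards to physical servers; moving a
# logical shard to a new server only means updating this map.
shard_to_server = {shard: f"db-server-{shard % 4 + 1}" for shard in range(NUM_LOGICAL_SHARDS)}

def logical_shard(user_id):
    """All messages of a user land in the same logical shard."""
    return hash(user_id) % NUM_LOGICAL_SHARDS

def physical_server(user_id):
    return shard_to_server[logical_shard(user_id)]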
Partitioning based on MessageID: If we store different messages of a user on separate database shards, fetching a range of messages of a chat would be very slow, so we should not adopt this scheme.
7. Cache
We can cache a few recent messages (say last 15) in a few recent conversations that are visible in
user’s viewport (say last 5). Since we decided to store all of the user’s messages on one shard, cache
for a user should completely reside on one machine too.
8. Load balancing
We will need a load balancer in front of our chat servers that can map each UserID to the server holding the user's connection and then direct the request to that server. Similarly, we would need a load balancer for our cache servers.
1. What is Twitter?
Twitter is an online social networking service where users post and read short 140-character
messages called "tweets". Registered users can post and read tweets, but those who are not
registered can only read them. Users access Twitter through the website interface, SMS, or the mobile app.
4. System APIs
💡 Once we've finalized the requirements, it's always a good idea to define the system APIs. This
would explicitly state what is expected from the system.
We can have SOAP or REST APIs to expose the functionality of our service. Following could be
the definition of the API for posting a new tweet:
tweet(api_dev_key, tweet_data, tweet_location, user_location, media_ids)
Parameters:
api_dev_key (string) : The API developer key of a registered account. This will be used to, among
other things, throttle users based on their allocated quota.
tweet_data (string) : The text of the tweet, typically up to 140 characters.
tweet_location (string) : Optional location (longitude, latitude) this Tweet refers to.
user_location (string) : Optional location (longitude, latitude) of the user adding the tweet.
media_ids (number[]) : Optional list of media_ids to be associated with the Tweet. (All the media, i.e., photos, videos, etc., need to be uploaded separately.)
Returns : (string)
A successful post will return the URL to access that tweet. Otherwise, an appropriate HTTP error is
returned.
Our expected daily write load is 100 million tweets and read load is 28 billion tweets. This means that, on average, our system will receive around 1,160 new tweets and 325K read requests per second. This traffic will be distributed unevenly throughout the day, though; at peak time we should expect at least a few thousand write requests and around 1M read requests per second. We should keep this in mind while designing the architecture of our system.
6. Database Schema
We need to store data about users, their tweets, their favorite tweets, and people they follow.
For choosing between SQL and NoSQL databases to store the above schema, please see ‘Database
schema’ under Designing Instagram.
7. Data Sharding
Since we have a huge number of new tweets every day and our read load is extremely high too, we
need to distribute our data onto multiple machines such that we can read/write it efficiently. We
have many options to shard our data; let’s go through them one by one:
Sharding based on UserID: We can try storing all the data of a user on one server. While storing,
we can pass the UserID to our hash function that will map the user to a database server where we
will store all of the user's tweets, favorites, follows, etc. While querying for tweets/follows/favorites of a user, we can ask our hash function where we can find the user's data and then read it from there. This approach has a couple of issues:
1. What if a user becomes hot? There could be a lot of queries on the server holding the user.
This high load will affect the performance of our service.
2. Over time, some users can end up storing a lot of tweets or having a lot of follows compared to others. Maintaining a uniform distribution of this growing user data is quite difficult.
To recover from these situations either we have to repartition/redistribute our data or use consistent
hashing.
Sharding based on TweetID: Our hash function will map each TweetID to a random server where
we will store that Tweet. To search tweets, we have to query all servers, and each server will return
a set of tweets. A centralized server will aggregate these results to return them to the user. Let's look at a timeline generation example; here are the steps our system has to perform to generate a user's timeline:
1. Our application (app) server will find all the people the user follows.
2. App server will send the query to all database servers to find tweets from these people.
3. Each database server will find the tweets for each user, sort them by recency and return the
top tweets.
4. App server will merge all the results and sort them again to return the top results to the user.
This approach solves the problem of hot users, but in contrast to sharding by UserID, we have to
query all database partitions to find tweets of a user, which can result in higher latencies.
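A rough sketch of this scatter-gather step, assuming a query_shard helper that returns each shard's top tweets for the given users as dicts sorted newest first (both the helper and the tweet shape are assumptions for illustration):

import heapq
import itertools

def generate_timeline(followee_ids, shards, query_shard, limit=20):
    """Fan out to every shard, then merge the per-shard results by recency."""
    per_shard_results = []
    for shard in shards:
        # Each shard returns its top tweets for these users, newest first.
        per_shard_results.append(query_shard(shard, followee_ids, limit))
    # Merge the pre-sorted lists and keep the overall newest `limit` tweets.
    merged = heapq.merge(*per_shard_results, key=lambda t: t["created_at"], reverse=True)
    return list(itertools.islice(merged, limit))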
We can further improve our performance by introducing cache to store hot tweets in front of the
database servers.
Sharding based on Tweet creation time: Storing tweets based on recency will give us the
advantage of fetching all the top tweets quickly, and we only have to query a very small set of
servers. But the problem here is that the traffic load will not be distributed, e.g., while writing, all
new tweets will be going to one server, and the remaining servers will be sitting idle. Similarly
while reading, the server holding latest data will have a very high load as compared to servers
holding old data.
What if we can combine sharding by TweetID and Tweet creation time? If we don't store the tweet creation time separately and use the TweetID to reflect it, we can get the benefits of both approaches. This way it will be quite quick to find the latest Tweets. For this, we must make each TweetID universally unique in our system, and each TweetID should contain a timestamp too.
We can use epoch time for this. Let's say our TweetID will have two parts: the first part will represent epoch seconds and the second part will be an auto-incrementing sequence. So, to make a new TweetID, we can take the current epoch time and append an auto-incrementing number to it. We can figure out the shard number from this TweetID and store the tweet there.
What could be the size of our TweetID? Let's say our epoch time starts today; how many bits would we need to store the number of seconds for the next 50 years?
86400 sec/day * 365 (days a year) * 50 (years) => 1.6B
We would need 31 bits to store this number. Since on average we are expecting around 1,150 new tweets per second, we can allocate 17 bits to store the auto-incremented sequence; this will make our TweetID 48 bits long. So, every second we can store (2^17 => 130K) new tweets. We can reset our auto
incrementing sequence every second. For fault tolerance and better performance, we can have two
database servers to generate auto-incrementing keys for us, one generating even numbered keys and
the other generating odd numbered keys.
If we assume our current epoch seconds are “1483228800”, our TweetID will look like this:
1483228800 000001
1483228800 000002
1483228800 000003
1483228800 000004
…
If we make our TweetID 64 bits (8 bytes) long, we can easily store tweets for the next 100 years and also store them with millisecond granularity.
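A minimal single-process sketch of this ID scheme (epoch seconds in the high bits, a per-second sequence in the low 17 bits); the even/odd key-generator servers mentioned above are left out for simplicity:

import time

SEQUENCE_BITS = 17               # up to ~130K tweets per second
EPOCH_START = 1483228800         # custom epoch (the example above), in seconds

_last_second = 0
_sequence = 0

def next_tweet_id():
    """Return an ID laid out as (seconds since custom epoch) << 17 | sequence."""
    global _last_second, _sequence
    now = int(time.time()) - EPOCH_START
    if now == _last_second:
        _sequence += 1           # same second: bump the auto-incrementing part
    else:
        _last_second, _sequence = now, 0
    return (now << SEQUENCE_BITS) | _sequence

def shard_for(tweet_id, num_shards):
    # The shard can be derived from the ID itself, e.g., by hashing it.
    return tweet_id % num_shards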
8. Cache
We can introduce a cache in front of the database servers to cache hot tweets and users. We can use an off-the-shelf solution like Memcache that can store whole tweet objects. Application servers can quickly check whether the cache has the desired tweets before hitting the database. Based on clients' usage patterns we can determine how many cache servers we need.
Which cache replacement policy would best fit our needs? When the cache is full, and we want
to replace a tweet with a newer/hotter tweet, how would we choose? Least Recently Used (LRU)
can be a reasonable policy for our system. Under this policy, we discard the least recently viewed
tweet first.
How can we have a more intelligent cache? If we go with the 80-20 rule, 20% of tweets generate 80% of the read traffic, which means that certain tweets are so popular that a majority of people read them. This suggests we can try to cache 20% of the daily read volume from each shard.
What if we cache the latest data? Our service can benefit from this approach. Let's say 80% of our users see tweets only from the past three days; then we can try to cache all the tweets from the past three days. Say we have dedicated cache servers that cache all the tweets from all users from the past three days. As estimated above, we are getting 100 million new tweets, or 30GB of new data, every
day (without photos and videos). If we want to store all the tweets from last three days, we would
need less than 100GB of memory. This data can easily fit into one server, but we should replicate it
onto multiple servers to distribute all the read traffic to reduce the load on cache servers. So
whenever we are generating a user’s timeline, we can ask the cache servers if they have all the
recent tweets for that user, if yes, we can simply return all the data from the cache. If we don’t have
enough tweets in the cache, we have to query the backend servers to fetch that data. With a similar design, we
can try caching photos and videos from last three days.
Our cache would be like a hash table, where ‘key’ would be ‘OwnerID’ and ‘value’ would be a
doubly linked list containing all the tweets from that user in past three days. Since we want to
retrieve most recent data first, we can always insert new tweets at the head of the linked list, which
means all the older tweets will be near the tail of the linked list. Therefore, we can remove tweets
from the tail to make space for newer tweets.
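A small sketch of that structure, using an OwnerID-keyed dictionary of deques in place of the doubly linked list (a deque supports cheap inserts at the head and evictions from the tail); the tuple shape and helper names are illustrative:

import time
from collections import defaultdict, deque

THREE_DAYS = 3 * 24 * 3600

# OwnerID -> deque of (timestamp, tweet), newest first.
recent_tweets = defaultdict(deque)

def add_tweet(owner_id, tweet, now=None):
    now = time.time() if now is None else now
    timeline = recent_tweets[owner_id]
    timeline.appendleft((now, tweet))          # newest tweets live at the head
    # Older tweets accumulate near the tail; drop anything older than three days.
    while timeline and timeline[-1][0] < now - THREE_DAYS:
        timeline.pop()

def recent_for(owner_id, limit=20):
    return [tweet for _, tweet in list(recent_tweets[owner_id])[:limit]]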
9. Timeline Generation
For a detailed discussion about timeline generation, take a look at Designing Facebook’s Newsfeed.
12. Monitoring
Having the ability to monitor our systems is crucial. We should constantly collect data to get an
instant insight into how our system is doing. We can collect the following metrics/counters to get an
understanding of the performance of our service:
1. New tweets per day/second, what is the daily peak?
2. Timeline delivery stats, how many tweets per day/second our service is delivering.
3. Average latency that is seen by the user to refresh timeline.
By monitoring these counters, we will realize if we need more replication or load balancing or
caching, etc.
1. Why Youtube?
Youtube is one of the most popular video sharing websites in the world. Users of the service can
upload, view, share, rate, and report videos as well as add comments on videos.
4. System APIs
We can have SOAP or REST APIs to expose the functionality of our service. Following could be
the definitions of the APIs for uploading and searching videos:
uploadVideo(api_dev_key, video_title, video_description, tags[], category_id, default_language, recording_details, video_contents)
Parameters:
api_dev_key (string) : The API developer key of a registered account. This will be used to, among
other things, throttle users based on their allocated quota.
video_title (string) : Title of the video.
video_description (string) : Optional description of the video.
tags (string[]) : Optional tags for the video.
category_id (string) : Category of the video, e.g., Film, Song, People, etc.
default_language (string) : For example English, Mandarin, Hindi, etc.
recording_details (string) : Location where the video was recorded.
video_contents (stream) : Video to be uploaded.
Returns : (string)
A successful upload will return HTTP 202 (request accepted), and once the video encoding is
completed, the user is notified through email with a link to access the video. We can also expose a
queryable API to let users know the current status of their uploaded video.
searchVideo(api_dev_key, search_query, user_location, maximum_videos_to_return,
page_token)
Parameters:
api_dev_key (string) : The API developer key of a registered account of our service.
search_query (string) : A string containing the search terms.
user_location (string) : Optional location of the user performing the search.
maximum_videos_to_return (number) : Maximum number of results returned in one request.
page_token (string) : This token will specify a page in the result set that should be returned.
Returns : (JSON)
A JSON containing information about the list of video resources matching the search query. Each
video resource will have a video title, a thumbnail, a video creation date and how many views it
has.
6. Database Schema
Video metadata storage - MySQL
Video metadata can be stored in a SQL database. The following information should be stored with each video:
• VideoID
• Title
• Description
• Size
• Thumbnail
• Uploader/User
• Total number of likes
• Total number of dislikes
• Total number of views
For each video comment, we need to store the following information:
• CommentID
• VideoID
• UserID
• Comment
• TimeOfCreation
User data storage - MySQL
8. Metadata Sharding
Since we have a huge number of new videos every day and our read load is extremely high too, we
need to distribute our data onto multiple machines so that we can perform read/write operations
efficiently. We have many options to shard our data. Let’s go through different strategies of
sharding this data one by one:
Sharding based on UserID: We can try storing all the data for a particular user on one server.
While storing, we can pass the UserID to our hash function which will map the user to a database
server where we will store all the metadata for that user’s videos. While querying for videos of a
user, we can ask our hash function to find the server holding user’s data and then read it from there.
To search videos by titles, we will have to query all servers, and each server will return a set of
videos. A centralized server will then aggregate and rank these results before returning them to the
user.
This approach has a couple of issues:
1. What if a user becomes popular? There could be a lot of queries on the server holding that
user, creating a performance bottleneck. This will affect the overall performance of our
service.
2. Over time, some users can end up storing a lot of videos compared to others. Maintaining a uniform distribution of this growing user data is quite difficult.
To recover from these situations, either we have to repartition/redistribute our data or use consistent hashing to balance the load between servers.
Sharding based on VideoID: Our hash function will map each VideoID to a random server where
we will store that Video’s metadata. To find videos of a user we will query all servers, and each
server will return a set of videos. A centralized server will aggregate and rank these results before
returning them to the user. This approach solves our problem of popular users but shifts it to
popular videos.
We can further improve our performance by introducing cache to store hot videos in front of the
database servers.
9. Video Deduplication
With a huge number of users uploading a massive amount of video data, our service will have to deal with widespread video duplication. Duplicate videos often differ in aspect ratios or encodings,
can contain overlays or additional borders, or can be excerpts from a longer, original video. The
proliferation of duplicate videos can have an impact on many levels:
1. Data Storage: We could be wasting storage space by keeping multiple copies of the same
video.
2. Caching: Duplicate videos would result in degraded cache efficiency by taking up space that
could be used for unique content.
3. Network usage: Duplicate videos increase the amount of data that must be sent over the network to in-network caching systems.
4. Energy consumption: Higher storage, inefficient cache, and network usage will result in
energy wastage.
For the end user, these inefficiencies will be realized in the form of duplicate search results, longer
video startup times, and interrupted streaming.
For our service, deduplication makes the most sense early, when a user is uploading a video, as compared to post-processing videos later to find duplicates. Inline deduplication will save us a lot of resources that would otherwise be spent encoding, transferring, and storing the duplicate copy of the video. As soon
as any user starts uploading a video, our service can run video matching algorithms (e.g., Block
Matching, Phase Correlation, etc.) to find duplications. If we already have a copy of the video being
uploaded, we can either stop the upload and use the existing copy or use the newly uploaded video
if it is of higher quality. If the newly uploaded video is a subpart of an existing video or vice versa,
we can intelligently divide the video into smaller chunks, so that we only upload those parts that are
missing.
10. Load Balancing
We should use Consistent Hashing among our cache servers, which will also help in balancing the
load between cache servers. Since we will be using a static hash-based scheme to map videos to
hostnames, it can lead to uneven load on the logical replicas due to the different popularity for each
video. For instance, if a video becomes popular, the logical replica corresponding to that video will
experience more traffic than other servers. These uneven loads for logical replicas can then translate
into uneven load distribution on corresponding physical servers. To resolve this issue, any busy
server in one location can redirect a client to a less busy server in the same cache location. We can
use dynamic HTTP redirections for this scenario.
However, the use of redirections also has its drawbacks. First, since our service tries to load balance
locally, it leads to multiple redirections if the host that receives the redirection can’t serve the video.
Also, each redirection requires a client to make an additional HTTP request; it also leads to higher
delays before the video starts playing back. Moreover, inter-tier (or cross data-center) redirections
lead a client to a distant cache location because the higher tier caches are only present at a small
number of locations.
11. Cache
To serve globally distributed users, our service needs a massive-scale video delivery system. Our
service should push its content closer to the user using a large number of geographically distributed
video cache servers. We need a strategy that maximizes user performance and also evenly distributes the load on the cache servers.
We can introduce a cache for metadata servers to cache hot database rows. We can use Memcache to cache the data, and application servers can quickly check whether the cache has the desired rows before hitting the database. Least Recently Used (LRU) can be a reasonable cache eviction policy for our system. Under this policy, we discard the least recently viewed row first.
How can we build a more intelligent cache? If we go with the 80-20 rule, i.e., 20% of the daily read volume for videos generates 80% of the traffic, certain videos are so popular that the majority of people view them. It follows that we can try caching 20% of the daily read volume of videos and metadata.
• CDNs replicate content in multiple places. There’s a better chance of videos being closer to
the user and with fewer hops, videos will stream from a friendlier network.
• CDN machines make heavy use of caching and can mostly serve videos out of memory.
Less popular videos (1-20 views per day) that are not cached by CDNs can be served by our servers
in various data centers.
‘CAPTION’ 500 times, we can store this number with the last character of the phrase. So now if the
user has typed ‘CAP’ we know the top most searched word under the prefix ‘CAP’ is ‘CAPTION’.
So given a prefix, we can traverse the sub-tree under it, to find the top suggestions.
Given a prefix, how much time can it take to traverse its sub-tree? Given the amount of data we need to index, we should expect a huge tree. Even traversing a sub-tree could take really long, e.g., the phrase 'system design interview questions' is 30 levels deep. Since we have very tight latency requirements, we need to improve the efficiency of our solution.
Can we store top suggestions with each node? This can surely speed up our searches but will
require a lot of extra storage. We can store top 10 suggestions at each node that we can return to the
user. We have to bear a big increase in our storage capacity to achieve the required efficiency.
We can optimize our storage by storing only references to the terminal nodes rather than storing the entire phrase. To find the suggested term, we have to traverse back from the terminal node using parent references. We will also need to store the frequency with each reference to keep track of the top
suggestions.
How would we build this trie? We can efficiently build our trie bottom up. Each parent node will
recursively call all the child nodes to calculate their top suggestions and their counts. Parent nodes
will combine top suggestions from all of their children to determine their top suggestions.
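A compact sketch of such a trie node that precomputes its top suggestions bottom-up; the top_k limit and the (phrase, count) tuple shape are choices made for this illustration:

class TrieNode:
    def __init__(self):
        self.children = {}       # character -> TrieNode
        self.count = 0           # how many times the phrase ending here was searched
        self.top = []            # cached top suggestions: list of (phrase, count)

def build_top_suggestions(node, prefix="", top_k=10):
    """Recursively compute each node's top suggestions from its children (bottom-up)."""
    suggestions = []
    if node.count > 0:
        suggestions.append((prefix, node.count))
    for ch, child in node.children.items():
        suggestions.extend(build_top_suggestions(child, prefix + ch, top_k))
    # Keep only the k most frequent phrases under this node.
    node.top = sorted(suggestions, key=lambda item: item[1], reverse=True)[:top_k]
    return node.top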
How do we update the trie? Assume five billion searches every day, which gives us approximately 60K queries per second. If we try to update our trie for every query, it will be extremely resource intensive, and this can hamper our read requests too. One solution to handle this could be to update our trie offline after a certain interval.
As the new queries come in, we can log them and also track their frequencies. Either we can log
every query or do sampling and log every 1000th query. For example, if we don’t want to show a
term which is searched for less than 1000 times, it’s safe to log every 1000th searched term.
We can have a Map-Reduce (MR) setup to process all the logging data periodically, say every hour.
These MR jobs will calculate frequencies of all searched terms in the past hour. We can then update
our trie with this new data. We can take the current snapshot of the trie and update it with all the
new terms and their frequencies. We should do this offline, as we don’t want our read queries to be
blocked by update trie requests. We can have two options:
1. We can make a copy of the trie on each server to update it offline. Once done, we can switch to using it and discard the old one.
2. Another option is we can have a master-slave configuration for each trie server. We can
update slave while the master is serving traffic. Once the update is complete, we can make
the slave our new master. We can later update our old master, which can then start serving
traffic too.
How can we update the frequencies of typeahead suggestions? Since we are storing frequencies
of our typeahead suggestions with each node, we need to update them too. We can update only the differences in frequencies rather than recounting all search terms from scratch. If we're keeping
count of all the terms searched in last 10 days, we’ll need to subtract the counts from the time
period no longer included and add the counts for the new time period being included. We can add
and subtract frequencies based on Exponential Moving Average (EMA) of each term. In EMA, we
give more weight to the latest data. It’s also known as the exponentially weighted moving average.
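As a hedged illustration, an EMA update for a term's frequency could look like the sketch below, where alpha controls how much weight the latest window gets (the value 0.5 is just an example, not a recommendation from the text):

def update_ema(previous_ema, latest_window_count, alpha=0.5):
    """Exponentially weighted moving average: recent windows weigh more than old ones."""
    return alpha * latest_window_count + (1 - alpha) * previous_ema

# e.g., update_ema(previous_ema=900, latest_window_count=1200) -> 1050.0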
After inserting a new term in the trie, we’ll go to the terminal node of the phrase and increase its
frequency. Since we’re storing the top 10 queries in each node, it is possible that this particular
search term jumped into the top 10 queries of a few other nodes. So, we need to update the top 10 queries of those nodes as well. We have to traverse back from the node all the way up to the root. For
every parent, we check if the current query is part of the top 10. If so, we update the corresponding
frequency. If not, we check if the current query’s frequency is high enough to be a part of the top
10. If so, we insert this new term and remove the term with the lowest frequency.
How can we remove a term from the trie? Let's say we have to remove a term from the trie because of a legal issue, hate speech, piracy, etc. We can completely remove such terms from the trie when the regular update happens; meanwhile, we can add a filtering layer on each server which will remove any such terms before sending suggestions to users.
What could be different ranking criteria for suggestions? In addition to a simple count, for
terms ranking, we have to consider other factors too, e.g., freshness, user location, language,
demographics, personal history etc.
If we store this trie in a file with the above-mentioned scheme, we will have:
“C2,A2,R1,T,P,O1,D”. From this, we can easily rebuild our trie.
As you may have noticed, we are not storing top suggestions and their counts with each node. It is hard to store this information because our trie is being stored top down; we don't have child nodes created before their parents, so there is no easy way to store their references. For this, we have to recalculate all the top terms with counts. This can be done while we are building the trie: each node will calculate its top suggestions and pass them to its parent, and each parent node will merge the results from all of its children to figure out its own top suggestions.
5. Scale Estimation
If we are building a service that has the same scale as Google, we can expect 5 billion searches every day, which gives us approximately 60K queries per second.
Since there will be a lot of duplicates in 5 billion queries, we can assume that only 20% of these
will be unique. If we only want to index top 50% of the search terms, we can get rid of a lot of less
frequently searched queries. Let’s assume we will have 100 million unique terms for which we want
to build an index.
Storage Estimation: If, on average, each query consists of 3 words and the average length of a word is 5 characters, this gives us an average query size of 15 characters. Assuming we need 2 bytes to store a character, we will need 30 bytes to store an average query. So the total storage we will
need:
100 million * 30 bytes => 3 GB
We can expect some growth in this data every day, but we should also be removing some terms that
are not searched anymore. If we assume we have 2% new queries every day and we are maintaining our index for the last one year, the total storage we should expect is:
3GB + (0.02 * 3 GB * 365 days) => 25 GB
6. Data Partition
Although our index can easily fit on one server, we'll partition it to meet our requirements of higher efficiency and lower latency. How can we efficiently partition our data to distribute it onto multiple
servers?
a. Range Based Partitioning: What if we store our phrases in separate partitions based on their first letter? So we save all the terms starting with the letter 'A' in one partition, those that start with the letter 'B' in another partition, and so on. We can even combine certain less frequently occurring letters into one database partition. We should come up with this partitioning scheme statically so that we can always store and search terms in a predictable manner.
The main problem with this approach is that it can lead to unbalanced servers. For instance, we might decide to put all terms starting with the letter 'E' into one DB partition, but later realize that we have too many terms starting with 'E' to fit into a single DB partition.
We can see that the above problem will happen with every statically defined scheme. It is not
possible to calculate if each of our partitions will fit on one server statically.
b. Partition based on the maximum capacity of the server: Let’s say we partition our trie based
on the maximum memory capacity of the servers. We can keep storing data on a server as long as it
has memory available. Whenever a sub-tree cannot fit into a server, we break our partition there, assign that range to this server, and move on to the next server to repeat this process. Let's say our first trie server can store all terms from 'A' to 'AABC'; this means our next server will store terms from 'AABD' onwards. If our second server can store up to 'BXA', the next server will start from 'BXB', and so on. We can keep a hash table to quickly access this partitioning scheme:
Server 1, A-AABC
Server 2, AABD-BXA
Server 3, BXB-CDA
For querying, if the user has typed ‘A’ we have to query both server 1 and 2 to find the top
suggestions. When the user has typed ‘AA’, still we have to query server 1 and 2, but when the user
has typed ‘AAA’ we only need to query server 1.
We can have a load balancer in front of our trie servers, which can store this mapping and redirect
traffic. Also if we are querying from multiple servers, either we need to merge the results at the
server side to calculate the overall top results, or make our clients do that. If we prefer to do this on the server side, we need to introduce another layer of servers between the load balancers and trie servers; let's call them aggregators. These servers will aggregate results from multiple trie servers and return the top results to the client.
Partitioning based on the maximum capacity can still lead to hotspots, e.g., if there are a lot of queries for terms starting with 'cap', the server holding that range will have a high load compared to others.
c. Partition based on the hash of the term: Each term will be passed to a hash function, which will generate a server number, and we will store the term on that server. This makes our term distribution random and hence minimizes hotspots. To find typeahead suggestions for a term, we have to ask all servers and then aggregate the results. We have to use consistent hashing for fault tolerance and load distribution.
7. Cache
We should realize that caching the top searched terms will be extremely helpful in our service.
There will be a small percentage of queries that will be responsible for most of the traffic. We can
have separate cache servers in front of the trie servers, holding most frequently searched terms and
their typeahead suggestions. Application servers should check these cache servers before hitting the
trie servers to see if they have the desired searched terms.
We can also build a simple Machine Learning (ML) model that can try to predict the engagement
on each suggestion based on simple counting, personalization, or trending data etc., and cache these
terms.
9. Fault Tolerance
What will happen when a trie server goes down? As discussed above, we can have a master-slave configuration; if the master dies, the slave can take over after failover. Any server that comes back up can rebuild the trie from the last snapshot.
11. Personalization
Users will receive some typeahead suggestions based on their historical searches, location,
language, etc. We can store the personal history of each user separately on the server and cache
them on the client too. The server can add these personalized terms in the final set, before sending it
to the user. Personalized searches should always come before others.
Designing an API Rate Limiter
Let's design an API Rate Limiter which will throttle users based upon the number of requests they are sending. Difficulty Level: Medium
Rolling Window Algorithm: In this algorithm, the time window is considered from the fraction of
the time at which the request is made plus the time window length. For example, if there are two
messages sent at 300th millisecond and 400th millisecond of a second, we’ll count them as two
messages from 300th millisecond of that second up to the 300th millisecond of next second. In the
above diagram, keeping two messages a second, we’ll throttle ‘m3’ and ‘m4’.
1. If the 'UserID' is not present in the hash-table, insert it, set the 'Count' to 1 and 'StartTime' to the current time, and allow the request.
2. Otherwise, find the record of the 'UserID' and, if 'CurrentTime – StartTime >= 1 min', set the 'StartTime' to the current time and 'Count' to 1, and allow the request.
3. If 'CurrentTime – StartTime < 1 min', then:
• If 'Count < 3', increment the Count and allow the request.
• If 'Count >= 3', reject the request.
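A minimal in-memory sketch of this fixed-window check, assuming a limit of three requests per minute kept in a simple per-process dictionary (the data structure and names are illustrative, not the distributed solution discussed below):

import time

RATE_LIMIT = 3          # allowed requests
WINDOW_SECONDS = 60     # per fixed window

# UserID -> {"count": int, "start_time": float}
records = {}

def allow_request(user_id, now=None):
    """Fixed-window check: reset the counter when the window expires."""
    now = time.time() if now is None else now
    record = records.get(user_id)
    if record is None or now - record["start_time"] >= WINDOW_SECONDS:
        records[user_id] = {"count": 1, "start_time": now}
        return True
    if record["count"] < RATE_LIMIT:
        record["count"] += 1
        return True
    return False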
What are some problems with our algorithm?
1. This is a Fixed Window algorithm, as we’re resetting the ‘StartTime’ at the end of every
minute, which means it can potentially allow twice the number of requests per minute.
Imagine if Kristie sends three requests at the last second of a minute, then she can
immediately send three more requests at the very first second of the next minute, resulting in
6 requests in the span of two seconds. The solution to this problem would be a sliding
window algorithm which we’ll discuss later.
2. Atomicity: In a distributed environment, the “read-and-then-write” behavior can create a
race condition. Imagine if Kristie’s current ‘Count’ is “2” and that she issues two more
requests. If two separate processes served each of these requests and concurrently read the
Count before either of them updated it, each process would think that Kristie could have one more request and that she had not hit the rate limit.
If we are using Redis to store our key-value, one solution to resolve the atomicity problem is to use
a Redis lock for the duration of the read-update operation. This, however, would come at the expense
of slowing down concurrent requests from the same user and introducing another layer of
complexity. We can use Memcached, but it would have comparable complications.
If we are using a simple hash-table, we can have a custom implementation for ‘locking’ each record
to solve our atomicity problems.
How much memory would we need to store all of the user data? Let’s assume the simple
solution where we are keeping all of the data in a hash-table.
Let’s assume ‘UserID’ takes 8 bytes. Let’s also assume a 2 byte ‘Count’, which can count up to
65k, is sufficient for our use case. Although epoch time will need 4 bytes, we can choose to store
only the minute and second part, which can fit into 2 bytes. Hence, we need total 12 bytes to store a
user’s data:
8 + 2 + 2 = 12 bytes
Let’s assume our hash-table has an overhead of 20 bytes for each record. If we need to track one
million users at any time, the total memory we would need would be 32MB:
(12 + 20) bytes * 1 million => 32MB
If we assume that we would need a 4-byte number to lock each user’s record to resolve our
atomicity problems, we would require a total 36MB memory.
This can easily fit on a single server; however, we would not want to route all of our traffic through a single machine. Also, if we assume a rate limit of 10 requests per second, this would translate into 10 million QPS for our rate limiter! This would be too much for a single server. Practically, we can
assume we would use Redis or Memcached kind of a solution in a distributed setup. We’ll be
storing all the data in the remote Redis servers, and all the Rate Limiter servers will read (and
update) these servers before serving or throttling any request.
Let's assume our rate limiter is allowing three requests per minute per user, so whenever a new request comes in, the Rate Limiter will perform the following steps (a code sketch follows the list):
1. Remove all the timestamps from the Sorted Set that are older than “CurrentTime - 1
minute”.
2. Count the total number of elements in the sorted set. Reject the request if this count is
greater than our throttling limit of “3”.
3. Insert the current time in the sorted set, and accept the request.
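A minimal sliding-window sketch using a Redis Sorted Set per user; the key naming scheme and the redis-py client are illustrative choices, and the check-then-add sequence here is not atomic, which is exactly the race condition discussed earlier.

import time
import redis  # redis-py client, used here for illustration

r = redis.Redis()
LIMIT = 3
WINDOW_SECONDS = 60

def allow_request(user_id):
    key = f"ratelimit:{user_id}"          # hypothetical key naming scheme
    now = time.time()
    # 1. Drop timestamps older than the sliding window.
    r.zremrangebyscore(key, 0, now - WINDOW_SECONDS)
    # 2. Reject if the user already has LIMIT requests in the window.
    if r.zcard(key) >= LIMIT:
        return False
    # 3. Record this request and accept it.
    r.zadd(key, {str(now): now})
    r.expire(key, WINDOW_SECONDS)
    return True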
How much memory would we need to store all of the user data for sliding window? Let’s
assume ‘UserID’ takes 8 bytes. Each epoch time will require 4 bytes. Let’s suppose we need a rate
limiting of 500 requests per hour. Let’s assume 20 bytes overhead for hash-table and 20 bytes
overhead for the Sorted Set. At max, we would need a total of 12KB to store one user’s data:
8 + (4 + 20 (sorted set overhead)) * 500 + 20 (hash-table overhead) = 12KB
If we need to track one million users at any time, total memory we would need would be 12GB:
12KB * 1 million ~= 12GB
The Sliding Window Algorithm takes a lot of memory compared to the Fixed Window; this would be a scalability issue. What if we could combine the above two algorithms to optimize our memory usage?
4. System APIs
We can have SOAP or REST APIs to expose functionality of our service; following could be the
definition of search API:
search(api_dev_key, search_terms, maximum_results_to_return, sort, page_token)
Parameters:
api_dev_key (string) : The API developer key of a registered account. This will be used to, among
other things, throttle users based on their allocated quota.
search_terms (string) : A string containing the search terms.
maximum_results_to_return (number): Number of status messages to return.
sort (number) : Optional sort mode: Latest first (0 - default), Best matched (1), Most liked (2).
page_token (string) : This token will specify a page in the result set that should be returned.
Returns (JSON) :
A JSON containing information about a list of status messages matching the search query. Each
result entry can have the user ID & name, status text, status ID, creation time, number of likes, etc.
7. Fault Tolerance
What will happen when an index server dies? We can have a secondary replica of each server; if the primary server dies, the secondary can take control after failover. Both primary and secondary servers will have the same copy of the index.
What if both primary and secondary servers die at the same time? We have to allocate a new server
and rebuild the same index on it. How can we do that? We don’t know what words/statuses were
kept on this server. If we were using ‘Sharding based on the status object’, the brute-force solution
would be to iterate through the whole database and filter StatusIDs using our hash function to figure
out all the required Statuses that should be stored on this server. This would be inefficient, and while the server is being rebuilt we would not be able to serve any queries from it, thus missing some Statuses that the user should have seen.
How can we efficiently retrieve a mapping between Statuses and index servers? We have to build a reverse index that maps each StatusID to its index server. Our Index-Builder server can hold
this information. We will need to build a Hashtable, where the ‘key’ would be the index server
number and the ‘value’ would be a HashSet containing all the StatusIDs being kept at that index
server. Notice that we are keeping all the StatusIDs in a HashSet; this will enable us to add/remove Statuses from our index quickly. So now whenever an index server has to rebuild itself, it can
simply ask the Index-Builder server for all the Statuses it needs to store, and then fetch those
statuses to build the index. This approach will surely be quite fast. We should also have a replica of
Index-Builder server for fault tolerance.
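A minimal sketch of that Index-Builder mapping (index server number -> set of StatusIDs), with the placement rule and the names chosen here only for illustration:

from collections import defaultdict

# Index server number -> set of StatusIDs stored on that server.
server_to_statuses = defaultdict(set)

def track_status(status_id, num_index_servers):
    server = status_id % num_index_servers     # assumed to match the index's own placement rule
    server_to_statuses[server].add(status_id)
    return server

def statuses_for_rebuild(server):
    """When an index server is being rebuilt, hand it exactly the StatusIDs it owns."""
    return server_to_statuses[server]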
8. Cache
To deal with hot status objects, we can introduce a cache in front of our database. We can use Memcache, which can store all such hot status objects in memory. Application servers can quickly check whether the cache has the desired status object before hitting the backend database. Based on clients' usage patterns we can adjust how many cache servers we need. For the cache eviction policy, Least Recently Used (LRU) seems suitable for our system.
9. Load Balancing
We can add a load balancing layer at two places in our system: 1) between clients and application servers and 2) between application servers and backend servers. Initially, a simple Round Robin approach can be adopted that distributes incoming requests equally among backend servers. This LB is simple to implement and does not introduce any overhead. Another benefit of this approach is that if a server is dead, the LB will take it out of the rotation and stop sending any traffic to it. A problem with Round Robin LB is that it won't take server load into consideration. If a server is overloaded or slow, the LB will not stop sending new requests to that server. To handle this, a more intelligent LB solution can be placed that periodically queries backend servers about their load and adjusts traffic based on that.
10. Ranking
How about if we want to rank the search results by social graph distance, popularity, relevance, etc?
Let’s assume we want to rank statuses on popularity, like, how many likes or comments a status is
getting, etc. In such a case our ranking algorithm can calculate a ‘popularity number’ (based on the
number of likes etc.), and store it with the index. Each partition can sort the results based on this
popularity number before returning results to the aggregator server. The aggregator server combines all these results, sorts them based on the popularity number, and sends the top results to the user.
Designing a Web Crawler
Let's design a Web Crawler that will systematically browse and download the World Wide Web.
Web crawlers are also known as web spiders, robots, worms, walkers, and bots. Difficulty Level:
Hard
• To test web pages and links for valid syntax and structure.
• To monitor sites to see when their structure or contents change.
• To maintain mirror sites for popular Web sites.
• To search for copyright infringements.
• To build a special-purpose index, e.g., one that has some understanding of the content stored
in multimedia files on the Web.
How to crawl?
Breadth first or depth first? Breadth-first search (BFS) is usually used. However, Depth First Search (DFS) is also utilized in some situations; for example, if the crawler has already established a connection with a website, it might just DFS all the URLs within that website to save some handshaking overhead.
Path-ascending crawling: Path-ascending crawling can help discover a lot of isolated resources or
resources for which no inbound link would have been found in regular crawling of a particular Web
site. In this scheme, a crawler would ascend to every path in each URL that it intends to crawl. For
example, when given a seed URL of [Link] it will attempt to crawl /a/b/, /a/,
and /.
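A small sketch of that path-ascension rule; given any seed URL, it yields the ancestor paths (e.g., /a/b/, /a/, and /) that the crawler should also attempt:

from urllib.parse import urlparse

def ascending_paths(url):
    """Given a seed URL, yield every ancestor path the crawler should also visit."""
    parsed = urlparse(url)
    parts = [p for p in parsed.path.split("/") if p]
    base = f"{parsed.scheme}://{parsed.netloc}"
    # Walk from the deepest directory up to the site root.
    for i in range(len(parts) - 1, -1, -1):
        yield base + "/" + "/".join(parts[:i]) + ("/" if i else "")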
7. Fault tolerance
We should use consistent hashing for distribution among crawling servers. Consistent hashing will not only help in replacing a dead host but also help in distributing load among crawling servers.
All our crawling servers will perform regular checkpointing and store their FIFO queues to disk. If a server goes down, we can replace it. Meanwhile, consistent hashing should shift the load to other servers.
8. Data Partitioning
Our crawler will be dealing with three kinds of data: 1) URLs to visit 2) URL checksums for
dedupe 3) Document checksums for dedupe.
Since we are distributing URLs based on the hostnames, we can store these data on the same host.
So, each host will store its set of URLs that need to be visited, checksums of all the previously
visited URLs, and checksums of all the downloaded documents. Since we will be using consistent hashing, we can assume that URLs will be redistributed from overloaded hosts.
Each host will perform checkpointing periodically and dump a snapshot of all the data it is holding
into a remote server. This will ensure that if a server dies, another server can replace it by taking its data from the last snapshot.
9. Crawler Traps
There are many crawler traps, spam sites, and cloaked content. A crawler trap is a URL or set of
URLs that cause a crawler to crawl indefinitely. Some crawler traps are unintentional. For example,
a symbolic link within a file system can create a cycle. Other crawler traps are introduced
intentionally. For example, people have written traps that dynamically generate an infinite Web of
documents. The motivations behind such traps vary. Anti-spam traps are designed to catch crawlers
used by spammers looking for email addresses, while other sites use traps to catch search engine
crawlers to boost their search ratings.
The AOPIC algorithm (Adaptive Online Page Importance Computation) can help mitigate common types of bot traps. AOPIC solves this problem by using a credit system.
1. Start with a set of N seed pages.
2. Before crawling starts, allocate a fixed X amount of credit to each page.
3. Select a page P with the highest amount of credit (or select a random page if all pages have
the same amount of credit).
4. Crawl page P (let’s say that P had 100 credits when it was crawled).
5. Extract all the links from page P (let’s say there are 10 of them).
6. Set the credits of P to 0.
7. Take a 10% “tax” and allocate it to a Lambda page.
8. Allocate an equal amount of credits to each link found on page P from P's original credit after subtracting the tax, so: (100 (P's credits) - 10 (10% tax)) / 10 (links) = 9 credits per link.
9. Repeat from step 3.
Since the Lambda page continuously collects the tax, eventually it will be the page with the largest
amount of credit, and we’ll have to “crawl” it. By crawling the Lambda page, we just take its credits
and distribute them equally to all the pages in our database.
Since bot traps only give credits to internal links and rarely get credit from the outside, they will continually leak credits (through taxation) to the Lambda page. The Lambda page will distribute those credits evenly to all the pages in the database, and upon each cycle, the bot trap page will lose more and more credits until it has so little credit that it almost never gets crawled again. This will not happen with good pages because they often get credits from backlinks found on other pages.
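A simplified sketch of one AOPIC crawl step under these assumptions: an in-memory credit table, a fixed 10% tax, and a get_links helper (which would download the page and extract its links) that is purely hypothetical.

TAX_RATE = 0.10
LAMBDA = "__lambda__"                      # virtual page that collects the tax

def crawl_step(credits, get_links):
    """Pick the richest page, tax it, and spread the rest of its credit over its links."""
    page = max(credits, key=credits.get)
    if page == LAMBDA:
        # "Crawling" Lambda redistributes its accumulated credit evenly to all pages.
        share = credits[LAMBDA] / (len(credits) - 1)
        for p in credits:
            if p != LAMBDA:
                credits[p] += share
        credits[LAMBDA] = 0.0
        return
    budget = credits[page]
    credits[page] = 0.0
    credits[LAMBDA] = credits.get(LAMBDA, 0.0) + budget * TAX_RATE
    links = get_links(page)                # fetch the page and extract its outgoing links
    if links:
        for link in links:
            credits[link] = credits.get(link, 0.0) + budget * (1 - TAX_RATE) / len(links)
    # If the page has no links, its remaining credit is simply dropped in this sketch.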
Designing Facebook’s Newsfeed
Let's design Facebook's Newsfeed, which would contain posts, photos, videos and status updates
from all the people and pages a user follows.
Similar Services: Twitter Newsfeed, Instagram Newsfeed, Quora Newsfeed Difficulty Level: Hard
4. System APIs
💡 Once we’ve finalized the requirements, it’s always a good idea to define the system APIs. This
would explicitly state what is expected from the system.
We can have SOAP or REST APIs to expose the functionality of our service. Following could be
the definition of the API for getting the newsfeed:
getUserFeed(api_dev_key, user_id, since_id, count, max_id, exclude_replies)
Parameters:
api_dev_key (string) : The API developer key of a registered account. This can be used to, among
other things, throttle users based on their allocated quota.
user_id (number) : The ID of the user for whom the system will generate the newsfeed.
since_id (number) : Optional; returns results with an ID greater than (that is, more recent than) the specified ID.
count (number) : Optional; specifies the number of feed items to try and retrieve, up to a maximum of 200 per distinct request.
max_id (number) : Optional; returns results with an ID less than (that is, older than) or equal to the
specified ID.
exclude_replies (boolean) : Optional; this parameter will prevent replies from appearing in the
returned timeline.
Returns: (JSON) Returns a JSON object containing a list of feed items.
5. Database Design
There are three basic objects: User, Entity (e.g., page, group, etc.) and FeedItem (or Post). Here are
some observations about the relationships between these entities:
• A User can follow entities and can become friends with other users.
• Both users and entities can post FeedItems which can contain text, images or videos.
• Each FeedItem will have a UserID which would point to the User who created it. For simplicity, let's assume that only users can create feed items, although on Facebook, Pages can post feed items too.
• Each FeedItem can optionally have an EntityID pointing to the page or the group where that
post was created.
If we are using a relational database, we would need to model two relations: User-Entity relation
and FeedItem-Media relation. Since each user can be friends with many people and follow a lot of
entities, we can store this relation in a separate table. The “Type” column in “UserFollow”
identifies if the entity being followed is a User or Entity. Similarly, we can have a table for
FeedMedia relation.
6. High Level System Design
At a high level this problem can be divided into two parts:
Feed generation: Newsfeed is generated from the posts (or feed items) of users and entities (pages
and groups) that a user follows. So, whenever our system receives a request to generate the feed for a user (say Jane), we will perform the following steps (sketched in code after the list):
1. Retrieve IDs of all users and entities that Jane follows.
2. Retrieve latest, most popular and relevant posts for those IDs. These are the potential posts
that we can show in Jane’s newsfeed.
3. Rank these posts, based on the relevance to Jane. This represents Jane’s current feed.
4. Store this feed in the cache and return top posts (say 20) to be rendered on Jane’s feed.
5. On the front-end when Jane reaches the end of her current feed, she can fetch next 20 posts
from the server and so on.
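A compact sketch of those steps; the store, ranker, and cache objects here are hypothetical interfaces standing in for the metadata/posts databases, the ranking logic, and the feed cache described below.

def generate_feed(user_id, store, ranker, cache, top_n=20):
    """Newsfeed generation: gather candidate posts from followees, rank, cache, return the top N."""
    followee_ids = store.get_followees(user_id)               # step 1: users/entities Jane follows
    candidates = store.get_recent_posts(followee_ids)         # step 2: candidate posts
    ranked = sorted(candidates, key=ranker, reverse=True)     # step 3: rank by relevance
    cache.set(user_id, ranked)                                # step 4: store the generated feed
    return ranked[:top_n]                                     # step 5: first page for rendering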
One thing to notice here is that we generated the feed once and stored it in cache. What about new
incoming posts from people that Jane follows? If Jane is online, we should have a mechanism to
rank and add those new posts to her feed. We can periodically (say every five minutes) perform the
above steps to rank and add the newer posts to her feed. Jane can then be notified that there are
newer items in her feed that she can fetch.
Feed publishing: Whenever Jane loads her newsfeed page, she has to request and pull feed items
from the server. When she reaches the end of her current feed, she can pull more data from the
server. For newer items either the server can notify Jane and then she can pull, or the server can
push these new posts. We will discuss these options in detail later.
At a high level, we would need the following components in our Newsfeed service:
1. Web servers: To maintain a connection with the user. This connection will be used to
transfer data between the user and the server.
2. Application server: To execute the workflows of storing new posts in the database servers.
We would also need some application servers to retrieve and push the newsfeed to the end
user.
3. Metadata database and cache: To store the metadata about Users, Pages and Groups.
4. Posts database and cache: To store metadata about posts and their contents.
5. Video and photo storage, and cache: Blob storage, to store all the media included in the
posts.
6. Newsfeed generation service: To gather and rank all the relevant posts for a user, generate
the newsfeed, and store it in the cache. This service will also receive live updates and add
these newer feed items to the affected users’ timelines.
7. Feed notification service: To notify the user that there are newer items available for their
newsfeed.
Following is the high-level architecture diagram of our system. User B and C are following User A.
Here are issues with this design for the feed generation service:
1. Crazy slow for users with a lot of friends/follows as we have to perform
sorting/merging/ranking of a huge number of posts.
2. We generate the timeline when a user loads their page. This would be quite slow and have a
high latency.
3. For live updates, each status update will result in feed updates for all followers. This could
result in high backlogs in our Newsfeed Generation Service.
4. For live updates, the server pushing (or notifying about) newer posts to users could lead to
very heavy loads, especially for people or pages that have a lot of followers. To improve the
efficiency, we can pre-generate the timeline and store it in memory.
Offline generation for newsfeed: We can have dedicated servers that are continuously generating
users’ newsfeeds and storing them in memory. So, whenever a user requests new posts for their
feed, we can simply serve the feed from the pre-generated, stored location. Using this scheme, a
user’s newsfeed is not compiled on load but on a regular basis, and it is returned to the user
whenever they request it.
Whenever these servers need to generate the feed for a user, they would first query to see what was
the last time the feed was generated for that user. Then, new feed data would be generated from that
time onwards. We can store this data in a hash table, where the “key” would be UserID and “value”
would be a STRUCT like this:
Struct {
LinkedHashMap<FeedItemID, FeedItem> feedItems;
DateTime lastGenerated;
}
We can store FeedItemIDs in a Linked HashMap, which will enable us to not only jump to any feed
item but also iterate through the map easily. Whenever users want to fetch more feed items, they
can send the last FeedItemID they currently see in their newsfeed; we can then jump to that
FeedItemID in our linked hash map and return the next batch/page of feed items from there (a
sketch of this paging follows below).
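A minimal sketch of this paging logic, assuming per-user feeds are kept in LinkedHashMaps keyed by FeedItemID; the FeedStore name and types are illustrative:

import java.util.*;

// Paging through a user's pre-generated feed: insertion order == ranked order.
// "lastSeenId" is the last FeedItemID the client already has.
class FeedStore {
    private final Map<Long, LinkedHashMap<Long, FeedItem>> feeds = new HashMap<>();

    List<FeedItem> nextPage(long userId, long lastSeenId, int pageSize) {
        LinkedHashMap<Long, FeedItem> feed = feeds.getOrDefault(userId, new LinkedHashMap<>());
        List<FeedItem> page = new ArrayList<>(pageSize);
        boolean pastLastSeen = false;
        for (Map.Entry<Long, FeedItem> e : feed.entrySet()) {
            if (pastLastSeen) {
                page.add(e.getValue());
                if (page.size() == pageSize) break;   // one page collected
            } else if (e.getKey() == lastSeenId) {
                pastLastSeen = true;                  // everything after this entry is newer to the client
            }
        }
        return page;
    }
}
class FeedItem { long id; }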
How many feed items should we store in memory for a user’s feed? Initially, we can decide to
store 500 feed items per user, but this number can be adjusted later based on the usage pattern. For
example, if we assume that one page of a user’s feed has 20 posts and most users never browse
more than ten pages of their feed, we can decide to store only 200 posts per user. For any user who
wants to see more posts (more than what is stored in memory), we can always query the backend
servers.
Should we generate (and keep in memory) newsfeed for all users? There will be a lot of users
that don’t log in frequently. Here are a few things we can do to handle this. A simpler approach
could be to use an LRU-based cache that removes users from memory who haven’t accessed their
newsfeed for a long time. A smarter solution could figure out the login pattern of users to pre-generate
their newsfeed, e.g., at what time of day is a user active, and on which days of the week does a user
access their newsfeed?
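A minimal sketch of the LRU idea, using Java’s LinkedHashMap in access order; the capacity number is illustrative:

import java.util.LinkedHashMap;
import java.util.Map;

// Evict the feeds of users who haven't accessed their newsfeed recently
// once we exceed a fixed number of pre-generated feeds.
class LruFeedCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    LruFeedCache(int maxEntries) {
        super(16, 0.75f, true);   // accessOrder = true: get() moves an entry to the tail
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;   // drop the least-recently-accessed user's feed
    }
}

// Usage (numbers illustrative): a cache holding pre-generated feeds for at most 1M users.
// LruFeedCache<Long, java.util.List<Long>> feedCache = new LruFeedCache<>(1_000_000);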
Let’s now discuss some solutions to our “live updates” problems in the following section.
b. Feed publishing
The process of pushing a post to all the followers is called a fanout. By analogy, the push approach
is called fanout-on-write, while the pull approach is called fanout-on-load. Let’s discuss different
options of publishing feed data to users.
1. “Pull” model or Fan-out-on-load: This method involves keeping all the recent feed data in
memory so that users can pull it from the server whenever they need it. Clients can pull the
feed data on a regular basis or manually whenever they need it. Possible problems with this
approach are: a) new data might not be shown to users until they issue a pull request; b) it’s hard
to find the right pull cadence, as most pull requests will return an empty response when there is no
new data, wasting resources.
2. “Push” model or Fan-out-on-write: For a push system, once a user has published a post,
we can immediately push this post to all her followers. The advantage is that when fetching
feed, you don’t need to go through your friends list and get feeds for each of them. It
significantly reduces read operations. To efficiently manage this, users have to maintain a
Long Poll request with the server for receiving the updates. A possible problem with this
approach is that when a user has millions of followers (or a celebrity-user), the server has to
push updates to a lot of people.
3. Hybrid: Another interesting approach to handle feed data could be a hybrid, i.e., a
combination of fan-out-on-write and fan-out-on-load. Specifically, we can stop pushing posts
from users with a high number of followers (celebrity users) and only push data for those
users who have a few hundred (or thousand) followers. For celebrity users, we can let the
followers pull the updates. Since the push operation can be extremely costly for users who
have a lot of friends or followers, disabling fanout for them saves a huge amount of resources.
Another alternative is that, once a user publishes a post, we limit the fanout to only her online
friends. Also, to get the benefits of both approaches, a combination of “push to notify” and
“pull to serve” end users is a great way to go; a purely push or pull model is less versatile. A
sketch of this hybrid fanout decision follows below.
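A minimal sketch of the hybrid fanout decision; the follower threshold and the FollowGraph/FeedCache interfaces are assumptions:

import java.util.Set;

// Fan out on write for "normal" users; skip fanout for celebrities so their followers pull instead.
class FanoutService {
    private static final int CELEBRITY_FOLLOWER_THRESHOLD = 10_000;   // illustrative cutoff

    private final FollowGraph followGraph;   // assumed lookup of a user's followers
    private final FeedCache feedCache;       // assumed per-user in-memory feeds

    FanoutService(FollowGraph followGraph, FeedCache feedCache) {
        this.followGraph = followGraph;
        this.feedCache = feedCache;
    }

    void onNewPost(long authorId, long postId) {
        Set<Long> followers = followGraph.getFollowers(authorId);
        if (followers.size() > CELEBRITY_FOLLOWER_THRESHOLD) {
            return;   // fanout-on-load: followers pull this celebrity's posts when loading their feed
        }
        for (long followerId : followers) {
            feedCache.prepend(followerId, postId);   // fanout-on-write: push into each follower's feed
        }
    }
}

interface FollowGraph { Set<Long> getFollowers(long userId); }
interface FeedCache   { void prepend(long userId, long postId); }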
How many feed items can we return to the client in each request? We should have a maximum
limit for the number of items a user can fetch in one request (say 20). But we should let clients
choose to specify how many feed items they want with each request, as the user may like to fetch a
different number of posts depending on the device (mobile vs desktop).
Should we always notify users if there are new posts available for their newsfeed? It could be
useful for users to get notified whenever new data is available. However, on mobile devices, where
data usage is relatively expensive, it can consume unnecessary bandwidth. Hence, at least for
mobile devices, we can choose not to push data, instead let users “Pull to Refresh” to get new posts.
8. Feed Ranking
The most straightforward way to rank posts in a newsfeed is by the creation time of the posts. But
today’s ranking algorithms are doing a lot more than that to ensure “important” posts are ranked
higher. The high-level idea of ranking is to first select key “signals” that make a post important and
then figure out how to combine them to calculate a final ranking score.
More specifically, we can select features that are relevant to the importance of any feed item, e.g.
number of likes, comments, shares, time of the update, whether the post has images/videos, etc., and
then, a score can be calculated using these features. This is generally enough for a simple ranking
system. A better ranking system can significantly improve itself by constantly evaluating if we are
making progress in user stickiness, retention, ads revenue, etc.
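As a concrete but deliberately simplified illustration, the score could be a weighted combination of such signals; the particular signals and weights below are assumptions, not a prescribed formula:

// A simple weighted-signal ranking score; weights would normally be tuned (or learned)
// against engagement metrics such as stickiness and retention.
class FeedRanker {
    double score(int likes, int comments, int shares, boolean hasMedia, long ageInMinutes) {
        double engagement = 1.0 * likes + 2.0 * comments + 3.0 * shares;  // heavier weight on deeper engagement
        double mediaBoost = hasMedia ? 1.2 : 1.0;
        double freshness  = 1.0 / (1.0 + ageInMinutes / 60.0);            // decay with age in hours
        return engagement * mediaBoost * freshness;
    }
}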
9. Data Partitioning
a. Sharding posts and metadata
Since we have a huge number of new posts every day and our read load is extremely high too, we
need to distribute our data onto multiple machines such that we can read/write it efficiently. For
sharding our databases that are storing posts and their metadata, we can have a similar design as
discussed under Designing Twitter.
b. Sharding feed data
For feed data, which is being stored in memory, we can partition it based on UserID. We can try
storing all the data of a user on one server. When storing, we can pass the UserID to our hash
function, which will map the user to a cache server where we will store the user’s feed objects. Also,
for any given user, since we don’t expect to store more than 500 FeedItemIDs, we wouldn’t run into
a scenario where feed data for a user doesn’t fit on a single server. To get the feed of a user, we
would always have to query only one server. For future growth and replication, we must use
Consistent Hashing.
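A minimal consistent-hashing sketch for mapping a UserID to a cache server; the hash function choice (MD5) and the number of virtual nodes are assumptions:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.SortedMap;
import java.util.TreeMap;

// Servers are placed on a ring via several virtual nodes; a UserID maps to the first
// server clockwise from its hash, so adding/removing a server only remaps nearby keys.
class ConsistentHashRing {
    private final SortedMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodesPerServer;

    ConsistentHashRing(int virtualNodesPerServer) {
        this.virtualNodesPerServer = virtualNodesPerServer;
    }

    void addServer(String server) {
        for (int i = 0; i < virtualNodesPerServer; i++) {
            ring.put(hash(server + "#" + i), server);
        }
    }

    String serverFor(long userId) {
        long h = hash(Long.toString(userId));
        SortedMap<Long, String> tail = ring.tailMap(h);
        // first node clockwise from the hash; wrap around to the ring's start if needed
        return tail.isEmpty() ? ring.get(ring.firstKey()) : tail.get(tail.firstKey());
    }

    private static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) h = (h << 8) | (d[i] & 0xff);   // first 8 bytes of the digest
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}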
Designing Yelp
Let's design a Yelp like service, where users can search for nearby places like restaurants, theaters
or shopping malls, etc., and can also add/view reviews of places. Similar Services: Proximity
server. Difficulty Level: Hard
3. Scale Estimation
Let’s build our system assuming that we have 500M places and 100K queries per second (QPS).
Let’s also assume a 20% growth in the number of places and QPS each year.
4. Database Schema
Each location can have following fields:
1. LocationID (8 bytes): Uniquely identifies a location.
2. Name (256 bytes)
3. Latitude (8 bytes)
4. Longitude (8 bytes)
5. Description (512 bytes)
6. Category (1 byte): E.g., coffee shop, restaurant, theater, etc.
Although a four-byte number can uniquely identify 500M locations, with future growth in mind,
we will go with 8 bytes for LocationID.
Total size: 8 + 256 + 8 + 8 + 512 + 1 => 793 bytes
We also need to store reviews, photos, and ratings of a Place. We can have a separate table to store
reviews for Places:
1. LocationID (8 bytes)
2. ReviewID (4 bytes): Uniquely identifies a review, assuming any location will not have more
than 2^32 reviews.
3. ReviewText (512 bytes)
4. Rating (1 byte): how many stars a place gets out of ten.
Similarly, we can have a separate table to store photos for Places and Reviews.
5. System APIs
We can have SOAP or REST APIs to expose the functionality of our service. Following could be
the definition of the API for searching:
search(api_dev_key, search_terms, user_location, radius_filter,
maximum_results_to_return, category_filter, sort, page_token)
Parameters:
api_dev_key (string) : The API developer key of a registered account. This will be used to, among
other things, throttle users based on their allocated quota.
search_terms (string) : A string containing the search terms.
user_location (string) : Location of the user performing the search.
radius_filter (number) : Optional search radius in meters.
maximum_results_to_return (number): Number of business results to return.
category_filter (string) : Optional category to filter search results with, e.g., Restaurants, Shopping
Centers, etc.
sort (number) : Optional sort mode: Best matched (0 - default), Minimum distance (1), Highest
rated (2).
page_token (string) : This token will specify a page in the result set that should be returned.
Returns: (JSON)
A JSON containing information about a list of businesses matching the search query. Each result
entry will have the business name, address, category, rating, and thumbnail.
a. SQL solution
One simple solution could be to store all the data in a database like MySQL. Each place will be
stored in a separate row, uniquely identified by LocationID. Each place will have its longitude and
latitude stored separately in two different columns, and to perform a fast search, we should have
indexes on both these fields.
To find all the nearby places of a given location (X, Y) within a radius ‘D’, we can query like this:
Select * from Places where Latitude between X-D and X+D and Longitude between Y-D and Y+D
How efficient would this query be? We have estimated 500M places to be stored in our service.
Since we have two separate indexes, each index can return a huge list of places, and performing an
intersection on those two lists won’t be efficient. Another way to look at this problem is that there
could be too many locations between ‘X-D’ and ‘X+D’, and similarly between ‘Y-D’ and ‘Y+D’. If
we can somehow shorten these lists, it can improve the performance of our query.
b. Grids
We can divide the whole map into smaller grids to group locations into smaller sets. Each grid will
store all the Places residing within a certain range of longitude and latitude. This scheme would
enable us to query only a few grids to find nearby places. Based on given location and radius, we
can find all the nearby grids and then only query these grids to find nearby places.
Let’s assume that GridID (a four-byte number) would uniquely identify grids in our system.
What could be a reasonable grid size? Grid size could be equal to the distance we would like to
query since we also want to reduce the number of grids. If the grid size is equal to the distance we
want to query, then we only need to search within the grid that contains the given location and its
eight neighboring grids. Since our grids would be statically defined (from the fixed grid size), we
can easily find the grid number of any location (lat, long) and its neighboring grids, as sketched below.
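A minimal sketch of computing a grid number from a (lat, long) pair and enumerating its eight neighbors, assuming a fixed grid size in degrees; pole/antimeridian wrap-around is intentionally ignored to keep it simple:

// Map a (lat, long) to a statically sized grid and find its eight neighbors.
class GridIndex {
    private final double gridSizeDegrees;
    private final int columns;   // number of grid columns covering longitudes [-180, 180)

    GridIndex(double gridSizeDegrees) {
        this.gridSizeDegrees = gridSizeDegrees;
        this.columns = (int) Math.ceil(360.0 / gridSizeDegrees);
    }

    int gridIdOf(double latitude, double longitude) {
        int row = (int) ((latitude + 90.0) / gridSizeDegrees);     // 0 at the south pole
        int col = (int) ((longitude + 180.0) / gridSizeDegrees);   // 0 at -180 degrees
        return row * columns + col;
    }

    int[] neighborsOf(double latitude, double longitude) {
        int[] neighbors = new int[8];
        int i = 0;
        for (int dLat = -1; dLat <= 1; dLat++) {
            for (int dLon = -1; dLon <= 1; dLon++) {
                if (dLat == 0 && dLon == 0) continue;   // skip the grid itself
                neighbors[i++] = gridIdOf(latitude + dLat * gridSizeDegrees,
                                          longitude + dLon * gridSizeDegrees);
            }
        }
        return neighbors;   // wrap-around at the poles/antimeridian is not handled in this sketch
    }
}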
In the database, we can store the GridID with each location and have an index on it too for faster
searching. Now, our query will look like:
Select * from Places where Latitude between X-D and X+D and Longitude between Y-D and Y+D
and GridID in (GridID, GridID1, GridID2, ..., GridID8)
This will undoubtedly improve the runtime of our query.
Should we keep our index in memory? Maintaining the index in memory will improve the
performance of our service. We can keep our index in a hash table, where ‘key’ would be the grid
number and ‘value’ would be the list of places contained in that grid.
How much memory will we need to store the index? Let’s assume our search radius is 10 miles;
given that the total area of the earth is around 200 million square miles, we will have roughly 2
million 10x10-mile grids. We would need a four-byte number to uniquely identify each grid, and
since LocationID is 8 bytes, we would need about 4GB of memory (ignoring hash table overhead)
to store the index:
(4 * 2M) + (8 * 500M) ~= 4 GB
This solution can still run slowly for those grids that have a lot of places, since our places are not
uniformly distributed among grids. We can have a densely populated area with a lot of places, and
on the other hand, we can have areas that are sparsely populated.
This problem can be solved if we can dynamically adjust our grid size, such that whenever we have
a grid with a lot of places, we break it down to create smaller grids. A couple of challenges with this
approach: how would we map these grids to locations, and how can we find all the neighboring
grids of a grid? One way to structure such dynamic grids is sketched below.
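A minimal sketch of such a dynamic grid, which is essentially the QuadTree that the following sections build on: a node splits into four smaller grids once it holds more than a fixed number of places. The MAX_PLACES limit and the Place fields are assumptions:

import java.util.ArrayList;
import java.util.List;

class QuadTreeNode {
    static final int MAX_PLACES = 500;   // illustrative split threshold

    final double minLat, maxLat, minLon, maxLon;   // bounding box of this grid
    final List<Place> places = new ArrayList<>();
    QuadTreeNode[] children;                       // null while this node is a leaf

    QuadTreeNode(double minLat, double maxLat, double minLon, double maxLon) {
        this.minLat = minLat; this.maxLat = maxLat; this.minLon = minLon; this.maxLon = maxLon;
    }

    void insert(Place p) {
        if (children != null) { childFor(p).insert(p); return; }
        places.add(p);
        if (places.size() > MAX_PLACES) split();   // grid got too dense: break it into four smaller grids
    }

    private void split() {
        double midLat = (minLat + maxLat) / 2, midLon = (minLon + maxLon) / 2;
        children = new QuadTreeNode[] {
            new QuadTreeNode(minLat, midLat, minLon, midLon), new QuadTreeNode(minLat, midLat, midLon, maxLon),
            new QuadTreeNode(midLat, maxLat, minLon, midLon), new QuadTreeNode(midLat, maxLat, midLon, maxLon)
        };
        for (Place p : places) childFor(p).insert(p);   // push existing places down to the children
        places.clear();
    }

    private QuadTreeNode childFor(Place p) {
        double midLat = (minLat + maxLat) / 2, midLon = (minLon + maxLon) / 2;
        int idx = (p.lat >= midLat ? 2 : 0) + (p.lon >= midLon ? 1 : 0);
        return children[idx];
    }
}

class Place { double lat, lon; long locationId; }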
7. Data Partitioning
What if we have a huge number of places such that our index does not fit into a single machine’s
memory? With 20% growth each year, we will eventually reach the memory limit of a single server.
Also, what if one server cannot serve the desired read traffic? To resolve these issues, we must
partition our QuadTree!
We will explore two solutions here (both of these partitioning schemes can be applied to databases
too):
a. Sharding based on regions: We can divide our places into regions (like zip codes), such that all
places belonging to a region will be stored on a fixed node. While storing, we will find the region of
each place to find the server and store the place there. Similarly, while querying for nearby places,
we can ask the region server that contains the user’s location. This approach has a couple of issues:
1. What if a region becomes hot? There would be a lot of queries on the server holding that
region, making it perform slow. This will affect the performance of our service.
2. Over time some regions can end up storing a lot of places compared to others. Hence
maintaining a uniform distribution of places, while regions are growing, is quite difficult.
To recover from these situations either we have to repartition our data or use consistent hashing.
b. Sharding based on LocationID: Our hash function will map each LocationID to a server where
we will store that place. While building our QuadTree, we will iterate through all the places and
calculate the hash of each LocationID to find a server where it would be stored. To find nearby
places of a location we have to query all servers, and each server will return a set of nearby places.
A centralized server will aggregate these results to return them to the user.
Will we have a different QuadTree structure on different partitions? Yes, this can happen, since
it is not guaranteed that we will have an equal number of places in any given grid on all partitions.
Though we do make sure that all servers have approximately an equal number of Places. This
different tree structure on different servers will not cause any issue, as we will be searching all the
neighboring grids within the given radius on all partitions.
The remaining part of this chapter assumes that we have partitioned our data based on LocationID.
9. Cache
To deal with hot Places, we can introduce a cache in front of our database. We can use an off-the-
shelf solution like Memcached, which can store all data about hot places. Application servers can
quickly check whether the cache has a Place before hitting the backend database. Based on clients’
usage patterns, we can adjust how many cache servers we need. For the cache eviction policy,
Least Recently Used (LRU) seems suitable for our system.
11. Ranking
How about if we want to rank the search results not just by proximity but also by popularity or
relevance?
How can we return the most popular places within a given radius? Let’s assume we keep track of
the overall popularity of each place. An aggregated number can represent this popularity in our
system, e.g., how many stars a place gets out of ten (this would be an average of the different ratings
given by users). We will store this number in the database, as well as in the QuadTree. While
searching for the top 100 places within a given radius, we can ask each partition of the QuadTree to
return its top 100 places by popularity. Then the aggregator server can determine the top 100 places
among all the places returned by the different partitions.
Remember that we didn’t build our system to update a place’s data frequently. With this design, how
can we modify the popularity of a place in our QuadTree? Although we can search for a place and
update its popularity in the QuadTree, it would take a lot of resources and can affect search requests
and system throughput. Assuming the popularity of a place is not expected to be reflected in the
system within a few hours, we can decide to update it once or twice a day, especially when the load
on the system is at its minimum.
Our next problem Designing Uber backend discusses dynamic updates of the QuadTree in detail.
Designing Uber backend
Let's design a ride-sharing service like Uber, which connects passengers who need a ride with
drivers who have a car. Similar Services: Lyft, Didi, Via, Sidecar etc. Difficulty level: Hard
Prerequisite: Designing Yelp
1. What is Uber?
Uber enables its customers to book drivers for taxi rides. Uber drivers use their personal cars to
drive customers around. Both customers and drivers communicate with each other through their
smartphones using the Uber app.
• Drivers need to regularly notify the service about their current location and their availability
to pick passengers.
• Passengers get to see all the nearby available drivers.
• Customer can request a ride; nearby drivers are notified that a customer is ready to be picked
up.
• Once a driver and customer accept a ride, they can constantly see each other’s current
location, until the trip finishes.
• Upon reaching the destination, the driver marks the journey complete to become available
for the next ride.
• Since all active drivers are reporting their locations every three seconds, we need to update
our data structures to reflect that. If we have to update the QuadTree for every change in the
driver’s position, it will take a lot of time and resources. To update a driver to its new
location, we must find the right grid based on the driver’s previous location. If the new
position does not belong to the current grid, we have to remove the driver from the current
grid and move/reinsert the driver into the correct grid. After this move, if the new grid reaches
the maximum limit of drivers, we have to repartition it.
• We need to have a quick mechanism to propagate current location of all the nearby drivers
to any active customer in that area. Also, when a ride is in progress, our system needs to
notify both the driver and passenger about the current location of the car.
Although our QuadTree helps us find nearby drivers quickly, a fast update in the tree is not
guaranteed.
Do we need to modify our QuadTree every time a driver reports their location? If we don’t
update our QuadTree with every update from the driver, it will have some old data and will not
reflect the current location of drivers correctly. If you recall, our purpose of building the QuadTree
was to find nearby drivers (or places) efficiently. Since all active drivers report their location every
three seconds, there will be a lot more updates happening to our tree than queries for nearby
drivers. So, what if we keep the latest position reported by all drivers in a hash table and update our
QuadTree a little less frequently? Let’s assume we guarantee that a driver’s current location will be
reflected in the QuadTree within 15 seconds. Meanwhile, we will maintain a hash table that will
store the current location reported by drivers; let’s call this DriverLocationHT.
How much memory do we need for DriverLocationHT? We need to store the DriverID and the
driver’s present and old locations in the hash table, so we need a total of 35 bytes to store one record:
1. DriverID (3 bytes - 1 million drivers)
2. Old latitude (8 bytes)
3. Old longitude (8 bytes)
4. New latitude (8 bytes)
5. New longitude (8 bytes)
Total = 35 bytes
If we have 1 million total drivers, we need the following memory (ignoring hash table overhead):
1 million * 35 bytes => 35 MB
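A minimal sketch of DriverLocationHT, assuming a concurrent map keyed by DriverID; the class and method names are illustrative:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Latest and previous reported positions per driver, keyed by DriverID.
class DriverLocationHT {
    static class DriverLocation {
        double oldLat, oldLon;    // position currently reflected in the QuadTree
        double newLat, newLon;    // most recent position reported by the driver
    }

    private final Map<Integer, DriverLocation> locations = new ConcurrentHashMap<>();

    void onLocationUpdate(int driverId, double lat, double lon) {
        locations.compute(driverId, (id, loc) -> {
            if (loc == null) {
                loc = new DriverLocation();
                loc.oldLat = lat;
                loc.oldLon = lon;
            } else {
                loc.oldLat = loc.newLat;   // previous "new" position becomes the old one
                loc.oldLon = loc.newLon;
            }
            loc.newLat = lat;
            loc.newLon = lon;
            return loc;
        });
        // A separate job would push newLat/newLon to the QuadTree server roughly every 15 seconds
        // and notify subscribed customers (see the notification sketch below).
    }

    DriverLocation get(int driverId) { return locations.get(driverId); }
}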
How much bandwidth will our service consume to receive location updates from all drivers?
If we get the DriverID and location, it will be (3 + 16 => 19 bytes). If we receive this information
every three seconds from one million drivers, we will be getting 19MB every three seconds.
Do we need to distribute DriverLocationHT onto multiple servers? Our memory and
bandwidth requirements do not dictate that, since all this information can easily fit on one server;
however, for scalability, performance, and fault tolerance we should distribute the hash table onto
multiple servers. We can distribute based on the DriverID to make the distribution completely
random. Let’s call the machines holding DriverLocationHT the Driver Location servers. Other than
storing the driver’s location, each of these servers will do two things:
1. As soon as a server receives an update for a driver’s location, it will broadcast that
information to all the interested customers.
2. The server needs to notify the respective QuadTree server to refresh the driver’s location. As
discussed above, this can happen every 15 seconds.
How can we efficiently broadcast a driver’s location to customers? We can have a Push Model,
where the server will push the positions to all the relevant users. We can have a dedicated
Notification Service that broadcasts the current location of drivers to all the interested
customers. We can build our Notification Service on the publisher/subscriber model. When a customer
opens the Uber app on their cell phone, they query the server to find nearby drivers. On the server
side, before returning the list of drivers to the customer, we will subscribe the customer to all the
updates from those drivers. We can maintain a list of customers (subscribers) interested in knowing
the location of a driver, and whenever we have an update in DriverLocationHT for that driver, we
can broadcast the driver’s current location to all subscribed customers. This way, our system
makes sure that we always show the driver’s current position to the customer. A sketch of this
subscription bookkeeping follows below.
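A minimal sketch of this publisher/subscriber bookkeeping; the Pusher interface (standing in for the long-poll or push-notification transport) is an assumption:

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Customers subscribe to the drivers returned in their "nearby drivers" query; every location
// update for a driver is then pushed to those subscribers.
class DriverLocationNotifier {
    private final Map<Integer, Set<Long>> subscribersByDriver = new ConcurrentHashMap<>();
    private final Pusher pusher;

    DriverLocationNotifier(Pusher pusher) { this.pusher = pusher; }

    // Called when we return a list of nearby drivers to a customer.
    void subscribe(long customerId, Iterable<Integer> nearbyDriverIds) {
        for (int driverId : nearbyDriverIds) {
            subscribersByDriver
                .computeIfAbsent(driverId, id -> new CopyOnWriteArraySet<>())
                .add(customerId);
        }
    }

    // Called whenever DriverLocationHT receives a new position for a driver.
    void onDriverLocationUpdate(int driverId, double lat, double lon) {
        Set<Long> subscribers = subscribersByDriver.getOrDefault(driverId, Set.of());
        for (long customerId : subscribers) {
            pusher.push(customerId, driverId, lat, lon);
        }
    }
}

interface Pusher { void push(long customerId, int driverId, double lat, double lon); }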
How much memory do we need to store all these subscriptions? As we have estimated above, we
will have 1M daily active customers and 500K daily active drivers. On average, let’s assume that
five customers subscribe to one driver. Let’s assume we store all this information in a hash table so
that we can update it efficiently. We need to store driver and customer IDs to maintain the
subscriptions. Assuming we need 3 bytes for DriverID and 8 bytes for CustomerID, we will
need about 21MB of memory:
(500K * 3) + (500K * 5 * 8 ) ~= 21 MB
How much bandwidth do we need to broadcast the driver’s location to customers? For every
active driver we have five subscribers, so in total we have:
5 * 500K => 2.5M
To all these customers we need to send DriverID (3 bytes) and their location (16 bytes) every
second, so we need the following bandwidth:
2.5M * 19 bytes => 47.5 MB/s
How can we efficiently implement Notification service? We can either use HTTP long polling or
push notifications.
How will new publishers/drivers get added for a current customer? As proposed above,
customers will be subscribed to nearby drivers when they open the Uber app for the first
time; what will happen when a new driver enters the area the customer is looking at? To add a new
customer/driver subscription dynamically, we need to keep track of the area the customer is
watching. This would make our solution complicated; what if, instead of pushing this information,
clients pull it from the server?
How about if clients pull information about nearby drivers from the server? Clients can send
their current location, and the server will find all the nearby drivers from the QuadTree to return
them to the client. Upon receiving this information, the client can update their screen to reflect
current positions of the drivers. Clients can query every five seconds to limit the number of round
trips to the server. This solution looks quite a bit simpler than the push model described above.
Do we need to repartition a grid as soon as it reaches the maximum limit? We can have a
cushion to let each grid grow a little bigger beyond the limit before we decide to partition it. Let’s
say our grids can grow/shrink extra 10% before we partition/merge them. This should decrease the
load for grid partition or merge on high traffic grids.
6. Ranking
How about if we want to rank the search results not just by proximity but also by popularity or
relevance?
How can we return the top-rated drivers within a given radius? Let’s assume we keep track of the
overall rating of each driver in our database and QuadTree. An aggregated number can represent
this rating in our system, e.g., how many stars a driver gets out of ten. While searching for the top
10 drivers within a given radius, we can ask each partition of the QuadTree to return its top 10 drivers
with the maximum rating. The aggregator server can then determine the top 10 drivers among all the
drivers returned by the different partitions.
7. Advanced Issues
1. How to handle clients on slow and disconnecting networks?
2. What if a client gets disconnected when it was a part of a ride? How will we handle billing
in such a scenario?
3. How about if clients pull all the information as compared to servers always pushing it?
Design Ticketmaster
Let's design an online ticketing system that sells movie tickets like Ticketmaster or
BookMyShow. Similar Services: [Link], [Link] Difficulty Level: Hard
Functional Requirements:
1. Our ticket booking service should be able to list different cities where its affiliate
cinemas are located.
2. Once the user selects the city, the service should display the movies released in that
particular city.
3. Once the user selects a movie, the service should display the cinemas running that
movie and its available show times.
4. The user should be able to choose a show at a particular cinema and book their
tickets.
5. The service should be able to show the user the seating arrangement of the cinema
hall. The user should be able to select multiple seats according to their preference.
6. The user should be able to distinguish available seats from booked ones.
7. Users should be able to put a hold on the seats for five minutes before they make a
payment to finalize the booking.
8. The user should be able to wait if there is a chance that the seats might become
available, e.g., when holds by other users expire.
9. Waiting customers should be serviced in a fair, first come, first serve manner.
Non-Functional Requirements:
1. The system would need to be highly concurrent. There will be multiple booking
requests for the same seat at any particular point in time. The service should handle
this gracefully and fairly.
2. The core thing of the service is ticket booking, which means financial transactions.
This means that the system should be secure and the database ACID compliant.
Some design considerations:
1. For simplicity, let’s assume our service does not require any user authentication.
2. The system will not handle partial ticket orders. Either user gets all the tickets they
want or they get nothing.
3. Fairness is mandatory for the system.
4. To stop system abuse, we can restrict users from booking more than ten seats at a
time.
5. We can assume that traffic would spike on popular/much-awaited movie releases
and the seats would fill up pretty fast. The system should be scalable and highly
available to keep up with the surge in traffic.
4. Capacity Estimation
Traffic estimates: Let’s assume that our service has 3 billion page views per month and sells
10 million tickets a month.
Storage estimates: Let’s assume that we have 500 cities and, on average, each city has ten
cinemas. Each cinema has 2000 seats and, on average, runs two shows every day.
Let’s assume each seat booking needs 50 bytes (IDs, NumberOfSeats, ShowID, MovieID,
SeatNumbers, SeatStatus, Timestamp, etc.) to store in the database. We would also need to
store information about movies and cinemas; let’s assume it’ll take 50 bytes. So, to store all
the data about all shows of all cinemas of all cities for a day:
500 cities * 10 cinemas * 2000 seats * 2 shows * (50+50) bytes = 2GB / day
5. System APIs
We can have SOAP or REST APIs to expose the functionality of our service. The following
could be the definition of the APIs to search movie shows and reserve seats:
SearchMovies(api_dev_key, keyword, city, lat_long, radius, start_datetime,
end_datetime, postal_code, includeSpellcheck, results_per_page, sorting_order)
Parameters:
api_dev_key (string): The API developer key of a registered account. This will be used to,
among other things, throttle users based on their allocated quota.
keyword (string): Keyword to search on.
city (string): City to filter movies by.
lat_long (string): Latitude and longitude to filter by.
radius (number): Radius of the area in which we want to search for events.
start_datetime (string): Filter movies with a starting datetime.
end_datetime (string): Filter movies with an ending datetime.
postal_code (string): Filter movies by postal code / zipcode.
includeSpellcheck (Enum: “yes” or “no”): Yes, to include spell check suggestions in the
response.
results_per_page (number): Number of results to return per page. Maximum is 30.
sorting_order (string): Sorting order of the search result. Some allowable values : ‘name,asc’,
‘name,desc’, ‘date,asc’, ‘date,desc’, ‘distance,asc’, ‘name,date,asc’, ‘name,date,desc’,
‘date,name,asc’, ‘date,name,desc’.
Returns: (JSON)
Here is a sample list of movies and their shows:
[
{
"MovieID": 1,
"ShowID": 1,
"Title": "Cars 2",
"Description": "About cars",
"Duration": 120,
"Genre": "Animation",
"Language": "English",
"ReleaseDate": "8th Oct. 2014",
"Country": "USA",
"StartTime": "14:00",
"EndTime": "16:00",
"Seats": [
{
"Type": "Regular",
"Price": "14.99",
"Status": "AlmostFull"
},
{
"Type": "Premium",
"Price": "24.99",
"Status": "Available"
}
]
},
{
"MovieID": 1,
"ShowID": 2,
"Title": "Cars 2",
"Description": "About cars",
"Duration": 120,
"Genre": "Animation",
"Language": "English",
"ReleaseDate": "8th Oct. 2014",
"Country": "USA",
"StartTime": "16:30",
"EndTime": "18:30",
"Seats": [
{
"Type": "Regular",
"Price": "14.99",
"Status": "Full"
},
{
"Type": "Premium",
"Price": "24.99",
"Status": "Almost Full"
}
]
}
]
ReserveSeats(api_dev_key, session_id, movie_id, show_id, seats_to_reserve[])
Parameters:
api_dev_key (string): same as above
session_id (string): User’s session ID to track this reservation. Once the reservation time
expires, user’s reservation on the server will be removed using this ID.
movie_id (string): Movie to reserve.
show_id (string): Show to reserve.
seats_to_reserve (number[]): An array containing the seat IDs to reserve.
Returns: (JSON)
Returns the status of the reservation, which would be one of the following:
1) “Reservation Successful”
6. Database Design
Here are a few observations about the data we are going to store: we need to capture cities,
cinemas, movies, shows, seats, and bookings, along with the relationships between them. The
detailed schema diagram (which includes fields such as Country: varchar(64) and Genre:
varchar(20)) is not reproduced here.
High-level architecture: Clients connect to Load Balancers, which route requests to Web Servers
backed by Databases.
Ticket Booking Workflow: The following would be a typical ticket booking workflow (the steps
below cover the waiting and payment parts of the flow):
• If the required number of seats becomes available, the user is taken to the theater map page
where they can choose seats.
• While waiting, if all seats get booked, or if there are fewer seats in the reservation pool than
the user intends to book, the user is shown an error message.
• The user can cancel the wait and is then taken back to the movie search page.
• At maximum, a user can wait one hour; after that, the user’s session expires and the user is
taken back to the movie search page.
• If seats are reserved successfully, the user has five minutes to pay for the reservation. After
payment, the booking is marked complete. If the user is not able to pay within five minutes,
all their reserved seats are freed to become available to other users.
How would the server keep track of all the active reservations that haven’t been booked yet?
And how would the server keep track of all the waiting customers?
We need two daemon services: one to keep track of all active reservations and remove any
expired reservations from the system; let’s call it ActiveReservationsService. The other service
keeps track of all the waiting user requests and, as soon as the required number of seats
becomes available, notifies the longest-waiting user to choose seats; let’s call it
WaitingUsersService.
a. ActiveReservationsService
We can keep all the reservations of a ‘show’ in memory in a data structure similar
to Linked HashMap or a TreeMap in addition to keeping all the data in the database. We
will need a linked HashMap kind of data structure that allows us to jump to any
reservation to remove it when the booking is complete. Also, since we will have expiry time
associated with each reservation, the head of the HashMap will always point to the oldest
reservation record so that the reservation can be expired when the timeout is reached.
To store every reservation for every show, we can have a HashTable where the ‘key’ would
be ‘ShowID’ and the ‘value’ would be the Linked HashMap containing ‘BookingID’ and
creation ‘Timestamp’.
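A minimal sketch of this structure, assuming a five-minute hold; the class and method names are illustrative:

import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// ShowID -> (BookingID -> reservation time); the LinkedHashMap keeps insertion order,
// so the oldest reservation is always at the head.
class ActiveReservationsService {
    private static final long RESERVATION_TTL_SECONDS = 5 * 60;   // five-minute hold, per the requirements

    private final Map<Long, LinkedHashMap<Long, Instant>> reservationsByShow = new HashMap<>();

    synchronized void addReservation(long showId, long bookingId) {
        reservationsByShow
            .computeIfAbsent(showId, id -> new LinkedHashMap<>())
            .put(bookingId, Instant.now());
    }

    // Called when the booking completes (payment succeeded): drop it from memory.
    synchronized void completeBooking(long showId, long bookingId) {
        LinkedHashMap<Long, Instant> reservations = reservationsByShow.get(showId);
        if (reservations != null) reservations.remove(bookingId);
    }

    // Periodic sweep: expire reservations older than the TTL, oldest first.
    synchronized void expireOldReservations(long showId) {
        LinkedHashMap<Long, Instant> reservations = reservationsByShow.get(showId);
        if (reservations == null) return;
        Instant cutoff = Instant.now().minusSeconds(RESERVATION_TTL_SECONDS);
        Iterator<Map.Entry<Long, Instant>> it = reservations.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Instant> head = it.next();
            if (head.getValue().isBefore(cutoff)) {
                it.remove();   // would also mark the Booking row 'Expired' and signal WaitingUsersService
            } else {
                break;         // entries are in insertion order, so the rest are newer
            }
        }
    }
}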
In the database, we will store the reservation in the ‘Booking’ table and the expiry time will
be in the Timestamp column. The ‘Status’ field will have a value of ‘Reserved (1)’ and, as
soon as a booking is complete, the system will update the ‘Status’ to ‘Booked (2)’ and
remove the reservation record from the Linked HashMap of the relevant show. When the
reservation is expired, we can either remove it from the Booking table or mark it ‘Expired
(3)’ in addition to removing it from memory.
ActiveReservationsService will also work with the external financial service to process user
payments. Whenever a booking is completed, or a reservation gets expired,
WaitingUsersService will get a signal so that any waiting customer can be served.
b. WaitingUsersService
Just like ActiveReservationsService, we can keep all the waiting users of a show in memory
in a Linked HashMap or a TreeMap. We need a data structure similar to Linked HashMap
so that we can jump to any user to remove them from the HashMap when the user cancels
their request. Also, since we are serving in a first-come-first-serve manner, the head of the
Linked HashMap would always be pointing to the longest waiting user, so that whenever
seats become available, we can serve users in a fair manner.
We will have a HashTable to store all the waiting users for every Show. The ‘key’ would be
‘ShowID’ and the ‘value’ would be a Linked HashMap containing ‘UserIDs’ and their wait-
start-time.
Clients can use Long Polling to keep themselves updated on their reservation status.
Whenever seats become available, the server can use this open request to notify the user.
Reservation Expiration
On the server, ActiveReservationsService keeps track of the expiry of active reservations
(based on the reservation time). Since the client is shown a timer (for the expiration time)
that could be slightly out of sync with the server, we can add a buffer of five seconds on the
server to safeguard against a broken experience, so that the client never times out after the
server does, which would otherwise prevent a successful purchase.
9. Concurrency
How do we handle concurrency, such that no two users are able to book the same seat? We can use
transactions in SQL databases to avoid any clashes. For example, if we are using an SQL
server, we can utilize Transaction Isolation Levels to lock the rows before we update
them. Here is the sample code:
BEGIN TRANSACTION;

-- Suppose we intend to reserve three seats (IDs: 54, 55, 56) for ShowID=99
SELECT * FROM Show_Seat WHERE ShowID = 99 AND ShowSeatID IN (54, 55, 56) AND Status = 0; -- 0 means free

-- If the above statement returns exactly three rows, update those seats to reserved,
-- update the Booking record, and return success; otherwise return failure to the user.
UPDATE Show_Seat ...
UPDATE Booking ...

COMMIT TRANSACTION;
‘Serializable’ is the highest isolation level and guarantees safety from Dirty reads, Non-repeatable
reads, and Phantom reads. One thing to note here: within a Serializable transaction, the rows we
read are locked so that they can’t be updated by anyone else.
Once the above database transaction is successful, we can start tracking the reservation in
ActiveReservationsService.
We’ll also have a master-slave setup for the databases to make them fault tolerant.
Whenever an active reservation is completed, or it expires and seats become available again, the
following steps serve the waiting users of that Show:
1. The server holding that reservation sends a message to all servers holding the
waiting users of that Show, so that those servers can expire all the waiting users that
need more seats than the number of available seats.
2. Upon receiving the above message, all servers holding the waiting users will query
the database to find how many free seats are available now. A database cache would
greatly help here to run this query only once.
3. Expire all waiting users who want to reserve more seats than the available seats. For
this, WaitingUserService has to iterate through the Linked HashMap of all the
waiting users.