Chapter 5
Distributed Transaction Processing
The concept of a transaction is used in database systems as a basic unit of
consistent and reliable computing. Thus, queries are executed as transactions once
their execution strategies are determined and they are translated into primitive
database operations. Transactions ensure that database consistency and durability
are maintained when concurrent access occurs to the same data item (with at least
one of these accesses being an update) and when failures occur.
The terms consistent and reliable in the definition of a transaction need to be defined
more precisely. We differentiate between database consistency and transaction
consistency.
A database is in a consistent state if it obeys all of the consistency (integrity)
constraints defined over it (see Chap. 3). State changes occur due to modifications,
insertions, and deletions (together called updates). Of course, we want to ensure
that the database never enters an inconsistent state. Note that the database can
be (and usually is) temporarily inconsistent during the execution of a transaction.
The important point is that the database should be consistent when the transaction
terminates (Fig. 5.1).
Transaction consistency, on the other hand, refers to the operations of concurrent
transactions. We would like the database to remain in a consistent state even if there
are a number of user requests that are concurrently accessing (reading or updating)
the database.
Reliability refers to both the resiliency of a system to various types of failures and
its capability to recover from them. A resilient system is tolerant of system failures
and can continue to provide services even when failures occur. A recoverable DBMS
is one that can get to a consistent state (by moving back to a previous consistent state
or forward to a new consistent state) following various types of failures.
Transaction management deals with the problems of always keeping the database
in a consistent state even when concurrent accesses and failures occur. The issues in
managing concurrent transactions are well-known in centralized DBMSs and can
be found in many textbooks. In this chapter, we investigate these issues within
Fig. 5.1 A transaction model: the database is in a consistent state when transaction T begins and again when it ends, but it may be temporarily in an inconsistent state during the execution of T.
the context of distributed DBMSs focusing on distributed concurrency control and
distributed reliability and recovery. We expect that the reader has familiarity with
the basic transaction management concepts and techniques as commonly covered in
undergraduate database courses and books. We provide a brief refresher in Sect. 5.1.
More detailed discussion of the fundamental transaction processing concepts is
in Appendix C. For now, we ignore data replication issues; the following chapter
is devoted to that topic. DBMSs are typically classified as On-Line Transaction
Processing (OLTP) or On-Line Analytical Processing (OLAP). On-Line Transaction
Processing applications, such as airline reservation or banking systems, are high-
throughput and transaction-oriented. They need extensive data control and availability,
high multiuser throughput, and predictable, fast response times. In contrast, On-
line Analytical Processing applications, such as trend analysis or forecasting, need
to analyze historical, summarized data coming from a number of operational
databases. They use complex queries over potentially very large tables. Most OLAP
applications do not need the most current versions of the data, and thus do not need
direct access to most up-to-date operational data. In this chapter, we focus on OLTP
systems and consider OLAP systems in Chap. 7.
The organization of this chapter is as follows. In Sect. 5.1 we provide a quick
introduction to the basic terminology that is used in this chapter, and revisit the
architectural model defined in Chap. 1 to highlight the modifications that are neces-
sary to support transaction management. Section 5.2 provides an in-depth treatment
of serializability-based distributed concurrency control techniques, while Sect. 5.3
considers concurrency control under snapshot isolation. Section 5.4 discusses
distributed reliability techniques focusing on distributed commit, termination, and
recovery protocols.
5.1 Background and Terminology
Our objective in this section is to provide a very brief introduction to the concepts
and terminology that we use in the rest of the chapter. As mentioned previously, our
objective is not to provide a deep overview of the fundamental concepts—those can
be found in Appendix C—but to introduce the basic terminology that will be helpful
in the rest of the chapter. We also discuss how the system architecture needs to be
revised to accommodate transactions.
As indicated before, a transaction is a unit of consistent and reliable computation.
Each transaction begins with a Begin_transaction command, includes a series of
Read and Write operations, and ends with either a Commit or an Abort. Commit,
when processed, ensures that the updates that the transaction has made to the
database are permanent from that point on, while Abort undoes the transaction’s
actions so, as far as the database is concerned, it is as if the transaction has never
been executed. Each transaction is characterized by its read set (RS), which includes
the data items that it reads, and its write set (WS), which includes the data items that it writes. The
read set and write set of a transaction need not be mutually exclusive. The union of
the read set and write set of a transaction constitutes its base set (BS = RS ∪ WS).
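As a small illustration (a sketch of our own, not part of the formal model), the following Python fragment records the read set and write set of a transaction as operations are issued and derives the base set as their union:

# Hypothetical sketch: tracking the read set, write set, and base set of a transaction.
class Transaction:
    def __init__(self, tid):
        self.tid = tid
        self.rs = set()      # read set
        self.ws = set()      # write set

    def read(self, item):
        self.rs.add(item)

    def write(self, item):
        self.ws.add(item)

    def base_set(self):
        # BS = RS ∪ WS; the two sets need not be mutually exclusive.
        return self.rs | self.ws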
Typical DBMS transaction services provide ACID properties:
1. Atomicity ensures that transaction executions are atomic, i.e., either all of the
actions of a transaction are reflected in the database or none of them are.
2. Consistency refers to a transaction being a correct execution (i.e., the transaction
code is correct and when it executes on a database that is consistent, it will leave
it in a consistent state).
3. Isolation indicates that the effects of concurrent transactions are shielded from
each other until they commit—this is how the correctness of concurrently
executing transactions is ensured (i.e., executing transactions concurrently does
not break database consistency).
4. Durability refers to that property of transactions that ensures that the effects of
committed transactions on the database are permanent and will survive system
crashes.
Concurrency control algorithms that we discuss in Sect. 5.2 enforce the isolation
property so that concurrent transactions see a consistent database state and leave the
database in a consistent state, while reliability measures we discuss in Sect. 5.4
enforce atomicity and durability. Consistency in terms of ensuring that a given
transaction does not do anything incorrect to the database is typically handled by
integrity enforcement as discussed in Chap. 3.
Concurrency control algorithms implement a notion of “correct concurrent
execution.” The most common correctness notion is serializability, which requires that
the history generated by the concurrent execution of transactions is equivalent to
some serial history (i.e., a sequential execution of these transactions). Given that a
transaction maps one consistent database state to another consistent database state,
any serial execution order is, by definition, correct; if the concurrent execution
history is equivalent to one of these orders, it must also be correct. In Sect. 5.3,
we introduce a more relaxed correctness notion called snapshot isolation (SI).
Concurrency control algorithms are basically concerned with enforcing different
levels of isolation among concurrent transactions very efficiently.
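As an illustration only, the following Python sketch (the function name and the history encoding are ours) builds the precedence graph of a history and reports it serializable exactly when the graph is acyclic; two operations conflict when they come from different transactions, access the same data item, and at least one of them is a write:

# Hypothetical sketch: conflict-serializability test via a precedence graph.
# A history is a list of (transaction, operation, data item) triples in execution order.
def is_conflict_serializable(history):
    edges = set()
    for i, (ti, op_i, x_i) in enumerate(history):
        for (tj, op_j, x_j) in history[i + 1:]:
            if ti != tj and x_i == x_j and 'W' in (op_i, op_j):
                edges.add((ti, tj))          # Ti must precede Tj in any equivalent serial order
    nodes = {t for edge in edges for t in edge}
    color = {n: 0 for n in nodes}            # 0 = unvisited, 1 = on stack, 2 = done
    def visit(n):
        color[n] = 1
        for (a, b) in edges:
            if a == n and (color[b] == 1 or (color[b] == 0 and visit(b))):
                return True                  # found a cycle
        color[n] = 2
        return False
    return not any(color[n] == 0 and visit(n) for n in nodes)

# The history R1(x), W1(x), R2(x) is serializable (in fact serial):
print(is_conflict_serializable([('T1', 'R', 'x'), ('T1', 'W', 'x'), ('T2', 'R', 'x')]))   # True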
When a transaction commits, its actions need to be made permanent. This
requires management of transaction logs where each action of a transaction is
recorded. The commit protocols ensure that database updates as well as logs are
saved into persistent storage so that they are made permanent. Abort protocols, on
the other hand, use the logs to erase all actions of the aborted transaction from the
database. When recovery from system crashes is needed, the logs are consulted to
bring the database to a consistent state.
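The commit and abort processing just described can be illustrated with a minimal sketch; the names (LogManager, volatile_log, stable_log) are ours, and a real recovery manager is considerably more involved:

# Hypothetical sketch: volatile and stable logs with undo on abort.
class LogManager:
    def __init__(self):
        self.volatile_log = []    # in-memory log records
        self.stable_log = []      # stands in for the log kept on persistent storage

    def log(self, tid, item, before, after):
        self.volatile_log.append((tid, item, before, after))

    def commit(self, tid):
        # Force the transaction's log records to stable storage before its
        # updates are considered permanent.
        self.stable_log.extend(rec for rec in self.volatile_log if rec[0] == tid)

    def abort(self, tid, database):
        # Undo: reinstall the before-images recorded for the transaction, newest first.
        for (t, item, before, after) in reversed(self.volatile_log):
            if t == tid:
                database[item] = before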
The introduction of transactions to the DBMS workload along with read-only
queries requires revisiting the architectural model introduced in Chap. 1. The
revision is an expansion of the role of the distributed execution monitor.
The distributed execution monitor consists of two modules: a transaction
manager (TM) and a scheduler (SC). The transaction manager is responsible for
coordinating the execution of the database operations on behalf of an application.
The scheduler, on the other hand, is responsible for the implementation of a specific
concurrency control algorithm for synchronizing access to the database.
A third component that participates in the management of distributed transactions
is the local recovery manager (LRM) that exists at each site. Its function is to
implement the local procedures by which the local database can be recovered to a
consistent state following a failure.
Each transaction originates at one site, which we will call its originating site.
The execution of the database operations of a transaction is coordinated by the TM
at that transaction’s originating site. We refer to the TM at the originating site as the
coordinator or the coordinating TM.
A transaction manager implements an interface for the application programs
to the transaction commands identified earlier: Begin_transaction, Read, Write,
Commit, and Abort. The processing of each of these commands in a nonreplicated
distributed DBMS is discussed below at an abstract level, and a brief sketch of this
interface follows the list. For simplicity, we concentrate on the interface to the TM;
the details are presented in the following sections.
1. Begin_transaction. This is an indicator to the coordinating TM that a new trans-
action is starting. The TM does some bookkeeping by recording the transaction’s
name, the originating application, and so on, in a main memory log (called the
volatile log).
2. Read. If the data item is stored locally, its value is read and returned to the
transaction. Otherwise, the coordinating TM finds where the data item is stored
and requests its value to be returned (after appropriate concurrency control
measures are taken). The site where the data item is read inserts a log record
in the volatile log.
3. Write. If the data item is stored locally, its value is updated (in coordination with
the data processor). Otherwise, the coordinating TM finds where the data item
is located and requests the update to be carried out at that site (after appropriate
concurrency control measures are taken). Again, the site that executes the write
inserts a log record in the volatile log.
4. Commit. The TM coordinates the sites involved in updating data items on behalf
of this transaction so that the updates are made durable at every site. The write-ahead
logging (WAL) protocol is executed to move volatile log records to a log on disk (called the
stable log).
5. Abort. The TM makes sure that no effects of the transaction are reflected in any of
the databases at the sites where it updated data items. The log is used to execute
the undo (rollback) protocol.
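A minimal sketch of this interface is given below; all names (catalog, the site objects it returns, and so on) are hypothetical placeholders, and the commit and abort methods hide the protocols discussed in Sect. 5.4:

# Hypothetical sketch of the coordinating TM interface described above.
class TransactionManager:
    def __init__(self, catalog):
        self.catalog = catalog        # maps each data item to the site that stores it
        self.volatile_log = []        # main memory log

    def begin_transaction(self, tid, application):
        self.volatile_log.append(('BT', tid, application))

    def read(self, tid, item):
        site = self.catalog[item]     # local or remote site holding the item
        value = site.read(tid, item)  # concurrency control is applied at that site
        return value

    def write(self, tid, item, value):
        site = self.catalog[item]
        site.write(tid, item, value)  # the executing site writes its own log record

    def commit(self, tid, participants):
        # Coordinate the participating sites so the updates become durable everywhere.
        for site in participants:
            site.commit(tid)

    def abort(self, tid, participants):
        # Make sure no effects of the transaction remain at any site it updated.
        for site in participants:
            site.abort(tid)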
In providing these services, a TM can communicate with SCs and data processors
at the same or at different sites. This arrangement is depicted in Fig. 5.2.
As we indicated in Chap. 1, the architectural model that we have described is
only an abstraction that serves a pedagogical purpose. It enables the separation
of many of the transaction management issues and their independent and isolated
discussion. In Sect. 5.2, as we discuss the scheduling algorithm, we focus on the
interface between a TM and an SC and between an SC and a data processor. In
Sect. 5.4 we consider the execution strategies for the commit and abort commands
in a distributed environment, in addition to the recovery algorithms that need to be
implemented for the recovery manager. In Chap. 6, we extend this discussion to
the case of replicated databases. We should point out that the computational model
that we described here is not unique. Other models have been proposed such as, for
example, using a private workspace for each transaction.
Fig. 5.2 Detailed model of the distributed execution monitor: the monitor consists of the Transaction Manager (TM) and the Scheduler (SC); it receives Begin_transaction, Read, Write, Commit, and Abort requests and returns results, exchanges scheduling and descheduling requests between the TM and the SC, and communicates with the TMs, SCs, and data processors at other sites.
5.2 Distributed Concurrency Control
As noted above, a concurrency control algorithm enforces a particular isolation
level. In this chapter, we mainly focus on serializability among concurrent trans-
actions. Serializability theory extends in a straightforward manner to distributed
databases. The history of transaction execution at each site is called a local history.
If the database is not replicated and each local history is serializable, their union
(called the global history) is also serializable as long as local serialization orders
are identical.
Example 5.1 We give a very simple example to demonstrate the point. Consider
two bank accounts, x (stored at Site 1) and y (stored at Site 2), and the following
two transactions where T1 transfers $100 from x to y, while T2 simply reads the
balance of x and y:
T1: Read(x)                T2: Read(x)
    x ← x − 100                Read(y)
    Write(x)                   Commit
    Read(y)
    y ← y + 100
    Write(y)
    Commit
Obviously, both of these transactions need to run at both sites. Consider the
following two histories that may be generated locally at the two sites (Hi is the
history at Site i and Rj and Wj are Read and Write operations, respectively, in
transaction Tj ):
H1 = {R1(x), W1(x), R2(x)}
H2 = {R1(y), W1(y), R2(y)}
Both of these histories are serializable; indeed, they are serial. Therefore, each
represents a correct execution order. Furthermore, the serialization order for both
is the same: T1 → T2. Therefore, the global history that is obtained is also serializable
with the serialization order T1 → T2.
However, if the histories generated at the two sites are as follows, there is a
problem:
H1 = {R1(x), W1(x), R2(x)}
H2 = {R2(y), R1(y), W1(y)}
Although each local history is still serializable, the serialization orders are
different: H1 serializes T1 before T2 (T1 → T2), while H2 serializes T2 before T1
(T2 → T1). Therefore, there can be no serializable global history.
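The point of the example can also be checked mechanically. The short sketch below (our illustration, not one of the chapter's algorithms) takes the serialization order observed at each site and reports whether the local orders can coexist in a single global order:

# Hypothetical sketch: are the per-site serialization orders mutually consistent?
# (A complete check would also look for longer cycles spanning several sites.)
def consistent_local_orders(local_orders):
    seen = set()
    for order in local_orders:                  # e.g. ['T1', 'T2'] for T1 -> T2
        for i, a in enumerate(order):
            for b in order[i + 1:]:
                if (b, a) in seen:              # ordered the other way at another site
                    return False
                seen.add((a, b))
    return True

print(consistent_local_orders([['T1', 'T2'], ['T1', 'T2']]))   # True: first pair of histories
print(consistent_local_orders([['T1', 'T2'], ['T2', 'T1']]))   # False: second pair of histories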
Concurrency control protocols are in charge of isolation. A protocol aims at
guaranteeing a particular isolation level such as serializability, snapshot isolation, or
read committed. There are different aspects or dimensions of concurrency control.
The first one is obviously the isolation level(s) targeted by the algorithm. The second
aspect is whether a protocol prevents isolation from being broken (pessimistic) or
whether it allows it to be broken and then aborts one of the conflicting transactions
to preserve the isolation level (optimistic).
The third dimension is how transactions get serialized. They can be serialized
depending on the order of conflicting accesses or a predefined order, called times-
tamp order. The first case corresponds to locking algorithms where transactions get
serialized based on the order they try to acquire conflicting locks. The second case
corresponds to algorithms that order transactions according to a timestamp. The
timestamp can either be assigned at the start of the transaction (start timestamp)
when they are pessimistic or just before committing the transaction (commit
timestamp) if they are optimistic. The fourth dimension that we consider is how
updates are maintained. One possibility is to keep a single version of the data (which
is possible in pessimistic algorithms). Another possibility is to keep multiple
versions of the data. The latter is needed for optimistic algorithms, but some
pessimistic algorithms rely on it as well for recovery purposes (basically, they
keep two versions: the latest committed one and the current uncommitted one). We
discuss replication in the next chapter.
Most combinations of these multiple dimensions have been explored. In the
remainder of this section, we focus on the seminal techniques for pessimistic
algorithms, locking (Sect. 5.2.1) and timestamp ordering (Sect. 5.2.2), and the
optimistic ones (Sect. 5.2.4). Understanding these techniques will also help the
reader to move on to more involved algorithms of this kind.
5.2.1 Locking-Based Algorithms
Locking-based concurrency control algorithms prevent isolation violation by main-
taining a “lock” for each lock unit and requiring each operation to obtain a
lock on the data item before it is accessed—either in read (shared) mode or in
write (exclusive) mode. The operation’s access request is decided based on the
compatibility of lock modes—a read lock is compatible with another read lock, while a
write lock is compatible with neither a read nor a write lock. The system manages
the locks using the two-phase locking algorithm. The fundamental decision in
distributed locking-based concurrency control algorithms is where and how the
locks are maintained (usually called a lock table). The following sections provide
algorithms that make different decisions in this regard.
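As an illustration of the compatibility rule and of what a lock table records, consider the following sketch (ours; a real lock manager must also handle lock upgrades, waking up queued requests, and deadlocks): only read/read is compatible, and a conflicting request is queued until the locks on that unit are released.

# Hypothetical sketch of a lock table with shared (read) and exclusive (write) modes.
READ, WRITE = 'R', 'W'

def compatible(requested, held):
    # A read lock is compatible only with other read locks.
    return requested == READ and held == READ

class LockTable:
    def __init__(self):
        self.locks = {}     # lock unit -> (mode, set of holder transaction ids)
        self.queue = {}     # lock unit -> list of waiting (tid, mode) requests

    def request(self, tid, unit, mode):
        if unit not in self.locks:
            self.locks[unit] = (mode, {tid})
            return True                                   # lock granted
        held_mode, holders = self.locks[unit]
        if compatible(mode, held_mode):
            self.locks[unit] = (held_mode, holders | {tid})
            return True                                   # shared with the other readers
        self.queue.setdefault(unit, []).append((tid, mode))
        return False                                      # conflicting: requester waits

    def release_all(self, tid):
        for unit, (mode, holders) in list(self.locks.items()):
            if tid in holders:
                holders.discard(tid)
                if not holders:
                    del self.locks[unit]                  # queued requests would be granted here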
5.2.1.1 Centralized 2PL
The 2PL algorithm can easily be extended to the distributed DBMS environment
by delegating lock management responsibility to a single site. This means that only
one of the sites has a lock manager; the transaction managers at the other sites
communicate with it to obtain locks. This approach is also known as the primary
site 2PL algorithm.
The communication between the cooperating sites in executing a transaction
according to a centralized 2PL (C2PL) algorithm is depicted in Fig. 5.3 where the
order of messages is indicated. This communication is between the coordinating
TM, the lock manager at the central site, and the data processors (DP) at the other
participating sites. The participating sites are those that store the data items on which
the operation is to be carried out.
The centralized 2PL transaction management algorithm (C2PL-TM) that incor-
porates these changes is given at a very high level in Algorithm 5.1, while the
centralized 2PL lock management algorithm (C2PL-LM) is shown in Algorithm 5.2.
A highly simplified data processor algorithm (DP) is given in Algorithm 5.3, which
will see major changes when we discuss reliability issues in Sect. 5.4.
These algorithms use a 5-tuple for the operations they perform: Op : Type =
{BT, R, W, A, C}, arg : Data item, val : Value, tid : Transaction identifier, res :
Result. For an operation o : Op, o.Type ∈ {BT, R, W, A, C} specifies its type,
where BT = Begin_transaction, R = Read, W = Write, A = Abort, and C = Commit;
arg is the data item that the operation accesses (reads or writes; for other operations
this field is null); val is the value that has been read or is to be written for data item
arg (for other operations it is null); tid is the transaction that this operation belongs
to (strictly speaking, this is the transaction identifier); and res indicates the completion
code of operations requested of the DP, which is important for the reliability algorithms.
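In a concrete language, the 5-tuple could be represented as a small record; the following Python dataclass is merely a transliteration of the description above, not part of the book's algorithms:

# Hypothetical transliteration of the Op 5-tuple used by the pseudocode that follows.
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class Op:
    type: str                     # one of 'BT', 'R', 'W', 'A', 'C'
    arg: Optional[str] = None     # data item read or written (null for other operations)
    val: Any = None               # value read, or value to be written (null otherwise)
    tid: Optional[str] = None     # identifier of the transaction the operation belongs to
    res: Optional[str] = None     # completion code returned by the data processor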
Fig. 5.3 Communication structure of centralized 2PL: the coordinating TM, the lock manager at the central site, and the data processors (DP) at the participating sites exchange messages in the order (1) Lock Request, (2) Lock Granted, (3) Operation, (4) End of Operation, (5) Release Locks.
Algorithm 5.1: Centralized 2PL Transaction Manager (C2PL-TM)
Input: msg : a message
begin
repeat
wait for a msg
switch msg do
case transaction operation do
let op be the operation
if op.Type = BT then DP(op) {call DP with operation}
else C2PL-LM(op) {call LM with operation}
end case
case Lock Manager response do {lock request granted or locks released}
if lock request granted then
find the site that stores the requested data item (say Si)
DPSi (op) {call DP at site Si with operation}
else {must be lock release message}
inform user about the termination of transaction
end if
end case
case Data Processor response do {operation completed message}
switch transaction operation do
let op be the operation
case R do
return op.val (data item value) to the application
end case
case W do
inform application of completion of the write
end case
case C do
if commit msg has been received from all participants then
inform application of successful completion of transaction
C2PL-LM(op) {need to release locks}
else {wait until commit messages come from all}
record the arrival of the commit message
end if
end case
case A do
inform application of completion of the abort
C2PL-LM(op) {need to release locks}
end case
end switch
end case
end switch
until forever
end
Algorithm 5.2: Centralized 2PL Lock Manager (C2PL-LM)
Input: op : Op
begin
switch op.Type do
case R or W do {lock request; see if it can be granted}
find the lock unit lu such that op.arg ⊆ lu
if lu is unlocked or lock mode of lu is compatible with op.Type then
set lock on lu in appropriate mode on behalf of transaction op.tid
send “Lock granted” to coordinating TM of transaction
else
put op on a queue for lu
end if
end case
case C or A do {locks need to be released}
foreach lock unit lu held by transaction do
release lock on lu held by transaction
if there are operations waiting in queue for lu then
find the first operation O on queue
set a lock on lu on behalf of O
send “Lock granted” to coordinating TM of transaction O.tid
end if
end foreach
send “Locks released” to coordinating TM of transaction
end case
end switch
end
The transaction manager (C2PL-TM) algorithm is written as a process that
runs forever and waits until a message arrives from an application (with a
transaction operation), from a lock manager, or from a data processor. The lock
manager (C2PL-LM) and data processor (DP) algorithms are written as procedures
that are called when needed. Since the algorithms are given at a high level of
abstraction, this is not a major concern, but actual implementations may, naturally,
be quite different.
One common criticism of C2PL algorithms is that a bottleneck may quickly
form around the central site. Furthermore, the system may
be less reliable since the failure or inaccessibility of the central site would cause
major system failures.
Algorithm 5.3: Data Processor (DP)
Input: op : Op
begin
switch op.Type do {check the type of operation}
case BT do {details to be discussed in Sect. 5.4}
do some bookkeeping
end case
case R do
op.res ← READ(op.arg) ; {database READ operation}
op.res ← “Read done”
end case
case W do {database WRITE of val into data item arg}
WRITE(op.arg, op.val)
op.res ← “Write done”
end case
case C do
COMMIT ; {execute COMMIT }
op.res ← “Commit done”
end case
case A do
ABORT ; {execute ABORT }
op.res ← “Abort done”
end case
end switch
return op
end
5.2.1.2 Distributed 2PL
Distributed 2PL (D2PL) requires the availability of lock managers at each site. The
communication between cooperating sites that execute a transaction according to
the distributed 2PL protocol is depicted in Fig. 5.4.
The distributed 2PL transaction management algorithm is similar to the C2PL-
TM, with two major modifications. The messages that are sent to the central site
lock manager in C2PL-TM are sent to the lock managers at all participating sites in
D2PL-TM. The second difference is that the operations are not passed to the data
processors by the coordinating transaction manager, but by the participating lock
managers. This means that the coordinating transaction manager does not wait for
a “lock request granted” message. Another point about Fig. 5.4 is the following.
The participating data processors send the “end of operation” messages to the
coordinating TM. The alternative is for each DP to send it to its own lock manager
who can then release the locks and inform the coordinating TM. We have chosen to
describe the former since it uses an LM algorithm identical to the strict 2PL lock
manager that we have already discussed and it makes the discussion of the commit
protocols simpler (see Sect. 5.4). Owing to these similarities, we do not give the
Fig. 5.4 Communication structure of distributed 2PL: the coordinating TM sends Operations (embedding the lock requests) (1) to the participating schedulers (lock managers), which pass each Operation (2) to their local data processors; the data processors send End of Operation (3) messages to the coordinating TM, which then sends Release Lock (4) messages to the participating schedulers.
distributed TM and LM algorithms here. Distributed 2PL algorithms have been used
in R* and in NonStop SQL.
5.2.1.3 Distributed Deadlock Management
Locking-based concurrency control algorithms may cause deadlocks; in the case
of distributed DBMSs, these could be distributed (or global) deadlocks due to
transactions executing at different sites waiting for each other. Deadlock detection
and resolution is the most popular approach to managing deadlocks in the distributed
setting. The wait-for graph (WFG) can be useful for detecting deadlocks; this is a
directed graph whose vertices are active transactions with an edge from Ti to Tj if
an operation in Ti is waiting to access a data item that is currently locked in an
incompatible mode by an operation in Tj . However, the formation of the WFG
is more complicated in a distributed setting due to the distributed execution of
transactions. Therefore, it is not sufficient for each site to form a local wait-for
graph (LWFG) and check it; it is also necessary to form a global wait-for graph
(GWFG), which is the union of all the LWFGs, and check it for cycles.
Example 5.2 Consider four transactions T1, T2, T3, and T4 with the following wait-
for relationship among them: T1 → T2 → T3 → T4 → T1. If T1 and T2 run
at site 1 while T3 and T4 run at site 2, the LWFGs for the two sites are shown in
Fig. 5.5a. Notice that it is not possible to detect a deadlock simply by examining the
two LWFGs independently, because the deadlock is global. The deadlock can easily
be detected, however, by examining the GWFG where intersite waiting is shown by
dashed lines (Fig. 5.5b).
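This detection step can be sketched as follows; we assume, purely for illustration, that each site ships its wait-for edges (including waits of its local transactions on remote ones) to a detector that unions them into the GWFG and checks for a cycle:

# Hypothetical sketch: global deadlock detection over the union of local wait-for graphs.
def has_global_deadlock(local_wfgs):
    # local_wfgs: one edge set per site; an edge (Ti, Tj) means Ti waits for Tj.
    gwfg = set().union(*local_wfgs)
    graph = {}
    for a, b in gwfg:
        graph.setdefault(a, set()).add(b)
        graph.setdefault(b, set())
    color = {n: 0 for n in graph}             # 0 = unvisited, 1 = on stack, 2 = done
    def visit(n):
        color[n] = 1
        for m in graph[n]:
            if color[m] == 1 or (color[m] == 0 and visit(m)):
                return True                   # found a cycle, i.e., a deadlock
        color[n] = 2
        return False
    return any(color[n] == 0 and visit(n) for n in graph)

# Example 5.2: Site 1 records T1 -> T2 and the wait of its local T2 on remote T3;
# Site 2 records T3 -> T4 and the wait of its local T4 on remote T1.
site1 = {('T1', 'T2'), ('T2', 'T3')}
site2 = {('T3', 'T4'), ('T4', 'T1')}
print(has_global_deadlock([site1, site2]))    # True: cycle T1 -> T2 -> T3 -> T4 -> T1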