Skip to content

Commit f875f5c

Browse files
symatMate Szalay-Beko
authored andcommitted
Merge remote-tracking branch 'apache/master' into ZOOKEEPER-3188
2 parents 31805e7 + cd46594 commit f875f5c

3 files changed

Lines changed: 396 additions & 87 deletions

File tree

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Lines changed: 43 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,8 @@
4141
import java.util.HashSet;
4242
import java.util.List;
4343
import java.util.Map;
44-
import java.util.NoSuchElementException;
4544
import java.util.Set;
46-
import java.util.concurrent.ArrayBlockingQueue;
45+
import java.util.concurrent.BlockingQueue;
4746
import java.util.concurrent.ConcurrentHashMap;
4847
import java.util.concurrent.CountDownLatch;
4948
import java.util.concurrent.ExecutorService;
@@ -66,6 +65,7 @@
6665
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
6766
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
6867
import org.apache.zookeeper.server.util.ConfigUtils;
68+
import org.apache.zookeeper.util.CircularBlockingQueue;
6969
import org.slf4j.Logger;
7070
import org.slf4j.LoggerFactory;
7171

@@ -149,17 +149,13 @@ public class QuorumCnxManager {
149149
* Mapping from Peer to Thread number
150150
*/
151151
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
152-
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
152+
final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
153153
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
154154

155155
/*
156156
* Reception queue
157157
*/
158-
public final ArrayBlockingQueue<Message> recvQueue;
159-
/*
160-
* Object to synchronize access to recvQueue
161-
*/
162-
private final Object recvQLock = new Object();
158+
public final BlockingQueue<Message> recvQueue;
163159

164160
/*
165161
* Shutdown flag
@@ -273,7 +269,7 @@ public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.
273269
QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs,
274270
int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
275271

276-
this.recvQueue = new ArrayBlockingQueue<>(RECV_CAPACITY);
272+
this.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
277273
this.queueSendMap = new ConcurrentHashMap<>();
278274
this.senderWorkerMap = new ConcurrentHashMap<>();
279275
this.lastMessageSent = new ConcurrentHashMap<>();
@@ -462,7 +458,8 @@ private boolean startConnection(Socket sock, Long sid) throws IOException {
462458
}
463459

464460
senderWorkerMap.put(sid, sw);
465-
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<>(SEND_CAPACITY));
461+
462+
queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
466463

467464
sw.start();
468465
rw.start();
@@ -597,7 +594,7 @@ private void handleConnection(Socket sock, DataInputStream din) throws IOExcepti
597594

598595
senderWorkerMap.put(sid, sw);
599596

600-
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<>(SEND_CAPACITY));
597+
queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
601598

602599
sw.start();
603600
rw.start();
@@ -622,10 +619,9 @@ public void toSend(Long sid, ByteBuffer b) {
622619
/*
623620
* Start a new connection if doesn't have one already.
624621
*/
625-
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new ArrayBlockingQueue<>(SEND_CAPACITY));
622+
BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
626623
addToSendQueue(bq, b);
627624
connectOne(sid);
628-
629625
}
630626
}
631627

@@ -761,9 +757,10 @@ public void connectAll() {
761757
* Check if all queues are empty, indicating that all messages have been delivered.
762758
*/
763759
boolean haveDelivered() {
764-
for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
765-
LOG.debug("Queue size: {}", queue.size());
766-
if (queue.size() == 0) {
760+
for (BlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
761+
final int queueSize = queue.size();
762+
LOG.debug("Queue size: {}", queueSize);
763+
if (queueSize == 0) {
767764
return true;
768765
}
769766
}
@@ -1207,7 +1204,7 @@ public void run() {
12071204
* message than that stored in lastMessage. To avoid sending
12081205
* stale message, we should send the message in the send queue.
12091206
*/
1210-
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
1207+
BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
12111208
if (bq == null || isSendQueueEmpty(bq)) {
12121209
ByteBuffer b = lastMessageSent.get(sid);
12131210
if (b != null) {
@@ -1225,7 +1222,7 @@ public void run() {
12251222

12261223
ByteBuffer b = null;
12271224
try {
1228-
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
1225+
BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
12291226
if (bq != null) {
12301227
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
12311228
} else {
@@ -1365,37 +1362,19 @@ public void run() {
13651362
}
13661363

13671364
/**
1368-
* Inserts an element in the specified queue. If the Queue is full, this
1369-
* method removes an element from the head of the Queue and then inserts
1370-
* the element at the tail. It can happen that an element is removed
1371-
* by another thread in {@link SendWorker#run() }
1372-
* method before this method attempts to remove an element from the queue.
1373-
* This will cause {@link ArrayBlockingQueue#remove() remove} to throw an
1374-
* exception, which is safe to ignore.
1375-
*
1376-
* Unlike {@link #addToRecvQueue(Message) addToRecvQueue} this method does
1377-
* not need to be synchronized since there is only one thread that inserts
1378-
* an element in the queue and another thread that reads from the queue.
1365+
* Inserts an element in the provided {@link BlockingQueue}. This method
1366+
* assumes that if the Queue is full, an element from the head of the Queue is
1367+
* removed and the new item is inserted at the tail of the queue. This is done
1368+
* to prevent a thread from blocking while inserting an element in the queue.
13791369
*
1380-
* @param queue
1381-
* Reference to the Queue
1382-
* @param buffer
1383-
* Reference to the buffer to be inserted in the queue
1370+
* @param queue Reference to the Queue
1371+
* @param buffer Reference to the buffer to be inserted in the queue
13841372
*/
1385-
private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buffer) {
1386-
if (queue.remainingCapacity() == 0) {
1387-
try {
1388-
queue.remove();
1389-
} catch (NoSuchElementException ne) {
1390-
// element could be removed by poll()
1391-
LOG.debug("Trying to remove from an empty Queue. Ignoring exception.", ne);
1392-
}
1393-
}
1394-
try {
1395-
queue.add(buffer);
1396-
} catch (IllegalStateException ie) {
1397-
// This should never happen
1398-
LOG.error("Unable to insert an element in the queue ", ie);
1373+
private void addToSendQueue(final BlockingQueue<ByteBuffer> queue,
1374+
final ByteBuffer buffer) {
1375+
final boolean success = queue.offer(buffer);
1376+
if (!success) {
1377+
throw new RuntimeException("Could not insert into receive queue");
13991378
}
14001379
}
14011380

@@ -1406,7 +1385,7 @@ private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buf
14061385
* @return
14071386
* true if the specified queue is empty
14081387
*/
1409-
private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
1388+
private boolean isSendQueueEmpty(final BlockingQueue<ByteBuffer> queue) {
14101389
return queue.isEmpty();
14111390
}
14121391

@@ -1415,60 +1394,37 @@ private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
14151394
* waiting up to the specified wait time if necessary for an element to
14161395
* become available.
14171396
*
1418-
* {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
1397+
* {@link BlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
14191398
*/
1420-
private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue, long timeout, TimeUnit unit) throws InterruptedException {
1421-
return queue.poll(timeout, unit);
1399+
private ByteBuffer pollSendQueue(final BlockingQueue<ByteBuffer> queue,
1400+
final long timeout, final TimeUnit unit) throws InterruptedException {
1401+
return queue.poll(timeout, unit);
14221402
}
14231403

14241404
/**
14251405
* Inserts an element in the {@link #recvQueue}. If the Queue is full, this
1426-
* methods removes an element from the head of the Queue and then inserts
1427-
* the element at the tail of the queue.
1428-
*
1429-
* This method is synchronized to achieve fairness between two threads that
1430-
* are trying to insert an element in the queue. Each thread checks if the
1431-
* queue is full, then removes the element at the head of the queue, and
1432-
* then inserts an element at the tail. This three-step process is done to
1433-
* prevent a thread from blocking while inserting an element in the queue.
1434-
* If we do not synchronize the call to this method, then a thread can grab
1435-
* a slot in the queue created by the second thread. This can cause the call
1436-
* to insert by the second thread to fail.
1437-
* Note that synchronizing this method does not block another thread
1438-
* from polling the queue since that synchronization is provided by the
1439-
* queue itself.
1406+
* methods removes an element from the head of the Queue and then inserts the
1407+
* element at the tail of the queue.
14401408
*
1441-
* @param msg
1442-
* Reference to the message to be inserted in the queue
1409+
* @param msg Reference to the message to be inserted in the queue
14431410
*/
1444-
public void addToRecvQueue(Message msg) {
1445-
synchronized (recvQLock) {
1446-
if (recvQueue.remainingCapacity() == 0) {
1447-
try {
1448-
recvQueue.remove();
1449-
} catch (NoSuchElementException ne) {
1450-
// element could be removed by poll()
1451-
LOG.debug("Trying to remove from an empty recvQueue. Ignoring exception.", ne);
1452-
}
1453-
}
1454-
try {
1455-
recvQueue.add(msg);
1456-
} catch (IllegalStateException ie) {
1457-
// This should never happen
1458-
LOG.error("Unable to insert element in the recvQueue ", ie);
1459-
}
1460-
}
1411+
public void addToRecvQueue(final Message msg) {
1412+
final boolean success = this.recvQueue.offer(msg);
1413+
if (!success) {
1414+
throw new RuntimeException("Could not insert into receive queue");
1415+
}
14611416
}
14621417

14631418
/**
14641419
* Retrieves and removes a message at the head of this queue,
14651420
* waiting up to the specified wait time if necessary for an element to
14661421
* become available.
14671422
*
1468-
* {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
1423+
* {@link BlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
14691424
*/
1470-
public Message pollRecvQueue(long timeout, TimeUnit unit) throws InterruptedException {
1471-
return recvQueue.poll(timeout, unit);
1425+
public Message pollRecvQueue(final long timeout, final TimeUnit unit)
1426+
throws InterruptedException {
1427+
return this.recvQueue.poll(timeout, unit);
14721428
}
14731429

14741430
public boolean connectedToPeer(long peerSid) {

0 commit comments

Comments
 (0)