Skip to content

Commit b3ac3cc

Browse files
committed
Rebased to master
1 parent c63ab85 commit b3ac3cc

1 file changed

Lines changed: 30 additions & 53 deletions

File tree

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Lines changed: 30 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.zookeeper.server.quorum;
2020

2121
import static org.apache.zookeeper.common.NetUtils.formatInetAddr;
22+
2223
import java.io.BufferedInputStream;
2324
import java.io.BufferedOutputStream;
2425
import java.io.DataInputStream;
@@ -46,7 +47,9 @@
4647
import java.util.concurrent.TimeUnit;
4748
import java.util.concurrent.atomic.AtomicInteger;
4849
import java.util.concurrent.atomic.AtomicLong;
50+
4951
import javax.net.ssl.SSLSocket;
52+
5053
import org.apache.zookeeper.common.X509Exception;
5154
import org.apache.zookeeper.server.ExitCode;
5255
import org.apache.zookeeper.server.ZooKeeperThread;
@@ -55,6 +58,7 @@
5558
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
5659
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
5760
import org.apache.zookeeper.server.util.ConfigUtils;
61+
import org.apache.zookeeper.util.CircularBlockingQueue;
5862
import org.slf4j.Logger;
5963
import org.slf4j.LoggerFactory;
6064

@@ -137,7 +141,7 @@ public class QuorumCnxManager {
137141
* Mapping from Peer to Thread number
138142
*/
139143
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
140-
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
144+
final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
141145
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
142146

143147
/*
@@ -249,10 +253,10 @@ public static InitialMessage parse(Long protocolVersion, DataInputStream din) th
249253
}
250254

251255
public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.QuorumServer> view, QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs, int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
252-
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
253-
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
254-
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
255-
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
256+
this.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
257+
this.queueSendMap = new ConcurrentHashMap<>();
258+
this.senderWorkerMap = new ConcurrentHashMap<>();
259+
this.lastMessageSent = new ConcurrentHashMap<>();
256260

257261
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
258262
if (cnxToValue != null) {
@@ -435,7 +439,7 @@ private boolean startConnection(Socket sock, Long sid) throws IOException {
435439

436440
senderWorkerMap.put(sid, sw);
437441

438-
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<>(SEND_CAPACITY));
442+
queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
439443

440444
sw.start();
441445
rw.start();
@@ -570,7 +574,7 @@ private void handleConnection(Socket sock, DataInputStream din) throws IOExcepti
570574

571575
senderWorkerMap.put(sid, sw);
572576

573-
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<>(SEND_CAPACITY));
577+
queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
574578

575579
sw.start();
576580
rw.start();
@@ -595,7 +599,7 @@ public void toSend(Long sid, ByteBuffer b) {
595599
/*
596600
* Start a new connection if doesn't have one already.
597601
*/
598-
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new ArrayBlockingQueue<>(SEND_CAPACITY));
602+
BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
599603
addToSendQueue(bq, b);
600604
connectOne(sid);
601605
}
@@ -720,9 +724,10 @@ public void connectAll() {
720724
* Check if all queues are empty, indicating that all messages have been delivered.
721725
*/
722726
boolean haveDelivered() {
723-
for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
724-
LOG.debug("Queue size: {}", queue.size());
725-
if (queue.size() == 0) {
727+
for (BlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
728+
final int queueSize = queue.size();
729+
LOG.debug("Queue size: {}", queueSize);
730+
if (queueSize == 0) {
726731
return true;
727732
}
728733
}
@@ -1212,30 +1217,19 @@ public void run() {
12121217
}
12131218

12141219
/**
1215-
* Inserts an element in the specified queue. If the Queue is full, this
1216-
* method removes an element from the head of the Queue and then inserts
1217-
* the element at the tail. It can happen that an element is removed
1218-
* by another thread in {@link SendWorker#run() }
1219-
* method before this method attempts to remove an element from the queue.
1220-
* This will cause {@link ArrayBlockingQueue#remove() remove} to throw an
1221-
* exception, which is safe to ignore.
1222-
*
1223-
* Unlike {@link #addToRecvQueue(Message) addToRecvQueue} this method does
1224-
* not need to be synchronized since there is only one thread that inserts
1225-
* an element in the queue and another thread that reads from the queue.
1220+
* Inserts an element in the provided {@link BlockingQueue}. This method
1221+
* assumes that if the Queue is full, an element from the head of the Queue is
1222+
* removed and the new item is inserted at the tail of the queue. This is done
1223+
* to prevent a thread from blocking while inserting an element in the queue.
12261224
*
1227-
* @param queue
1228-
* Reference to the Queue
1229-
* @param buffer
1230-
* Reference to the buffer to be inserted in the queue
1225+
* @param queue Reference to the Queue
1226+
* @param buffer Reference to the buffer to be inserted in the queue
12311227
*/
12321228
private void addToSendQueue(final BlockingQueue<ByteBuffer> queue,
12331229
final ByteBuffer buffer) {
12341230
final boolean success = queue.offer(buffer);
12351231
if (!success) {
1236-
LOG.debug("Could not insert buffer into queue. Discarding one.");
1237-
queue.poll();
1238-
queue.offer(buffer);
1232+
throw new RuntimeException("Could not insert into receive queue");
12391233
}
12401234
}
12411235

@@ -1264,33 +1258,16 @@ private ByteBuffer pollSendQueue(final BlockingQueue<ByteBuffer> queue,
12641258

12651259
/**
12661260
* Inserts an element in the {@link #recvQueue}. If the Queue is full, this
1267-
* methods removes an element from the head of the Queue and then inserts
1268-
* the element at the tail of the queue.
1269-
*
1270-
* This method is synchronized to achieve fairness between two threads that
1271-
* are trying to insert an element in the queue. Each thread checks if the
1272-
* queue is full, then removes the element at the head of the queue, and
1273-
* then inserts an element at the tail. This three-step process is done to
1274-
* prevent a thread from blocking while inserting an element in the queue.
1275-
* If we do not synchronize the call to this method, then a thread can grab
1276-
* a slot in the queue created by the second thread. This can cause the call
1277-
* to insert by the second thread to fail.
1278-
* Note that synchronizing this method does not block another thread
1279-
* from polling the queue since that synchronization is provided by the
1280-
* queue itself.
1261+
* methods removes an element from the head of the Queue and then inserts the
1262+
* element at the tail of the queue.
12811263
*
1282-
* @param msg
1283-
* Reference to the message to be inserted in the queue
1264+
* @param msg Reference to the message to be inserted in the queue
12841265
*/
12851266
public void addToRecvQueue(final Message msg) {
1286-
synchronized (this.recvQueue) {
1287-
final boolean success = this.recvQueue.offer(msg);
1288-
if (!success) {
1289-
LOG.debug("Could not insert buffer into recv queue. Discarding one.");
1290-
this.recvQueue.poll();
1291-
this.recvQueue.offer(msg);
1292-
}
1293-
}
1267+
final boolean success = this.recvQueue.offer(msg);
1268+
if (!success) {
1269+
throw new RuntimeException("Could not insert into receive queue");
1270+
}
12941271
}
12951272

12961273
/**

0 commit comments

Comments
 (0)