4141import java .util .HashSet ;
4242import java .util .List ;
4343import java .util .Map ;
44- import java .util .NoSuchElementException ;
4544import java .util .Set ;
46- import java .util .concurrent .ArrayBlockingQueue ;
45+ import java .util .concurrent .BlockingQueue ;
4746import java .util .concurrent .ConcurrentHashMap ;
4847import java .util .concurrent .CountDownLatch ;
4948import java .util .concurrent .ExecutorService ;
6665import org .apache .zookeeper .server .quorum .auth .QuorumAuthServer ;
6766import org .apache .zookeeper .server .quorum .flexible .QuorumVerifier ;
6867import org .apache .zookeeper .server .util .ConfigUtils ;
68+ import org .apache .zookeeper .util .CircularBlockingQueue ;
6969import org .slf4j .Logger ;
7070import org .slf4j .LoggerFactory ;
7171
@@ -149,17 +149,13 @@ public class QuorumCnxManager {
149149 * Mapping from Peer to Thread number
150150 */
151151 final ConcurrentHashMap <Long , SendWorker > senderWorkerMap ;
152- final ConcurrentHashMap <Long , ArrayBlockingQueue <ByteBuffer >> queueSendMap ;
152+ final ConcurrentHashMap <Long , BlockingQueue <ByteBuffer >> queueSendMap ;
153153 final ConcurrentHashMap <Long , ByteBuffer > lastMessageSent ;
154154
155155 /*
156156 * Reception queue
157157 */
158- public final ArrayBlockingQueue <Message > recvQueue ;
159- /*
160- * Object to synchronize access to recvQueue
161- */
162- private final Object recvQLock = new Object ();
158+ public final BlockingQueue <Message > recvQueue ;
163159
164160 /*
165161 * Shutdown flag
@@ -273,7 +269,7 @@ public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.
273269 QuorumAuthServer authServer , QuorumAuthLearner authLearner , int socketTimeout , boolean listenOnAllIPs ,
274270 int quorumCnxnThreadsSize , boolean quorumSaslAuthEnabled ) {
275271
276- this .recvQueue = new ArrayBlockingQueue <>(RECV_CAPACITY );
272+ this .recvQueue = new CircularBlockingQueue <>(RECV_CAPACITY );
277273 this .queueSendMap = new ConcurrentHashMap <>();
278274 this .senderWorkerMap = new ConcurrentHashMap <>();
279275 this .lastMessageSent = new ConcurrentHashMap <>();
@@ -462,7 +458,8 @@ private boolean startConnection(Socket sock, Long sid) throws IOException {
462458 }
463459
464460 senderWorkerMap .put (sid , sw );
465- queueSendMap .putIfAbsent (sid , new ArrayBlockingQueue <>(SEND_CAPACITY ));
461+
462+ queueSendMap .putIfAbsent (sid , new CircularBlockingQueue <>(SEND_CAPACITY ));
466463
467464 sw .start ();
468465 rw .start ();
@@ -597,7 +594,7 @@ private void handleConnection(Socket sock, DataInputStream din) throws IOExcepti
597594
598595 senderWorkerMap .put (sid , sw );
599596
600- queueSendMap .putIfAbsent (sid , new ArrayBlockingQueue <>(SEND_CAPACITY ));
597+ queueSendMap .putIfAbsent (sid , new CircularBlockingQueue <>(SEND_CAPACITY ));
601598
602599 sw .start ();
603600 rw .start ();
@@ -622,10 +619,9 @@ public void toSend(Long sid, ByteBuffer b) {
622619 /*
623620 * Start a new connection if doesn't have one already.
624621 */
625- ArrayBlockingQueue <ByteBuffer > bq = queueSendMap .computeIfAbsent (sid , serverId -> new ArrayBlockingQueue <>(SEND_CAPACITY ));
622+ BlockingQueue <ByteBuffer > bq = queueSendMap .computeIfAbsent (sid , serverId -> new CircularBlockingQueue <>(SEND_CAPACITY ));
626623 addToSendQueue (bq , b );
627624 connectOne (sid );
628-
629625 }
630626 }
631627
@@ -761,9 +757,10 @@ public void connectAll() {
761757 * Check if all queues are empty, indicating that all messages have been delivered.
762758 */
763759 boolean haveDelivered () {
764- for (ArrayBlockingQueue <ByteBuffer > queue : queueSendMap .values ()) {
765- LOG .debug ("Queue size: {}" , queue .size ());
766- if (queue .size () == 0 ) {
760+ for (BlockingQueue <ByteBuffer > queue : queueSendMap .values ()) {
761+ final int queueSize = queue .size ();
762+ LOG .debug ("Queue size: {}" , queueSize );
763+ if (queueSize == 0 ) {
767764 return true ;
768765 }
769766 }
@@ -1207,7 +1204,7 @@ public void run() {
12071204 * message than that stored in lastMessage. To avoid sending
12081205 * stale message, we should send the message in the send queue.
12091206 */
1210- ArrayBlockingQueue <ByteBuffer > bq = queueSendMap .get (sid );
1207+ BlockingQueue <ByteBuffer > bq = queueSendMap .get (sid );
12111208 if (bq == null || isSendQueueEmpty (bq )) {
12121209 ByteBuffer b = lastMessageSent .get (sid );
12131210 if (b != null ) {
@@ -1225,7 +1222,7 @@ public void run() {
12251222
12261223 ByteBuffer b = null ;
12271224 try {
1228- ArrayBlockingQueue <ByteBuffer > bq = queueSendMap .get (sid );
1225+ BlockingQueue <ByteBuffer > bq = queueSendMap .get (sid );
12291226 if (bq != null ) {
12301227 b = pollSendQueue (bq , 1000 , TimeUnit .MILLISECONDS );
12311228 } else {
@@ -1365,37 +1362,19 @@ public void run() {
13651362 }
13661363
13671364 /**
1368- * Inserts an element in the specified queue. If the Queue is full, this
1369- * method removes an element from the head of the Queue and then inserts
1370- * the element at the tail. It can happen that an element is removed
1371- * by another thread in {@link SendWorker#run() }
1372- * method before this method attempts to remove an element from the queue.
1373- * This will cause {@link ArrayBlockingQueue#remove() remove} to throw an
1374- * exception, which is safe to ignore.
1375- *
1376- * Unlike {@link #addToRecvQueue(Message) addToRecvQueue} this method does
1377- * not need to be synchronized since there is only one thread that inserts
1378- * an element in the queue and another thread that reads from the queue.
1365+ * Inserts an element in the provided {@link BlockingQueue}. This method
1366+ * assumes that if the Queue is full, an element from the head of the Queue is
1367+ * removed and the new item is inserted at the tail of the queue. This is done
1368+ * to prevent a thread from blocking while inserting an element in the queue.
13791369 *
1380- * @param queue
1381- * Reference to the Queue
1382- * @param buffer
1383- * Reference to the buffer to be inserted in the queue
1370+ * @param queue Reference to the Queue
1371+ * @param buffer Reference to the buffer to be inserted in the queue
13841372 */
1385- private void addToSendQueue (ArrayBlockingQueue <ByteBuffer > queue , ByteBuffer buffer ) {
1386- if (queue .remainingCapacity () == 0 ) {
1387- try {
1388- queue .remove ();
1389- } catch (NoSuchElementException ne ) {
1390- // element could be removed by poll()
1391- LOG .debug ("Trying to remove from an empty Queue. Ignoring exception." , ne );
1392- }
1393- }
1394- try {
1395- queue .add (buffer );
1396- } catch (IllegalStateException ie ) {
1397- // This should never happen
1398- LOG .error ("Unable to insert an element in the queue " , ie );
1373+ private void addToSendQueue (final BlockingQueue <ByteBuffer > queue ,
1374+ final ByteBuffer buffer ) {
1375+ final boolean success = queue .offer (buffer );
1376+ if (!success ) {
1377+ throw new RuntimeException ("Could not insert into receive queue" );
13991378 }
14001379 }
14011380
@@ -1406,7 +1385,7 @@ private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buf
14061385 * @return
14071386 * true if the specified queue is empty
14081387 */
1409- private boolean isSendQueueEmpty (ArrayBlockingQueue <ByteBuffer > queue ) {
1388+ private boolean isSendQueueEmpty (final BlockingQueue <ByteBuffer > queue ) {
14101389 return queue .isEmpty ();
14111390 }
14121391
@@ -1415,60 +1394,37 @@ private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
14151394 * waiting up to the specified wait time if necessary for an element to
14161395 * become available.
14171396 *
1418- * {@link ArrayBlockingQueue #poll(long, java.util.concurrent.TimeUnit)}
1397+ * {@link BlockingQueue #poll(long, java.util.concurrent.TimeUnit)}
14191398 */
1420- private ByteBuffer pollSendQueue (ArrayBlockingQueue <ByteBuffer > queue , long timeout , TimeUnit unit ) throws InterruptedException {
1421- return queue .poll (timeout , unit );
1399+ private ByteBuffer pollSendQueue (final BlockingQueue <ByteBuffer > queue ,
1400+ final long timeout , final TimeUnit unit ) throws InterruptedException {
1401+ return queue .poll (timeout , unit );
14221402 }
14231403
14241404 /**
14251405 * Inserts an element in the {@link #recvQueue}. If the Queue is full, this
1426- * methods removes an element from the head of the Queue and then inserts
1427- * the element at the tail of the queue.
1428- *
1429- * This method is synchronized to achieve fairness between two threads that
1430- * are trying to insert an element in the queue. Each thread checks if the
1431- * queue is full, then removes the element at the head of the queue, and
1432- * then inserts an element at the tail. This three-step process is done to
1433- * prevent a thread from blocking while inserting an element in the queue.
1434- * If we do not synchronize the call to this method, then a thread can grab
1435- * a slot in the queue created by the second thread. This can cause the call
1436- * to insert by the second thread to fail.
1437- * Note that synchronizing this method does not block another thread
1438- * from polling the queue since that synchronization is provided by the
1439- * queue itself.
1406+ * methods removes an element from the head of the Queue and then inserts the
1407+ * element at the tail of the queue.
14401408 *
1441- * @param msg
1442- * Reference to the message to be inserted in the queue
1409+ * @param msg Reference to the message to be inserted in the queue
14431410 */
1444- public void addToRecvQueue (Message msg ) {
1445- synchronized (recvQLock ) {
1446- if (recvQueue .remainingCapacity () == 0 ) {
1447- try {
1448- recvQueue .remove ();
1449- } catch (NoSuchElementException ne ) {
1450- // element could be removed by poll()
1451- LOG .debug ("Trying to remove from an empty recvQueue. Ignoring exception." , ne );
1452- }
1453- }
1454- try {
1455- recvQueue .add (msg );
1456- } catch (IllegalStateException ie ) {
1457- // This should never happen
1458- LOG .error ("Unable to insert element in the recvQueue " , ie );
1459- }
1460- }
1411+ public void addToRecvQueue (final Message msg ) {
1412+ final boolean success = this .recvQueue .offer (msg );
1413+ if (!success ) {
1414+ throw new RuntimeException ("Could not insert into receive queue" );
1415+ }
14611416 }
14621417
14631418 /**
14641419 * Retrieves and removes a message at the head of this queue,
14651420 * waiting up to the specified wait time if necessary for an element to
14661421 * become available.
14671422 *
1468- * {@link ArrayBlockingQueue #poll(long, java.util.concurrent.TimeUnit)}
1423+ * {@link BlockingQueue #poll(long, java.util.concurrent.TimeUnit)}
14691424 */
1470- public Message pollRecvQueue (long timeout , TimeUnit unit ) throws InterruptedException {
1471- return recvQueue .poll (timeout , unit );
1425+ public Message pollRecvQueue (final long timeout , final TimeUnit unit )
1426+ throws InterruptedException {
1427+ return this .recvQueue .poll (timeout , unit );
14721428 }
14731429
14741430 public boolean connectedToPeer (long peerSid ) {
0 commit comments