1919package org .apache .zookeeper .server .quorum ;
2020
2121import static org .apache .zookeeper .common .NetUtils .formatInetAddr ;
22+
2223import java .io .BufferedInputStream ;
2324import java .io .BufferedOutputStream ;
2425import java .io .DataInputStream ;
4647import java .util .concurrent .TimeUnit ;
4748import java .util .concurrent .atomic .AtomicInteger ;
4849import java .util .concurrent .atomic .AtomicLong ;
50+
4951import javax .net .ssl .SSLSocket ;
52+
5053import org .apache .zookeeper .common .X509Exception ;
5154import org .apache .zookeeper .server .ExitCode ;
5255import org .apache .zookeeper .server .ZooKeeperThread ;
5558import org .apache .zookeeper .server .quorum .auth .QuorumAuthServer ;
5659import org .apache .zookeeper .server .quorum .flexible .QuorumVerifier ;
5760import org .apache .zookeeper .server .util .ConfigUtils ;
61+ import org .apache .zookeeper .util .CircularBlockingQueue ;
5862import org .slf4j .Logger ;
5963import org .slf4j .LoggerFactory ;
6064
@@ -137,7 +141,7 @@ public class QuorumCnxManager {
137141 * Mapping from Peer to Thread number
138142 */
139143 final ConcurrentHashMap <Long , SendWorker > senderWorkerMap ;
140- final ConcurrentHashMap <Long , ArrayBlockingQueue <ByteBuffer >> queueSendMap ;
144+ final ConcurrentHashMap <Long , BlockingQueue <ByteBuffer >> queueSendMap ;
141145 final ConcurrentHashMap <Long , ByteBuffer > lastMessageSent ;
142146
143147 /*
@@ -249,10 +253,10 @@ public static InitialMessage parse(Long protocolVersion, DataInputStream din) th
249253 }
250254
251255 public QuorumCnxManager (QuorumPeer self , final long mySid , Map <Long , QuorumPeer .QuorumServer > view , QuorumAuthServer authServer , QuorumAuthLearner authLearner , int socketTimeout , boolean listenOnAllIPs , int quorumCnxnThreadsSize , boolean quorumSaslAuthEnabled ) {
252- this .recvQueue = new ArrayBlockingQueue < Message >(RECV_CAPACITY );
253- this .queueSendMap = new ConcurrentHashMap <Long , ArrayBlockingQueue < ByteBuffer > >();
254- this .senderWorkerMap = new ConcurrentHashMap <Long , SendWorker >();
255- this .lastMessageSent = new ConcurrentHashMap <Long , ByteBuffer >();
256+ this .recvQueue = new CircularBlockingQueue < >(RECV_CAPACITY );
257+ this .queueSendMap = new ConcurrentHashMap <>();
258+ this .senderWorkerMap = new ConcurrentHashMap <>();
259+ this .lastMessageSent = new ConcurrentHashMap <>();
256260
257261 String cnxToValue = System .getProperty ("zookeeper.cnxTimeout" );
258262 if (cnxToValue != null ) {
@@ -435,7 +439,7 @@ private boolean startConnection(Socket sock, Long sid) throws IOException {
435439
436440 senderWorkerMap .put (sid , sw );
437441
438- queueSendMap .putIfAbsent (sid , new ArrayBlockingQueue <>(SEND_CAPACITY ));
442+ queueSendMap .putIfAbsent (sid , new CircularBlockingQueue <>(SEND_CAPACITY ));
439443
440444 sw .start ();
441445 rw .start ();
@@ -570,7 +574,7 @@ private void handleConnection(Socket sock, DataInputStream din) throws IOExcepti
570574
571575 senderWorkerMap .put (sid , sw );
572576
573- queueSendMap .putIfAbsent (sid , new ArrayBlockingQueue <>(SEND_CAPACITY ));
577+ queueSendMap .putIfAbsent (sid , new CircularBlockingQueue <>(SEND_CAPACITY ));
574578
575579 sw .start ();
576580 rw .start ();
@@ -595,7 +599,7 @@ public void toSend(Long sid, ByteBuffer b) {
595599 /*
596600 * Start a new connection if doesn't have one already.
597601 */
598- ArrayBlockingQueue <ByteBuffer > bq = queueSendMap .computeIfAbsent (sid , serverId -> new ArrayBlockingQueue <>(SEND_CAPACITY ));
602+ BlockingQueue <ByteBuffer > bq = queueSendMap .computeIfAbsent (sid , serverId -> new CircularBlockingQueue <>(SEND_CAPACITY ));
599603 addToSendQueue (bq , b );
600604 connectOne (sid );
601605 }
@@ -720,9 +724,10 @@ public void connectAll() {
720724 * Check if all queues are empty, indicating that all messages have been delivered.
721725 */
722726 boolean haveDelivered () {
723- for (ArrayBlockingQueue <ByteBuffer > queue : queueSendMap .values ()) {
724- LOG .debug ("Queue size: {}" , queue .size ());
725- if (queue .size () == 0 ) {
727+ for (BlockingQueue <ByteBuffer > queue : queueSendMap .values ()) {
728+ final int queueSize = queue .size ();
729+ LOG .debug ("Queue size: {}" , queueSize );
730+ if (queueSize == 0 ) {
726731 return true ;
727732 }
728733 }
@@ -1212,30 +1217,19 @@ public void run() {
12121217 }
12131218
12141219 /**
1215- * Inserts an element in the specified queue. If the Queue is full, this
1216- * method removes an element from the head of the Queue and then inserts
1217- * the element at the tail. It can happen that an element is removed
1218- * by another thread in {@link SendWorker#run() }
1219- * method before this method attempts to remove an element from the queue.
1220- * This will cause {@link ArrayBlockingQueue#remove() remove} to throw an
1221- * exception, which is safe to ignore.
1222- *
1223- * Unlike {@link #addToRecvQueue(Message) addToRecvQueue} this method does
1224- * not need to be synchronized since there is only one thread that inserts
1225- * an element in the queue and another thread that reads from the queue.
1220+ * Inserts an element in the provided {@link BlockingQueue}. This method
1221+ * assumes that if the Queue is full, an element from the head of the Queue is
1222+ * removed and the new item is inserted at the tail of the queue. This is done
1223+ * to prevent a thread from blocking while inserting an element in the queue.
12261224 *
1227- * @param queue
1228- * Reference to the Queue
1229- * @param buffer
1230- * Reference to the buffer to be inserted in the queue
1225+ * @param queue Reference to the Queue
1226+ * @param buffer Reference to the buffer to be inserted in the queue
12311227 */
12321228 private void addToSendQueue (final BlockingQueue <ByteBuffer > queue ,
12331229 final ByteBuffer buffer ) {
12341230 final boolean success = queue .offer (buffer );
12351231 if (!success ) {
1236- LOG .debug ("Could not insert buffer into queue. Discarding one." );
1237- queue .poll ();
1238- queue .offer (buffer );
1232+ throw new RuntimeException ("Could not insert into receive queue" );
12391233 }
12401234 }
12411235
@@ -1264,33 +1258,16 @@ private ByteBuffer pollSendQueue(final BlockingQueue<ByteBuffer> queue,
12641258
12651259 /**
12661260 * Inserts an element in the {@link #recvQueue}. If the Queue is full, this
1267- * methods removes an element from the head of the Queue and then inserts
1268- * the element at the tail of the queue.
1269- *
1270- * This method is synchronized to achieve fairness between two threads that
1271- * are trying to insert an element in the queue. Each thread checks if the
1272- * queue is full, then removes the element at the head of the queue, and
1273- * then inserts an element at the tail. This three-step process is done to
1274- * prevent a thread from blocking while inserting an element in the queue.
1275- * If we do not synchronize the call to this method, then a thread can grab
1276- * a slot in the queue created by the second thread. This can cause the call
1277- * to insert by the second thread to fail.
1278- * Note that synchronizing this method does not block another thread
1279- * from polling the queue since that synchronization is provided by the
1280- * queue itself.
1261+ * methods removes an element from the head of the Queue and then inserts the
1262+ * element at the tail of the queue.
12811263 *
1282- * @param msg
1283- * Reference to the message to be inserted in the queue
1264+ * @param msg Reference to the message to be inserted in the queue
12841265 */
12851266 public void addToRecvQueue (final Message msg ) {
1286- synchronized (this .recvQueue ) {
1287- final boolean success = this .recvQueue .offer (msg );
1288- if (!success ) {
1289- LOG .debug ("Could not insert buffer into recv queue. Discarding one." );
1290- this .recvQueue .poll ();
1291- this .recvQueue .offer (msg );
1292- }
1293- }
1267+ final boolean success = this .recvQueue .offer (msg );
1268+ if (!success ) {
1269+ throw new RuntimeException ("Could not insert into receive queue" );
1270+ }
12941271 }
12951272
12961273 /**
0 commit comments