3636import java .util .Enumeration ;
3737import java .util .HashSet ;
3838import java .util .Map ;
39- import java .util .NoSuchElementException ;
4039import java .util .Set ;
4140import java .util .concurrent .ArrayBlockingQueue ;
41+ import java .util .concurrent .BlockingQueue ;
4242import java .util .concurrent .ConcurrentHashMap ;
4343import java .util .concurrent .SynchronousQueue ;
4444import java .util .concurrent .ThreadFactory ;
@@ -143,11 +143,7 @@ public class QuorumCnxManager {
143143 /*
144144 * Reception queue
145145 */
146- public final ArrayBlockingQueue <Message > recvQueue ;
147- /*
148- * Object to synchronize access to recvQueue
149- */
150- private final Object recvQLock = new Object ();
146+ public final BlockingQueue <Message > recvQueue ;
151147
152148 /*
153149 * Shutdown flag
@@ -438,7 +434,8 @@ private boolean startConnection(Socket sock, Long sid) throws IOException {
438434 }
439435
440436 senderWorkerMap .put (sid , sw );
441- queueSendMap .putIfAbsent (sid , new ArrayBlockingQueue <ByteBuffer >(SEND_CAPACITY ));
437+
438+ queueSendMap .putIfAbsent (sid , new ArrayBlockingQueue <>(SEND_CAPACITY ));
442439
443440 sw .start ();
444441 rw .start ();
@@ -573,7 +570,7 @@ private void handleConnection(Socket sock, DataInputStream din) throws IOExcepti
573570
574571 senderWorkerMap .put (sid , sw );
575572
576- queueSendMap .putIfAbsent (sid , new ArrayBlockingQueue <ByteBuffer >(SEND_CAPACITY ));
573+ queueSendMap .putIfAbsent (sid , new ArrayBlockingQueue <>(SEND_CAPACITY ));
577574
578575 sw .start ();
579576 rw .start ();
@@ -601,7 +598,6 @@ public void toSend(Long sid, ByteBuffer b) {
601598 ArrayBlockingQueue <ByteBuffer > bq = queueSendMap .computeIfAbsent (sid , serverId -> new ArrayBlockingQueue <>(SEND_CAPACITY ));
602599 addToSendQueue (bq , b );
603600 connectOne (sid );
604-
605601 }
606602 }
607603
@@ -1085,7 +1081,7 @@ public void run() {
10851081 * message than that stored in lastMessage. To avoid sending
10861082 * stale message, we should send the message in the send queue.
10871083 */
1088- ArrayBlockingQueue <ByteBuffer > bq = queueSendMap .get (sid );
1084+ BlockingQueue <ByteBuffer > bq = queueSendMap .get (sid );
10891085 if (bq == null || isSendQueueEmpty (bq )) {
10901086 ByteBuffer b = lastMessageSent .get (sid );
10911087 if (b != null ) {
@@ -1103,7 +1099,7 @@ public void run() {
11031099
11041100 ByteBuffer b = null ;
11051101 try {
1106- ArrayBlockingQueue <ByteBuffer > bq = queueSendMap .get (sid );
1102+ BlockingQueue <ByteBuffer > bq = queueSendMap .get (sid );
11071103 if (bq != null ) {
11081104 b = pollSendQueue (bq , 1000 , TimeUnit .MILLISECONDS );
11091105 } else {
@@ -1233,20 +1229,12 @@ public void run() {
12331229 * @param buffer
12341230 * Reference to the buffer to be inserted in the queue
12351231 */
1236- private void addToSendQueue (ArrayBlockingQueue <ByteBuffer > queue , ByteBuffer buffer ) {
1237- if (queue .remainingCapacity () == 0 ) {
1238- try {
1239- queue .remove ();
1240- } catch (NoSuchElementException ne ) {
1241- // element could be removed by poll()
1242- LOG .debug ("Trying to remove from an empty Queue. Ignoring exception." , ne );
1243- }
1244- }
1245- try {
1246- queue .add (buffer );
1247- } catch (IllegalStateException ie ) {
1248- // This should never happen
1249- LOG .error ("Unable to insert an element in the queue " , ie );
1232+ private void addToSendQueue (final BlockingQueue <ByteBuffer > queue ,
1233+ final ByteBuffer buffer ) {
1234+ final boolean success = queue .offer (buffer );
1235+ if (!success ) {
1236+ queue .poll ();
1237+ queue .offer (buffer );
12501238 }
12511239 }
12521240
@@ -1257,7 +1245,7 @@ private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buf
12571245 * @return
12581246 * true if the specified queue is empty
12591247 */
1260- private boolean isSendQueueEmpty (ArrayBlockingQueue <ByteBuffer > queue ) {
1248+ private boolean isSendQueueEmpty (BlockingQueue <ByteBuffer > queue ) {
12611249 return queue .isEmpty ();
12621250 }
12631251
@@ -1266,10 +1254,11 @@ private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
12661254 * waiting up to the specified wait time if necessary for an element to
12671255 * become available.
12681256 *
1269- * {@link ArrayBlockingQueue #poll(long, java.util.concurrent.TimeUnit)}
1257+ * {@link BlockingQueue #poll(long, java.util.concurrent.TimeUnit)}
12701258 */
1271- private ByteBuffer pollSendQueue (ArrayBlockingQueue <ByteBuffer > queue , long timeout , TimeUnit unit ) throws InterruptedException {
1272- return queue .poll (timeout , unit );
1259+ private ByteBuffer pollSendQueue (final BlockingQueue <ByteBuffer > queue ,
1260+ final long timeout , final TimeUnit unit ) throws InterruptedException {
1261+ return queue .poll (timeout , unit );
12731262 }
12741263
12751264 /**
@@ -1292,21 +1281,12 @@ private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue, long time
12921281 * @param msg
12931282 * Reference to the message to be inserted in the queue
12941283 */
1295- public void addToRecvQueue (Message msg ) {
1296- synchronized (recvQLock ) {
1297- if (recvQueue .remainingCapacity () == 0 ) {
1298- try {
1299- recvQueue .remove ();
1300- } catch (NoSuchElementException ne ) {
1301- // element could be removed by poll()
1302- LOG .debug ("Trying to remove from an empty recvQueue. Ignoring exception." , ne );
1303- }
1304- }
1305- try {
1306- recvQueue .add (msg );
1307- } catch (IllegalStateException ie ) {
1308- // This should never happen
1309- LOG .error ("Unable to insert element in the recvQueue " , ie );
1284+ public void addToRecvQueue (final Message msg ) {
1285+ synchronized (this .recvQueue ) {
1286+ final boolean success = this .recvQueue .offer (msg );
1287+ if (!success ) {
1288+ this .recvQueue .poll ();
1289+ this .recvQueue .offer (msg );
13101290 }
13111291 }
13121292 }
@@ -1318,8 +1298,9 @@ public void addToRecvQueue(Message msg) {
13181298 *
13191299 * {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
13201300 */
1321- public Message pollRecvQueue (long timeout , TimeUnit unit ) throws InterruptedException {
1322- return recvQueue .poll (timeout , unit );
1301+ public Message pollRecvQueue (final long timeout , final TimeUnit unit )
1302+ throws InterruptedException {
1303+ return this .recvQueue .poll (timeout , unit );
13231304 }
13241305
13251306 public boolean connectedToPeer (long peerSid ) {
0 commit comments