Skip to content

Commit ce96b70

Browse files
committed
ZOOKEEPER-3340: Improve Queue Usage in QuorumCnxManager.java
1 parent c9eeeda commit ce96b70

1 file changed

Lines changed: 27 additions & 46 deletions

File tree

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Lines changed: 27 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
import java.util.Enumeration;
3737
import java.util.HashSet;
3838
import java.util.Map;
39-
import java.util.NoSuchElementException;
4039
import java.util.Set;
4140
import java.util.concurrent.ArrayBlockingQueue;
41+
import java.util.concurrent.BlockingQueue;
4242
import java.util.concurrent.ConcurrentHashMap;
4343
import java.util.concurrent.SynchronousQueue;
4444
import java.util.concurrent.ThreadFactory;
@@ -143,11 +143,7 @@ public class QuorumCnxManager {
143143
/*
144144
* Reception queue
145145
*/
146-
public final ArrayBlockingQueue<Message> recvQueue;
147-
/*
148-
* Object to synchronize access to recvQueue
149-
*/
150-
private final Object recvQLock = new Object();
146+
public final BlockingQueue<Message> recvQueue;
151147

152148
/*
153149
* Shutdown flag
@@ -438,7 +434,8 @@ private boolean startConnection(Socket sock, Long sid) throws IOException {
438434
}
439435

440436
senderWorkerMap.put(sid, sw);
441-
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
437+
438+
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<>(SEND_CAPACITY));
442439

443440
sw.start();
444441
rw.start();
@@ -573,7 +570,7 @@ private void handleConnection(Socket sock, DataInputStream din) throws IOExcepti
573570

574571
senderWorkerMap.put(sid, sw);
575572

576-
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
573+
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<>(SEND_CAPACITY));
577574

578575
sw.start();
579576
rw.start();
@@ -601,7 +598,6 @@ public void toSend(Long sid, ByteBuffer b) {
601598
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new ArrayBlockingQueue<>(SEND_CAPACITY));
602599
addToSendQueue(bq, b);
603600
connectOne(sid);
604-
605601
}
606602
}
607603

@@ -1085,7 +1081,7 @@ public void run() {
10851081
* message than that stored in lastMessage. To avoid sending
10861082
* stale message, we should send the message in the send queue.
10871083
*/
1088-
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
1084+
BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
10891085
if (bq == null || isSendQueueEmpty(bq)) {
10901086
ByteBuffer b = lastMessageSent.get(sid);
10911087
if (b != null) {
@@ -1103,7 +1099,7 @@ public void run() {
11031099

11041100
ByteBuffer b = null;
11051101
try {
1106-
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
1102+
BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
11071103
if (bq != null) {
11081104
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
11091105
} else {
@@ -1233,20 +1229,12 @@ public void run() {
12331229
* @param buffer
12341230
* Reference to the buffer to be inserted in the queue
12351231
*/
1236-
private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buffer) {
1237-
if (queue.remainingCapacity() == 0) {
1238-
try {
1239-
queue.remove();
1240-
} catch (NoSuchElementException ne) {
1241-
// element could be removed by poll()
1242-
LOG.debug("Trying to remove from an empty Queue. Ignoring exception.", ne);
1243-
}
1244-
}
1245-
try {
1246-
queue.add(buffer);
1247-
} catch (IllegalStateException ie) {
1248-
// This should never happen
1249-
LOG.error("Unable to insert an element in the queue ", ie);
1232+
private void addToSendQueue(final BlockingQueue<ByteBuffer> queue,
1233+
final ByteBuffer buffer) {
1234+
final boolean success = queue.offer(buffer);
1235+
if (!success) {
1236+
queue.poll();
1237+
queue.offer(buffer);
12501238
}
12511239
}
12521240

@@ -1257,7 +1245,7 @@ private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buf
12571245
* @return
12581246
* true if the specified queue is empty
12591247
*/
1260-
private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
1248+
private boolean isSendQueueEmpty(BlockingQueue<ByteBuffer> queue) {
12611249
return queue.isEmpty();
12621250
}
12631251

@@ -1266,10 +1254,11 @@ private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
12661254
* waiting up to the specified wait time if necessary for an element to
12671255
* become available.
12681256
*
1269-
* {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
1257+
* {@link BlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
12701258
*/
1271-
private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue, long timeout, TimeUnit unit) throws InterruptedException {
1272-
return queue.poll(timeout, unit);
1259+
private ByteBuffer pollSendQueue(final BlockingQueue<ByteBuffer> queue,
1260+
final long timeout, final TimeUnit unit) throws InterruptedException {
1261+
return queue.poll(timeout, unit);
12731262
}
12741263

12751264
/**
@@ -1292,21 +1281,12 @@ private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue, long time
12921281
* @param msg
12931282
* Reference to the message to be inserted in the queue
12941283
*/
1295-
public void addToRecvQueue(Message msg) {
1296-
synchronized (recvQLock) {
1297-
if (recvQueue.remainingCapacity() == 0) {
1298-
try {
1299-
recvQueue.remove();
1300-
} catch (NoSuchElementException ne) {
1301-
// element could be removed by poll()
1302-
LOG.debug("Trying to remove from an empty recvQueue. Ignoring exception.", ne);
1303-
}
1304-
}
1305-
try {
1306-
recvQueue.add(msg);
1307-
} catch (IllegalStateException ie) {
1308-
// This should never happen
1309-
LOG.error("Unable to insert element in the recvQueue ", ie);
1284+
public void addToRecvQueue(final Message msg) {
1285+
synchronized (this.recvQueue) {
1286+
final boolean success = this.recvQueue.offer(msg);
1287+
if (!success) {
1288+
this.recvQueue.poll();
1289+
this.recvQueue.offer(msg);
13101290
}
13111291
}
13121292
}
@@ -1318,8 +1298,9 @@ public void addToRecvQueue(Message msg) {
13181298
*
13191299
* {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
13201300
*/
1321-
public Message pollRecvQueue(long timeout, TimeUnit unit) throws InterruptedException {
1322-
return recvQueue.poll(timeout, unit);
1301+
public Message pollRecvQueue(final long timeout, final TimeUnit unit)
1302+
throws InterruptedException {
1303+
return this.recvQueue.poll(timeout, unit);
13231304
}
13241305

13251306
public boolean connectedToPeer(long peerSid) {

0 commit comments

Comments
 (0)