Skip to content

Commit c63ab85

Browse files
committed
Added updates from code review
1 parent d887780 commit c63ab85

2 files changed

Lines changed: 42 additions & 7 deletions

File tree

zookeeper-server/src/main/java/org/apache/zookeeper/util/CircularBlockingQueue.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,15 @@ public class CircularBlockingQueue<E> implements BlockingQueue<E> {
5454

5555
private final int maxSize;
5656

57+
private long droppedCount;
58+
5759
public CircularBlockingQueue(int queueSize) {
5860
this.queue = new ArrayDeque<>(queueSize);
5961
this.maxSize = queueSize;
6062

6163
this.lock = new ReentrantLock();
6264
this.notEmpty = this.lock.newCondition();
65+
this.droppedCount = 0L;
6366
}
6467

6568
/**
@@ -78,7 +81,9 @@ public boolean offer(E e) {
7881
try {
7982
if (this.queue.size() == this.maxSize) {
8083
final E discard = this.queue.remove();
81-
LOG.debug("Queue if full. Discarding oldest element: {}", discard);
84+
this.droppedCount++;
85+
LOG.debug("Queue is full. Discarding oldest element [count={}]: {}",
86+
this.droppedCount, discard);
8287
}
8388
this.queue.add(e);
8489
this.notEmpty.signal();
@@ -142,11 +147,38 @@ public int size() {
142147
}
143148
}
144149

150+
/**
151+
* Returns the number of elements that were dropped from the queue because the
152+
* queue was full when a new element was offered.
153+
*
154+
* @return The number of elements dropped (lost) from the queue
155+
*/
156+
public long getDroppedCount() {
157+
return this.droppedCount;
158+
}
159+
160+
/**
161+
* For testing purposes only.
162+
*
163+
* @return True if a thread is blocked waiting for a new element to be offered
164+
* to the queue
165+
*/
166+
boolean isConsumerThreadBlocked() {
167+
final ReentrantLock lock = this.lock;
168+
lock.lock();
169+
try {
170+
return lock.getWaitQueueLength(this.notEmpty) > 0;
171+
} finally {
172+
lock.unlock();
173+
}
174+
}
175+
145176
@Override
146177
public int drainTo(Collection<? super E> c) {
147178
throw new UnsupportedOperationException();
148179
}
149180

181+
150182
@Override
151183
public E poll() {
152184
throw new UnsupportedOperationException();

zookeeper-server/src/test/java/org/apache/zookeeper/server/util/TestCircularBlockingQueue.java renamed to zookeeper-server/src/test/java/org/apache/zookeeper/util/TestCircularBlockingQueue.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,23 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.zookeeper.server.util;
19+
package org.apache.zookeeper.util;
2020

21-
import java.util.concurrent.BlockingQueue;
2221
import java.util.concurrent.ExecutionException;
2322
import java.util.concurrent.ExecutorService;
2423
import java.util.concurrent.Executors;
2524
import java.util.concurrent.Future;
2625

27-
import org.apache.zookeeper.util.CircularBlockingQueue;
2826
import org.junit.Assert;
2927
import org.junit.Test;
3028

3129
public class TestCircularBlockingQueue {
3230

3331
@Test
3432
public void testCircularBlockingQueue() throws InterruptedException {
35-
final BlockingQueue<Integer> testQueue = new CircularBlockingQueue<>(2);
33+
final CircularBlockingQueue<Integer> testQueue =
34+
new CircularBlockingQueue<>(2);
35+
3636
testQueue.offer(1);
3737
testQueue.offer(2);
3838
testQueue.offer(3);
@@ -42,6 +42,7 @@ public void testCircularBlockingQueue() throws InterruptedException {
4242
Assert.assertEquals(2, testQueue.take().intValue());
4343
Assert.assertEquals(3, testQueue.take().intValue());
4444

45+
Assert.assertEquals(1L, testQueue.getDroppedCount());
4546
Assert.assertEquals(0, testQueue.size());
4647
Assert.assertEquals(true, testQueue.isEmpty());
4748
}
@@ -50,7 +51,7 @@ public void testCircularBlockingQueue() throws InterruptedException {
5051
public void testCircularBlockingQueueTakeBlock()
5152
throws InterruptedException, ExecutionException {
5253

53-
final BlockingQueue<Integer> testQueue = new CircularBlockingQueue<>(2);
54+
final CircularBlockingQueue<Integer> testQueue = new CircularBlockingQueue<>(2);
5455

5556
ExecutorService executor = Executors.newSingleThreadExecutor();
5657

@@ -59,7 +60,9 @@ public void testCircularBlockingQueueTakeBlock()
5960
});
6061

6162
// Allow the other thread to get into position; waiting for item to be inserted
62-
Thread.sleep(2000L);
63+
while (!testQueue.isConsumerThreadBlocked()) {
64+
Thread.sleep(50L);
65+
}
6366

6467
testQueue.offer(10);
6568

0 commit comments

Comments
 (0)