Skip to content

Commit d887780

Browse files
committed
Introduce new class CircularBlockingQueue
1 parent 2793db2 commit d887780

2 files changed

Lines changed: 316 additions & 0 deletions

File tree

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.zookeeper.util;
20+
21+
import java.util.ArrayDeque;
22+
import java.util.Collection;
23+
import java.util.Iterator;
24+
import java.util.Objects;
25+
import java.util.concurrent.BlockingQueue;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.locks.Condition;
28+
import java.util.concurrent.locks.ReentrantLock;
29+
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
/**
34+
* A bounded blocking queue backed by an array. This queue orders elements FIFO
35+
* (first-in-first-out). The head of the queue is that element that has been on
36+
* the queue the longest time. The tail of the queue is that element that has
37+
* been on the queue the shortest time. New elements are inserted at the tail of
38+
* the queue, and the queue retrieval operations obtain elements at the head of
39+
* the queue. If the queue is full, the head of the queue (the oldest element)
40+
* will be removed to make room for the newest element.
41+
*/
42+
public class CircularBlockingQueue<E> implements BlockingQueue<E> {
43+
44+
private static final Logger LOG = LoggerFactory.getLogger(CircularBlockingQueue.class);
45+
46+
/** Main lock guarding all access */
47+
private final ReentrantLock lock;
48+
49+
/** Condition for waiting takes */
50+
private final Condition notEmpty;
51+
52+
/** The array-backed queue */
53+
private final ArrayDeque<E> queue;
54+
55+
private final int maxSize;
56+
57+
public CircularBlockingQueue(int queueSize) {
58+
this.queue = new ArrayDeque<>(queueSize);
59+
this.maxSize = queueSize;
60+
61+
this.lock = new ReentrantLock();
62+
this.notEmpty = this.lock.newCondition();
63+
}
64+
65+
/**
66+
* This method differs from {@link BlockingQueue#offer(Object)} in that it
67+
* will remove the oldest queued element (the element at the front of the
68+
* queue) in order to make room for any new elements if the queue is full.
69+
*
70+
* @param e the element to add
71+
* @return true since it will make room for any new elements if required
72+
*/
73+
@Override
74+
public boolean offer(E e) {
75+
Objects.requireNonNull(e);
76+
final ReentrantLock lock = this.lock;
77+
lock.lock();
78+
try {
79+
if (this.queue.size() == this.maxSize) {
80+
final E discard = this.queue.remove();
81+
LOG.debug("Queue if full. Discarding oldest element: {}", discard);
82+
}
83+
this.queue.add(e);
84+
this.notEmpty.signal();
85+
} finally {
86+
lock.unlock();
87+
}
88+
return true;
89+
}
90+
91+
@Override
92+
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
93+
long nanos = unit.toNanos(timeout);
94+
final ReentrantLock lock = this.lock;
95+
lock.lockInterruptibly();
96+
try {
97+
while (this.queue.isEmpty()) {
98+
if (nanos <= 0) {
99+
return null;
100+
}
101+
nanos = this.notEmpty.awaitNanos(nanos);
102+
}
103+
return this.queue.poll();
104+
} finally {
105+
lock.unlock();
106+
}
107+
}
108+
109+
@Override
110+
public E take() throws InterruptedException {
111+
final ReentrantLock lock = this.lock;
112+
lock.lockInterruptibly();
113+
try {
114+
while (this.queue.isEmpty()) {
115+
this.notEmpty.await();
116+
}
117+
return this.queue.poll();
118+
} finally {
119+
lock.unlock();
120+
}
121+
}
122+
123+
@Override
124+
public boolean isEmpty() {
125+
final ReentrantLock lock = this.lock;
126+
lock.lock();
127+
try {
128+
return this.queue.isEmpty();
129+
} finally {
130+
lock.unlock();
131+
}
132+
}
133+
134+
@Override
135+
public int size() {
136+
final ReentrantLock lock = this.lock;
137+
lock.lock();
138+
try {
139+
return this.queue.size();
140+
} finally {
141+
lock.unlock();
142+
}
143+
}
144+
145+
@Override
146+
public int drainTo(Collection<? super E> c) {
147+
throw new UnsupportedOperationException();
148+
}
149+
150+
@Override
151+
public E poll() {
152+
throw new UnsupportedOperationException();
153+
}
154+
155+
@Override
156+
public E element() {
157+
throw new UnsupportedOperationException();
158+
}
159+
160+
@Override
161+
public E peek() {
162+
throw new UnsupportedOperationException();
163+
}
164+
165+
@Override
166+
public E remove() {
167+
throw new UnsupportedOperationException();
168+
}
169+
170+
@Override
171+
public boolean addAll(Collection<? extends E> arg0) {
172+
throw new UnsupportedOperationException();
173+
}
174+
175+
@Override
176+
public void clear() {
177+
throw new UnsupportedOperationException();
178+
}
179+
180+
@Override
181+
public boolean containsAll(Collection<?> arg0) {
182+
throw new UnsupportedOperationException();
183+
}
184+
185+
@Override
186+
public Iterator<E> iterator() {
187+
throw new UnsupportedOperationException();
188+
}
189+
190+
@Override
191+
public boolean removeAll(Collection<?> arg0) {
192+
throw new UnsupportedOperationException();
193+
}
194+
195+
@Override
196+
public boolean retainAll(Collection<?> arg0) {
197+
throw new UnsupportedOperationException();
198+
}
199+
200+
@Override
201+
public Object[] toArray() {
202+
throw new UnsupportedOperationException();
203+
}
204+
205+
@Override
206+
public <T> T[] toArray(T[] arg0) {
207+
throw new UnsupportedOperationException();
208+
}
209+
210+
@Override
211+
public boolean add(E e) {
212+
throw new UnsupportedOperationException();
213+
}
214+
215+
@Override
216+
public boolean contains(Object o) {
217+
throw new UnsupportedOperationException();
218+
}
219+
220+
@Override
221+
public int drainTo(Collection<? super E> c, int maxElements) {
222+
throw new UnsupportedOperationException();
223+
}
224+
225+
@Override
226+
public boolean offer(E e, long timeout, TimeUnit unit)
227+
throws InterruptedException {
228+
throw new UnsupportedOperationException();
229+
}
230+
231+
@Override
232+
public void put(E e) throws InterruptedException {
233+
throw new UnsupportedOperationException();
234+
}
235+
236+
@Override
237+
public int remainingCapacity() {
238+
throw new UnsupportedOperationException();
239+
}
240+
241+
@Override
242+
public boolean remove(Object o) {
243+
throw new UnsupportedOperationException();
244+
}
245+
246+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.zookeeper.server.util;
20+
21+
import java.util.concurrent.BlockingQueue;
22+
import java.util.concurrent.ExecutionException;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.Future;
26+
27+
import org.apache.zookeeper.util.CircularBlockingQueue;
28+
import org.junit.Assert;
29+
import org.junit.Test;
30+
31+
public class TestCircularBlockingQueue {
32+
33+
@Test
34+
public void testCircularBlockingQueue() throws InterruptedException {
35+
final BlockingQueue<Integer> testQueue = new CircularBlockingQueue<>(2);
36+
testQueue.offer(1);
37+
testQueue.offer(2);
38+
testQueue.offer(3);
39+
40+
Assert.assertEquals(2, testQueue.size());
41+
42+
Assert.assertEquals(2, testQueue.take().intValue());
43+
Assert.assertEquals(3, testQueue.take().intValue());
44+
45+
Assert.assertEquals(0, testQueue.size());
46+
Assert.assertEquals(true, testQueue.isEmpty());
47+
}
48+
49+
@Test(timeout = 10000L)
50+
public void testCircularBlockingQueueTakeBlock()
51+
throws InterruptedException, ExecutionException {
52+
53+
final BlockingQueue<Integer> testQueue = new CircularBlockingQueue<>(2);
54+
55+
ExecutorService executor = Executors.newSingleThreadExecutor();
56+
57+
Future<Integer> testTake = executor.submit(() -> {
58+
return testQueue.take();
59+
});
60+
61+
// Allow the other thread to get into position; waiting for item to be inserted
62+
Thread.sleep(2000L);
63+
64+
testQueue.offer(10);
65+
66+
Integer result = testTake.get();
67+
Assert.assertEquals(10, result.intValue());
68+
}
69+
70+
}

0 commit comments

Comments
 (0)