Skip to content

Commit d22d4cf

Browse files
Channel closing code
1 parent dc57262 commit d22d4cf

2 files changed

Lines changed: 35 additions & 14 deletions

File tree

google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.google.cloud.storage.contrib.nio;
1818

19-
import com.google.common.base.Stopwatch;
2019
import com.google.common.util.concurrent.Futures;
2120

2221
import java.io.Closeable;
@@ -65,7 +64,7 @@ public class SeekableByteChannelPrefetcher implements SeekableByteChannel {
6564
// size of the underlying channel(s).
6665
private final long size;
6766
// where we pretend to be, wrt returning bytes from read()
68-
private long position = 0;
67+
private long position;
6968
// whether we're open.
7069
private boolean open = true;
7170
private boolean closing = false;
@@ -169,6 +168,7 @@ public SeekableByteChannelPrefetcher(SeekableByteChannelPrefetcherOptions opts,
169168
idleWorkers.add(new Worker(bc));
170169
}
171170
size = chan.size();
171+
position = 0;
172172
}
173173

174174
/**
@@ -178,7 +178,9 @@ public SeekableByteChannelPrefetcher(SeekableByteChannelPrefetcherOptions opts,
178178
*/
179179
@Override
180180
public int read(ByteBuffer dst) throws IOException {
181-
if (!open) throw new ClosedChannelException();
181+
if (!open) {
182+
throw new ClosedChannelException();
183+
}
182184
ByteBuffer src;
183185
try {
184186
src = fetch(position);
@@ -229,7 +231,9 @@ public int write(ByteBuffer src) throws IOException {
229231
*/
230232
@Override
231233
public long position() throws IOException {
232-
if (!open) throw new ClosedChannelException();
234+
if (!open) {
235+
throw new ClosedChannelException();
236+
}
233237
return position;
234238
}
235239

@@ -258,7 +262,9 @@ public long position() throws IOException {
258262
*/
259263
@Override
260264
public SeekableByteChannel position(long newPosition) throws IOException {
261-
if (!open) throw new ClosedChannelException();
265+
if (!open) {
266+
throw new ClosedChannelException();
267+
}
262268
position = newPosition;
263269
return this;
264270
}
@@ -272,7 +278,9 @@ public SeekableByteChannel position(long newPosition) throws IOException {
272278
*/
273279
@Override
274280
public long size() throws IOException {
275-
if (!open) throw new ClosedChannelException();
281+
if (!open) {
282+
throw new ClosedChannelException();
283+
}
276284
return size;
277285
}
278286

@@ -314,7 +322,15 @@ public boolean isOpen() {
314322
@Override
315323
public void close() throws IOException {
316324
if (open) {
317-
// TODO: quiet everything, close all channels.
325+
closing = true;
326+
while (true) {
327+
synchronized (idleWorkers) {
328+
if (idleWorkers.size() == prefetchingThreads + extraThreads) {
329+
// every thread is idle, we're done.
330+
break;
331+
}
332+
}
333+
}
318334
exec.shutdown();
319335
try {
320336
exec.awaitTermination(60, TimeUnit.SECONDS);
@@ -345,10 +361,10 @@ public ByteBuffer fetch(long position) throws InterruptedException, ExecutionExc
345361
private Worker getIdleWorker() throws InterruptedException {
346362
while (true) {
347363
synchronized (idleWorkers) {
348-
while (idleWorkers.size() == 0) {
364+
while (idleWorkers.isEmpty()) {
349365
idleWorkers.wait();
350366
}
351-
if (idleWorkers.size() > 0) {
367+
if (!idleWorkers.isEmpty()) {
352368
return idleWorkers.remove(0);
353369
}
354370
}
@@ -358,7 +374,10 @@ private Worker getIdleWorker() throws InterruptedException {
358374
private Worker tryGetIdleWorker() throws InterruptedException {
359375
while (true) {
360376
synchronized (idleWorkers) {
361-
if (idleWorkers.size() > 0) {
377+
if (closing) {
378+
return null;
379+
}
380+
if (!idleWorkers.isEmpty()) {
362381
return idleWorkers.remove(0);
363382
}
364383
return null;
@@ -367,11 +386,11 @@ private Worker tryGetIdleWorker() throws InterruptedException {
367386
}
368387

369388
private void sicWorker(Worker worker, long pos, Buffer toFill) throws ExecutionException, InterruptedException {
370-
pos = beginningOfBucket(pos);
371-
worker.init(pos, toFill);
389+
long bucketStart = beginningOfBucket(pos);
390+
worker.init(bucketStart, toFill);
372391
Future<ByteBuffer> promise = exec.submit(worker);
373392
toFill.promise = promise;
374-
buffers.put(index(pos), toFill);
393+
buffers.put(index(bucketStart), toFill);
375394
}
376395

377396
private long index(long pos) {
@@ -410,7 +429,7 @@ private void reassignWorker(Worker w) throws ExecutionException, InterruptedExce
410429
// nothing to do, return to idle pool
411430
synchronized (idleWorkers) {
412431
idleWorkers.add(w);
413-
idleWorkers.notify();
432+
idleWorkers.notifyAll();
414433
}
415434
}
416435

google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ private void innerTestReadByteChannel(boolean prefetch) throws IOException {
177177
byte[] expected = new byte[SML_SIZE];
178178
new Random(SML_SIZE).nextBytes(expected);
179179
assertThat(Arrays.equals(buf.array(), expected)).isTrue();
180+
chan.close();
180181
}
181182

182183
@Test
@@ -220,6 +221,7 @@ private void innerTestSeek(boolean prefetch) throws IOException {
220221
// if the two spots in the file have the same contents, then this isn't a good file for this
221222
// test.
222223
assertThat(wanted).isNotEqualTo(wanted2);
224+
chan.close();
223225
}
224226

225227
@Test

0 commit comments

Comments
 (0)