Skip to content

Commit 6aa8964

Browse files
jean-philippe-martin authored and vam-google committed
More aggressive retry strategy for NIO (#2083)
by @jean-philippe-martin
1 parent 0c6b8cf commit 6aa8964

2 files changed

Lines changed: 62 additions & 11 deletions

File tree

google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageReadChannel.java

Lines changed: 48 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -16,23 +16,27 @@
1616

1717
package com.google.cloud.storage.contrib.nio;
1818

19-
import static com.google.common.base.Preconditions.checkArgument;
20-
2119
import com.google.cloud.ReadChannel;
2220
import com.google.cloud.storage.BlobId;
2321
import com.google.cloud.storage.BlobInfo;
2422
import com.google.cloud.storage.Storage;
2523
import com.google.cloud.storage.StorageException;
2624

25+
import javax.annotation.CheckReturnValue;
26+
import javax.annotation.concurrent.ThreadSafe;
2727
import java.io.IOException;
2828
import java.nio.ByteBuffer;
2929
import java.nio.channels.ClosedChannelException;
3030
import java.nio.channels.NonWritableChannelException;
3131
import java.nio.channels.SeekableByteChannel;
3232
import java.nio.file.NoSuchFileException;
3333

34-
import javax.annotation.CheckReturnValue;
35-
import javax.annotation.concurrent.ThreadSafe;
34+
import javax.net.ssl.SSLException;
35+
import java.io.EOFException;
36+
import java.net.SocketException;
37+
import java.net.SocketTimeoutException;
38+
39+
import static com.google.common.base.Preconditions.checkArgument;
3640

3741
/**
3842
* Cloud Storage read channel.
@@ -101,26 +105,39 @@ public int read(ByteBuffer dst) throws IOException {
101105
checkOpen();
102106
int amt;
103107
int retries = 0;
104-
int maxRetries = 3;
108+
int maxRetries = Math.max(3, maxChannelReopens);
105109
dst.mark();
106110
while (true) {
107111
try {
108112
dst.reset();
109113
amt = channel.read(dst);
110114
break;
111115
} catch (StorageException exs) {
112-
if (exs.getMessage().contains("Connection closed prematurely") && reopens < maxChannelReopens) {
113-
// this error isn't marked as retryable since the channel is closed;
114-
// but here at this higher level we can retry it.
116+
if (isReopenable(exs)) {
117+
// these errors aren't marked as retryable since the channel is closed;
118+
// but here at this higher level we can retry them.
115119
reopens++;
120+
if (reopens > maxChannelReopens) {
121+
throw new StorageException(exs.getCode(), "All reopens failed", exs);
122+
}
116123
sleepForAttempt(reopens);
117124
innerOpen();
118125
continue;
119-
} else if ((exs.getCode() == 500 || exs.getCode() == 503) && retries < maxRetries) {
126+
} else if (exs.isRetryable() || exs.getCode() == 500 || exs.getCode() == 503) {
120127
retries++;
128+
if (retries > maxRetries) {
129+
// this exception will be marked as retriable in most cases since
130+
// it's based on the code. It may be confusing to see a retriable error
131+
// that says "all retries failed" but understand this to mean:
132+
// "While in principle you should be able to retry, we already did that
133+
// for you a few times and it still didn't work so we wouldn't recommend
134+
// further retries."
135+
throw new StorageException(exs.getCode(), "All retries failed", exs);
136+
}
121137
sleepForAttempt(retries);
122138
continue;
123139
}
140+
// exception is neither reopenable nor retryable
124141
throw exs;
125142
}
126143
}
@@ -135,9 +152,30 @@ public int read(ByteBuffer dst) throws IOException {
135152
}
136153
}
137154

155+
/**
 * Reports whether {@code exs}, or any throwable in its cause chain, is one of the
 * failures that {@code read} responds to by reopening the channel.
 *
 * <p>A throwable matches when its message contains "Connection closed prematurely",
 * or when it is an {@link SSLException}, {@link EOFException}, {@link SocketException}
 * or {@link SocketTimeoutException}. The chain is followed via {@link Throwable#getCause()}
 * and at most 10 throwables are examined.
 *
 * @param exs the throwable to inspect; {@code null} yields {@code false}
 * @return {@code true} if a matching throwable is found within the examined depth,
 *     {@code false} otherwise
 */
private static boolean isReopenable(Throwable exs) {
156+
Throwable throwable = exs;
157+
// ensures finite iteration
// (presumably a guard against a cyclic cause chain; causes nested deeper than this are not examined)
158+
int maxDepth = 10;
159+
while (throwable != null && maxDepth-- > 0) {
160+
// getMessage() may be null, so it is null-checked before the substring test
if ((throwable.getMessage() != null
161+
&& throwable.getMessage().contains("Connection closed prematurely"))
162+
|| throwable instanceof SSLException
163+
|| throwable instanceof EOFException
164+
|| throwable instanceof SocketException
165+
|| throwable instanceof SocketTimeoutException) {
166+
return true;
167+
}
// no match at this level: move one step down the cause chain
168+
throwable = throwable.getCause();
169+
}
170+
return false;
171+
}
172+
138173
private void sleepForAttempt(int attempt) {
174+
// exponential backoff, but let's bound it around 2min.
175+
// aggressive backoff because we're dealing with unusual cases.
176+
long delay = 1000L * (1L << Math.min(attempt, 7));
139177
try {
140-
Thread.sleep((attempt - 1) * 500);
178+
Thread.sleep(delay);
141179
} catch (InterruptedException iex) {
142180
// reset interrupt flag
143181
Thread.currentThread().interrupt();

google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/CloudStorageReadChannelTest.java

Lines changed: 14 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@
3838
import org.junit.runner.RunWith;
3939
import org.junit.runners.JUnit4;
4040

41+
import javax.net.ssl.SSLHandshakeException;
4142
import java.io.IOException;
4243
import java.nio.ByteBuffer;
4344
import java.nio.channels.ClosedChannelException;
@@ -84,7 +85,19 @@ public void testRead() throws IOException {
8485
// Checks that a read whose first attempt fails with a StorageException carrying a
// "Connection closed prematurely" IOException is retried: the second stubbed call
// returns 1, the channel position advances from 0 to 1, and the underlying
// gcsChannel.read is invoked exactly twice.
public void testReadRetry() throws IOException {
8586
ByteBuffer buffer = ByteBuffer.allocate(1);
8687
when(gcsChannel.read(eq(buffer)))
87-
// removed line of the diff: the matching IOException was the direct cause
.thenThrow(new StorageException(new IOException("Connection closed prematurely: bytesRead = 33554432, Content-Length = 41943040")))
88+
// added line of the diff: the matching IOException is nested one level deeper, under an "outer" IOException
.thenThrow(new StorageException(new IOException("outer", new IOException("Connection closed prematurely: bytesRead = 33554432, Content-Length = 41943040"))))
89+
.thenReturn(1);
90+
assertThat(chan.position()).isEqualTo(0L);
91+
assertThat(chan.read(buffer)).isEqualTo(1);
92+
assertThat(chan.position()).isEqualTo(1L);
93+
// one failed attempt plus one successful retry
verify(gcsChannel, times(2)).read(any(ByteBuffer.class));
94+
}
95+
96+
@Test
97+
public void testReadRetrySSLHandshake() throws IOException {
98+
ByteBuffer buffer = ByteBuffer.allocate(1);
99+
when(gcsChannel.read(eq(buffer)))
100+
.thenThrow(new StorageException(new IOException("something", new IOException("thing", new SSLHandshakeException("connection closed due to throttling")))))
88101
.thenReturn(1);
89102
assertThat(chan.position()).isEqualTo(0L);
90103
assertThat(chan.read(buffer)).isEqualTo(1);

0 commit comments

Comments
 (0)