Skip to content

Commit 1ec21aa

Browse files
Encapsulate interrupts suppression in retry policy
1 parent 4cce458 commit 1ec21aa

8 files changed

Lines changed: 131 additions & 46 deletions

File tree

communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java

Lines changed: 57 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,8 @@
11
package datadog.communication.http;
22

3+
import java.io.IOException;
4+
import java.io.InterruptedIOException;
5+
import java.net.ConnectException;
36
import java.util.concurrent.ThreadLocalRandom;
47
import java.util.concurrent.TimeUnit;
58
import javax.annotation.Nullable;
@@ -34,7 +37,7 @@
3437
* instance.
3538
*/
3639
@NotThreadSafe
37-
public class HttpRetryPolicy {
40+
public class HttpRetryPolicy implements AutoCloseable {
3841

3942
private static final Logger log = LoggerFactory.getLogger(HttpRetryPolicy.class);
4043

@@ -47,12 +50,35 @@ public class HttpRetryPolicy {
4750

4851
private int retriesLeft;
4952
private long delay;
53+
private boolean interrupted;
5054
private final double delayFactor;
55+
private final boolean suppressInterrupts;
5156

52-
private HttpRetryPolicy(int retriesLeft, long delay, double delayFactor) {
57+
private HttpRetryPolicy(
58+
int retriesLeft, long delay, double delayFactor, boolean suppressInterrupts) {
5359
this.retriesLeft = retriesLeft;
5460
this.delay = delay;
5561
this.delayFactor = delayFactor;
62+
this.suppressInterrupts = suppressInterrupts;
63+
}
64+
65+
public boolean shouldRetry(Exception e) {
66+
if (e instanceof ConnectException) {
67+
return shouldRetry((okhttp3.Response) null);
68+
}
69+
if (e instanceof InterruptedIOException) {
70+
if (suppressInterrupts) {
71+
return shouldRetry((okhttp3.Response) null);
72+
}
73+
}
74+
if (e instanceof InterruptedException) {
75+
if (suppressInterrupts) {
76+
// remember interrupted status to restore the thread's interrupted flag later
77+
interrupted = true;
78+
return shouldRetry((okhttp3.Response) null);
79+
}
80+
}
81+
return false;
5682
}
5783

5884
public boolean shouldRetry(@Nullable okhttp3.Response response) {
@@ -106,25 +132,52 @@ private long getRateLimitResetTime(okhttp3.Response response) {
106132
}
107133
}
108134

109-
public long backoff() {
135+
long getBackoffDelay() {
110136
long currentDelay = delay;
111137
delay = (long) (delay * delayFactor);
112138
return currentDelay;
113139
}
114140

141+
public void backoff() throws IOException {
142+
try {
143+
Thread.sleep(getBackoffDelay());
144+
} catch (InterruptedException e) {
145+
if (suppressInterrupts) {
146+
// remember interrupted status to restore the thread's interrupted flag later
147+
interrupted = true;
148+
} else {
149+
Thread.currentThread().interrupt();
150+
throw new InterruptedIOException("thread interrupted");
151+
}
152+
}
153+
}
154+
155+
@Override
156+
public void close() {
157+
if (interrupted) {
158+
Thread.currentThread().interrupt();
159+
}
160+
}
161+
115162
public static class Factory {
116163
private final int maxRetries;
117164
private final long initialDelay;
118165
private final double delayFactor;
166+
private final boolean retryInterrupts;
119167

120168
public Factory(int maxRetries, int initialDelay, double delayFactor) {
169+
this(maxRetries, initialDelay, delayFactor, false);
170+
}
171+
172+
public Factory(int maxRetries, int initialDelay, double delayFactor, boolean retryInterrupts) {
121173
this.maxRetries = maxRetries;
122174
this.initialDelay = initialDelay;
123175
this.delayFactor = delayFactor;
176+
this.retryInterrupts = retryInterrupts;
124177
}
125178

126179
public HttpRetryPolicy create() {
127-
return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor);
180+
return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor, retryInterrupts);
128181
}
129182
}
130183
}

communication/src/main/java/datadog/communication/http/OkHttpUtils.java

Lines changed: 6 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -11,8 +11,6 @@
1111
import datadog.trace.util.AgentProxySelector;
1212
import java.io.File;
1313
import java.io.IOException;
14-
import java.io.InterruptedIOException;
15-
import java.net.ConnectException;
1614
import java.net.InetSocketAddress;
1715
import java.net.Proxy;
1816
import java.nio.ByteBuffer;
@@ -357,16 +355,10 @@ public void writeTo(BufferedSink sink) throws IOException {
357355
}
358356
}
359357

360-
/**
361-
* Retries a request in accordance with the provided retry policy. <strong>Important:</strong>
362-
* interrupts to a thread executing this method are ignored (the thread's interruption flag is
363-
* restored on exit)
364-
*/
365358
public static Response sendWithRetries(
366-
OkHttpClient httpClient, HttpRetryPolicy retryPolicy, Request request) throws IOException {
367-
boolean interrupted = false;
368-
try {
369-
359+
OkHttpClient httpClient, HttpRetryPolicy.Factory retryPolicyFactory, Request request)
360+
throws IOException {
361+
try (HttpRetryPolicy retryPolicy = retryPolicyFactory.create()) {
370362
while (true) {
371363
try {
372364
Response response = httpClient.newCall(request).execute();
@@ -378,24 +370,13 @@ public static Response sendWithRetries(
378370
} else {
379371
closeQuietly(response);
380372
}
381-
} catch (ConnectException | InterruptedIOException ex) {
382-
if (!retryPolicy.shouldRetry(null)) {
373+
} catch (Exception ex) {
374+
if (!retryPolicy.shouldRetry(ex)) {
383375
throw ex;
384376
}
385377
}
386378
// If we get here, there has been an error, and we still have retries left
387-
long backoffMs = retryPolicy.backoff();
388-
try {
389-
Thread.sleep(backoffMs);
390-
} catch (InterruptedException e) {
391-
// remember interrupted status to restore the thread's interrupted flag later
392-
interrupted = true;
393-
}
394-
}
395-
396-
} finally {
397-
if (interrupted) {
398-
Thread.currentThread().interrupt();
379+
retryPolicy.backoff();
399380
}
400381
}
401382
}

communication/src/test/groovy/datadog/communication/http/HttpRetryPolicyTest.groovy

Lines changed: 60 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -17,10 +17,10 @@ class HttpRetryPolicyTest extends Specification {
1717

1818
when:
1919
while (retry <= maxRetries) {
20-
def shouldRetry = retryPolicy.shouldRetry()
20+
def shouldRetry = retryPolicy.shouldRetry((Response) null)
2121
shouldRetries << shouldRetry
2222
if (shouldRetry) {
23-
backoffs << retryPolicy.backoff()
23+
backoffs << retryPolicy.getBackoffDelay()
2424
}
2525
retry += 1
2626
}
@@ -44,10 +44,10 @@ class HttpRetryPolicyTest extends Specification {
4444
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0).create()
4545

4646
def responseBuilder = new Response.Builder()
47-
.code(responseCode)
48-
.request(GroovyMock(Request))
49-
.protocol(Protocol.HTTP_1_1)
50-
.message("")
47+
.code(responseCode)
48+
.request(GroovyMock(Request))
49+
.protocol(Protocol.HTTP_1_1)
50+
.message("")
5151
if (rateLimitHeader != null) {
5252
responseBuilder.header("x-ratelimit-reset", rateLimitHeader)
5353
}
@@ -73,4 +73,58 @@ class HttpRetryPolicyTest extends Specification {
7373
500 | null | 5
7474
501 | null | 5
7575
}
76+
77+
def "test exceptions are retried: #exception with suppress interrupts #suppressInterrupts"() {
78+
setup:
79+
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0, suppressInterrupts).create()
80+
81+
expect:
82+
retryPolicy.shouldRetry(exception) == shouldRetry
83+
84+
where:
85+
exception | suppressInterrupts | shouldRetry
86+
new NullPointerException() | false | false
87+
new IllegalArgumentException() | false | false
88+
new ConnectException() | false | true
89+
new InterruptedIOException() | false | false
90+
new InterruptedIOException() | true | true
91+
new InterruptedException() | false | false
92+
new InterruptedException() | true | true
93+
}
94+
95+
def "test interrupt flag is preserved when suppressing interrupts"() {
96+
setup:
97+
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0, true).create()
98+
99+
when:
100+
retryPolicy.shouldRetry(new InterruptedException())
101+
retryPolicy.close()
102+
103+
then:
104+
Thread.interrupted()
105+
}
106+
107+
def "test interrupt flag is preserved if interrupted while backing off"() {
108+
setup:
109+
boolean[] b = new boolean[2]
110+
111+
Runnable r = () -> {
112+
def retryPolicy = new HttpRetryPolicy.Factory(5, 1000, 2.0, true).create()
113+
retryPolicy.backoff()
114+
115+
b[0] = Thread.currentThread().isInterrupted()
116+
retryPolicy.close()
117+
b[1] = Thread.interrupted()
118+
}
119+
Thread t = new Thread(r, "test-http-retry-policy-interrupts")
120+
121+
when:
122+
t.start()
123+
t.interrupt()
124+
t.join()
125+
126+
then:
127+
!b[0] // before retry policy is closed, the thread should not be interrupted: interrupts are suppressed
128+
b[1] // after retry policy is closed, the thread should be interrupted: interrupt flag should be restored
129+
}
76130
}

dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/BackendApiFactory.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ public BackendApiFactory(Config config, SharedCommunicationObjects sharedCommuni
2323
}
2424

2525
public @Nullable BackendApi createBackendApi() {
26-
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
26+
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);
2727

2828
if (config.isCiVisibilityAgentlessEnabled()) {
2929
HttpUrl agentlessUrl = getAgentlessUrl();

dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/EvpProxyApi.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -81,9 +81,8 @@ public <T> T post(
8181

8282
final Request request = requestBuilder.post(requestBody).build();
8383

84-
HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
8584
try (okhttp3.Response response =
86-
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
85+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
8786
if (response.isSuccessful()) {
8887
log.debug("Request to {} returned successful response: {}", uri, response.code());
8988

dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/IntakeApi.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -85,9 +85,8 @@ public <T> T post(
8585
}
8686

8787
Request request = requestBuilder.build();
88-
HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
8988
try (okhttp3.Response response =
90-
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
89+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
9190
if (response.isSuccessful()) {
9291
log.debug("Request to {} returned successful response: {}", uri, response.code());
9392
InputStream responseBodyStream = response.body().byteStream();

dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -92,7 +92,8 @@ public DDEvpProxyApi build() {
9292
? httpClient
9393
: OkHttpUtils.buildHttpClient(proxiedApiUrl, timeoutMillis);
9494

95-
final HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
95+
final HttpRetryPolicy.Factory retryPolicyFactory =
96+
new HttpRetryPolicy.Factory(5, 100, 2.0, true);
9697

9798
log.debug("proxiedApiUrl: {}", proxiedApiUrl);
9899
return new DDEvpProxyApi(
@@ -141,9 +142,8 @@ public Response sendSerializedTraces(Payload payload) {
141142
totalTraces += payload.traceCount();
142143
receivedTraces += payload.traceCount();
143144

144-
HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
145145
try (okhttp3.Response response =
146-
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
146+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
147147
if (response.isSuccessful()) {
148148
countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes);
149149
return Response.success(response.code());

dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,7 @@ public static class DDIntakeApiBuilder {
4343

4444
HttpUrl hostUrl = null;
4545
OkHttpClient httpClient = null;
46-
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
46+
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);
4747

4848
private String apiKey;
4949

@@ -134,9 +134,8 @@ public Response sendSerializedTraces(Payload payload) {
134134
totalTraces += payload.traceCount();
135135
receivedTraces += payload.traceCount();
136136

137-
HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
138137
try (okhttp3.Response response =
139-
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
138+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
140139
if (response.isSuccessful()) {
141140
countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes);
142141
return Response.success(response.code());

0 commit comments

Comments
 (0)