Skip to content

Commit 3e86513

Browse files
authored
Merge branch 'master' into alejandro.gonzalez/Environment-variable-to-activate-SCA
2 parents 7f3b65c + 23a819a commit 3e86513

22 files changed

Lines changed: 608 additions & 92 deletions

File tree

communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package datadog.communication.http;
22

3+
import java.io.IOException;
4+
import java.io.InterruptedIOException;
5+
import java.net.ConnectException;
36
import java.util.concurrent.ThreadLocalRandom;
47
import java.util.concurrent.TimeUnit;
58
import javax.annotation.Nullable;
@@ -34,7 +37,7 @@
3437
* instance.
3538
*/
3639
@NotThreadSafe
37-
public class HttpRetryPolicy {
40+
public class HttpRetryPolicy implements AutoCloseable {
3841

3942
private static final Logger log = LoggerFactory.getLogger(HttpRetryPolicy.class);
4043

@@ -47,12 +50,35 @@ public class HttpRetryPolicy {
4750

4851
private int retriesLeft;
4952
private long delay;
53+
private boolean interrupted;
5054
private final double delayFactor;
55+
private final boolean suppressInterrupts;
5156

52-
private HttpRetryPolicy(int retriesLeft, long delay, double delayFactor) {
57+
private HttpRetryPolicy(
58+
int retriesLeft, long delay, double delayFactor, boolean suppressInterrupts) {
5359
this.retriesLeft = retriesLeft;
5460
this.delay = delay;
5561
this.delayFactor = delayFactor;
62+
this.suppressInterrupts = suppressInterrupts;
63+
}
64+
65+
public boolean shouldRetry(Exception e) {
66+
if (e instanceof ConnectException) {
67+
return shouldRetry((okhttp3.Response) null);
68+
}
69+
if (e instanceof InterruptedIOException) {
70+
if (suppressInterrupts) {
71+
return shouldRetry((okhttp3.Response) null);
72+
}
73+
}
74+
if (e instanceof InterruptedException) {
75+
if (suppressInterrupts) {
76+
// remember interrupted status to restore the thread's interrupted flag later
77+
interrupted = true;
78+
return shouldRetry((okhttp3.Response) null);
79+
}
80+
}
81+
return false;
5682
}
5783

5884
public boolean shouldRetry(@Nullable okhttp3.Response response) {
@@ -106,25 +132,52 @@ private long getRateLimitResetTime(okhttp3.Response response) {
106132
}
107133
}
108134

109-
public long backoff() {
135+
long getBackoffDelay() {
110136
long currentDelay = delay;
111137
delay = (long) (delay * delayFactor);
112138
return currentDelay;
113139
}
114140

141+
public void backoff() throws IOException {
142+
try {
143+
Thread.sleep(getBackoffDelay());
144+
} catch (InterruptedException e) {
145+
if (suppressInterrupts) {
146+
// remember interrupted status to restore the thread's interrupted flag later
147+
interrupted = true;
148+
} else {
149+
Thread.currentThread().interrupt();
150+
throw new InterruptedIOException("thread interrupted");
151+
}
152+
}
153+
}
154+
155+
@Override
156+
public void close() {
157+
if (interrupted) {
158+
Thread.currentThread().interrupt();
159+
}
160+
}
161+
115162
public static class Factory {
116163
private final int maxRetries;
117164
private final long initialDelay;
118165
private final double delayFactor;
166+
private final boolean retryInterrupts;
119167

120168
public Factory(int maxRetries, int initialDelay, double delayFactor) {
169+
this(maxRetries, initialDelay, delayFactor, false);
170+
}
171+
172+
public Factory(int maxRetries, int initialDelay, double delayFactor, boolean retryInterrupts) {
121173
this.maxRetries = maxRetries;
122174
this.initialDelay = initialDelay;
123175
this.delayFactor = delayFactor;
176+
this.retryInterrupts = retryInterrupts;
124177
}
125178

126179
public HttpRetryPolicy create() {
127-
return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor);
180+
return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor, retryInterrupts);
128181
}
129182
}
130183
}

communication/src/main/java/datadog/communication/http/OkHttpUtils.java

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import datadog.trace.util.AgentProxySelector;
1212
import java.io.File;
1313
import java.io.IOException;
14-
import java.net.ConnectException;
1514
import java.net.InetSocketAddress;
1615
import java.net.Proxy;
1716
import java.nio.ByteBuffer;
@@ -357,30 +356,27 @@ public void writeTo(BufferedSink sink) throws IOException {
357356
}
358357

359358
public static Response sendWithRetries(
360-
OkHttpClient httpClient, HttpRetryPolicy retryPolicy, Request request) throws IOException {
361-
while (true) {
362-
try {
363-
okhttp3.Response response = httpClient.newCall(request).execute();
364-
if (response.isSuccessful()) {
365-
return response;
359+
OkHttpClient httpClient, HttpRetryPolicy.Factory retryPolicyFactory, Request request)
360+
throws IOException {
361+
try (HttpRetryPolicy retryPolicy = retryPolicyFactory.create()) {
362+
while (true) {
363+
try {
364+
Response response = httpClient.newCall(request).execute();
365+
if (response.isSuccessful()) {
366+
return response;
367+
}
368+
if (!retryPolicy.shouldRetry(response)) {
369+
return response;
370+
} else {
371+
closeQuietly(response);
372+
}
373+
} catch (Exception ex) {
374+
if (!retryPolicy.shouldRetry(ex)) {
375+
throw ex;
376+
}
366377
}
367-
if (!retryPolicy.shouldRetry(response)) {
368-
return response;
369-
} else {
370-
closeQuietly(response);
371-
}
372-
} catch (ConnectException ex) {
373-
if (!retryPolicy.shouldRetry(null)) {
374-
throw ex;
375-
}
376-
}
377-
// If we get here, there has been an error, and we still have retries left
378-
long backoffMs = retryPolicy.backoff();
379-
try {
380-
Thread.sleep(backoffMs);
381-
} catch (InterruptedException e) {
382-
Thread.currentThread().interrupt();
383-
throw new IOException(e);
378+
// If we get here, there has been an error, and we still have retries left
379+
retryPolicy.backoff();
384380
}
385381
}
386382
}

communication/src/test/groovy/datadog/communication/http/HttpRetryPolicyTest.groovy

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ class HttpRetryPolicyTest extends Specification {
1717

1818
when:
1919
while (retry <= maxRetries) {
20-
def shouldRetry = retryPolicy.shouldRetry()
20+
def shouldRetry = retryPolicy.shouldRetry((Response) null)
2121
shouldRetries << shouldRetry
2222
if (shouldRetry) {
23-
backoffs << retryPolicy.backoff()
23+
backoffs << retryPolicy.getBackoffDelay()
2424
}
2525
retry += 1
2626
}
@@ -44,10 +44,10 @@ class HttpRetryPolicyTest extends Specification {
4444
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0).create()
4545

4646
def responseBuilder = new Response.Builder()
47-
.code(responseCode)
48-
.request(GroovyMock(Request))
49-
.protocol(Protocol.HTTP_1_1)
50-
.message("")
47+
.code(responseCode)
48+
.request(GroovyMock(Request))
49+
.protocol(Protocol.HTTP_1_1)
50+
.message("")
5151
if (rateLimitHeader != null) {
5252
responseBuilder.header("x-ratelimit-reset", rateLimitHeader)
5353
}
@@ -73,4 +73,58 @@ class HttpRetryPolicyTest extends Specification {
7373
500 | null | 5
7474
501 | null | 5
7575
}
76+
77+
def "test exceptions are retried: #exception with suppress interrupts #suppressInterrupts"() {
78+
setup:
79+
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0, suppressInterrupts).create()
80+
81+
expect:
82+
retryPolicy.shouldRetry(exception) == shouldRetry
83+
84+
where:
85+
exception | suppressInterrupts | shouldRetry
86+
new NullPointerException() | false | false
87+
new IllegalArgumentException() | false | false
88+
new ConnectException() | false | true
89+
new InterruptedIOException() | false | false
90+
new InterruptedIOException() | true | true
91+
new InterruptedException() | false | false
92+
new InterruptedException() | true | true
93+
}
94+
95+
def "test interrupt flag is preserved when suppressing interrupts"() {
96+
setup:
97+
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0, true).create()
98+
99+
when:
100+
retryPolicy.shouldRetry(new InterruptedException())
101+
retryPolicy.close()
102+
103+
then:
104+
Thread.interrupted()
105+
}
106+
107+
def "test interrupt flag is preserved if interrupted while backing off"() {
108+
setup:
109+
boolean[] b = new boolean[2]
110+
111+
Runnable r = () -> {
112+
def retryPolicy = new HttpRetryPolicy.Factory(5, 1000, 2.0, true).create()
113+
retryPolicy.backoff()
114+
115+
b[0] = Thread.currentThread().isInterrupted()
116+
retryPolicy.close()
117+
b[1] = Thread.interrupted()
118+
}
119+
Thread t = new Thread(r, "test-http-retry-policy-interrupts")
120+
121+
when:
122+
t.start()
123+
t.interrupt()
124+
t.join()
125+
126+
then:
127+
!b[0] // before retry policy is closed, the thread should not be interrupted: interrupts are suppressed
128+
b[1] // after retry policy is closed, the thread should be interrupted: interrupt flag should be restored
129+
}
76130
}

dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/BackendApiFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public BackendApiFactory(Config config, SharedCommunicationObjects sharedCommuni
2323
}
2424

2525
public @Nullable BackendApi createBackendApi() {
26-
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
26+
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);
2727

2828
if (config.isCiVisibilityAgentlessEnabled()) {
2929
HttpUrl agentlessUrl = getAgentlessUrl();

dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/EvpProxyApi.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,8 @@ public <T> T post(
8181

8282
final Request request = requestBuilder.post(requestBody).build();
8383

84-
HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
8584
try (okhttp3.Response response =
86-
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
85+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
8786
if (response.isSuccessful()) {
8887
log.debug("Request to {} returned successful response: {}", uri, response.code());
8988

dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/IntakeApi.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,8 @@ public <T> T post(
8585
}
8686

8787
Request request = requestBuilder.build();
88-
HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
8988
try (okhttp3.Response response =
90-
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
89+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
9190
if (response.isSuccessful()) {
9291
log.debug("Request to {} returned successful response: {}", uri, response.code());
9392
InputStream responseBodyStream = response.body().byteStream();

dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/config/ModuleExecutionSettingsFactoryImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public ModuleExecutionSettings create(JvmInfo jvmInfo, @Nullable String moduleNa
6666
boolean earlyFlakeDetectionEnabled = isEarlyFlakeDetectionEnabled(ciVisibilitySettings);
6767
Map<String, String> systemProperties =
6868
getPropertiesPropagatedToChildProcess(
69-
codeCoverageEnabled, itrEnabled, flakyTestRetriesEnabled);
69+
codeCoverageEnabled, itrEnabled, flakyTestRetriesEnabled, earlyFlakeDetectionEnabled);
7070

7171
LOGGER.info(
7272
"CI Visibility settings ({}, {}):\n"
@@ -196,7 +196,10 @@ private boolean isEarlyFlakeDetectionEnabled(CiVisibilitySettings ciVisibilitySe
196196
}
197197

198198
private Map<String, String> getPropertiesPropagatedToChildProcess(
199-
boolean codeCoverageEnabled, boolean itrEnabled, boolean flakyTestRetriesEnabled) {
199+
boolean codeCoverageEnabled,
200+
boolean itrEnabled,
201+
boolean flakyTestRetriesEnabled,
202+
boolean earlyFlakeDetectionEnabled) {
200203
Map<String, String> propagatedSystemProperties = new HashMap<>();
201204
Properties systemProperties = System.getProperties();
202205
for (Map.Entry<Object, Object> e : systemProperties.entrySet()) {
@@ -223,6 +226,11 @@ private Map<String, String> getPropertiesPropagatedToChildProcess(
223226
CiVisibilityConfig.CIVISIBILITY_FLAKY_RETRY_ENABLED),
224227
Boolean.toString(flakyTestRetriesEnabled));
225228

229+
propagatedSystemProperties.put(
230+
Strings.propertyNameToSystemPropertyName(
231+
CiVisibilityConfig.CIVISIBILITY_EARLY_FLAKE_DETECTION_ENABLED),
232+
Boolean.toString(earlyFlakeDetectionEnabled));
233+
226234
// explicitly disable build instrumentation in child processes,
227235
// because some projects run "embedded" Maven/Gradle builds as part of their integration tests,
228236
// and we don't want to show those as if they were regular build executions

0 commit comments

Comments
 (0)