Skip to content

Commit ee673c2

Browse files
committed
Use Okio Buffers for AWS SDK response capturing
1 parent a13e2f6 commit ee673c2

4 files changed

Lines changed: 63 additions & 42 deletions

File tree

dd-java-agent/instrumentation/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/AbstractAwsClientInstrumentation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public String[] helperClassNames() {
1515
packageName + ".AwsSdkClientDecorator",
1616
packageName + ".TracingExecutionInterceptor",
1717
packageName + ".ResponseBodyStreamWrapper",
18+
packageName + ".ConsumableInputStream",
1819
};
1920
}
2021
}

dd-java-agent/instrumentation/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/AwsSdkClientDecorator.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
2424
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
2525
import datadog.trace.core.datastreams.TagsProcessor;
26-
import java.io.ByteArrayInputStream;
2726
import java.io.InputStream;
2827
import java.net.URI;
2928
import java.time.Instant;
@@ -309,11 +308,9 @@ public AgentSpan onSdkResponse(
309308
if (responseBody.isPresent()) {
310309
InputStream body = responseBody.get();
311310
if (body instanceof ResponseBodyStreamWrapper) {
312-
Optional<ByteArrayInputStream> bodyStream =
313-
((ResponseBodyStreamWrapper) body).toByteArrayInputStream();
314-
// TODO log.debug if bodyStream is empty
315-
bodyStream.ifPresent(
316-
bs -> AgentTracer.get().addTagsFromResponseBody(span, bs, "aws.response.body"));
311+
ResponseBodyStreamWrapper wrapper = (ResponseBodyStreamWrapper) body;
312+
InputStream bodyStream = wrapper.consumeCapturedData();
313+
AgentTracer.get().addTagsFromResponseBody(span, bodyStream, "aws.response.body");
317314
}
318315
}
319316

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package datadog.trace.instrumentation.aws.v2;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import okio.BufferedSource;
6+
7+
public class ConsumableInputStream extends InputStream {
8+
private final BufferedSource source;
9+
private final InputStream is;
10+
11+
public ConsumableInputStream(BufferedSource source) {
12+
this.source = source;
13+
this.is = source.inputStream();
14+
}
15+
16+
@Override
17+
public int read() throws IOException {
18+
return is.read();
19+
}
20+
21+
@Override
22+
public int read(byte[] sink, int offset, int byteCount) throws IOException {
23+
return is.read(sink, offset, byteCount);
24+
}
25+
26+
@Override
27+
public int available() throws IOException {
28+
return is.available();
29+
}
30+
31+
@Override
32+
public void close() throws IOException {
33+
source.close();
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,45 @@
11
package datadog.trace.instrumentation.aws.v2;
22

3-
import java.io.ByteArrayInputStream;
4-
import java.io.ByteArrayOutputStream;
53
import java.io.IOException;
64
import java.io.InputStream;
7-
import java.util.Optional;
5+
import okio.BufferedSource;
6+
import okio.Okio;
87

98
/**
10-
* Buffers stream data that starts with '{' character assuming it is a JSON object. This is used to
11-
* read the response body from AWS SDK after it has been read by the SDK.
9+
* Buffers the response body stream so that it can be read later for tag extraction after it has
10+
* been consumed by the SDK.
1211
*/
1312
public class ResponseBodyStreamWrapper extends InputStream {
13+
private final BufferedSource source;
14+
private final InputStream sdkInputStream;
1415

15-
private final InputStream originalStream;
16-
private ByteArrayOutputStream buffer;
17-
private boolean hasBeenRead;
18-
19-
public ResponseBodyStreamWrapper(InputStream is) {
20-
super();
21-
this.originalStream = is;
16+
public ResponseBodyStreamWrapper(InputStream origin) {
17+
source = Okio.buffer(Okio.source(origin));
18+
// Create a separate stream based on the source without consuming it.
19+
sdkInputStream = source.peek().inputStream();
2220
}
2321

2422
@Override
2523
public int read() throws IOException {
26-
// TODO maybe there should be an upper bound limit to avoid buffering large data
27-
28-
int value = originalStream.read();
29-
30-
if (!hasBeenRead) {
31-
if (value == '{') {
32-
// Start buffering only if it starts with '{' to avoid buffering non-json data
33-
// TODO maybe add a debug statement?
34-
buffer = new ByteArrayOutputStream();
35-
}
36-
hasBeenRead = true;
37-
}
38-
39-
if (buffer != null) {
40-
buffer.write(value);
41-
}
24+
return sdkInputStream.read();
25+
}
4226

43-
return value;
27+
@Override
28+
public int read(byte[] sink, int offset, int byteCount) throws IOException {
29+
return sdkInputStream.read(sink, offset, byteCount);
4430
}
4531

46-
public Optional<ByteArrayInputStream> toByteArrayInputStream() {
47-
if (buffer == null) {
48-
return Optional.empty();
49-
}
50-
return Optional.of(new ByteArrayInputStream(buffer.toByteArray()));
32+
@Override
33+
public int available() throws IOException {
34+
return sdkInputStream.available();
5135
}
5236

5337
@Override
54-
public void close() throws IOException {
55-
originalStream.close();
38+
public void close() {
39+
// doesn't close the source, so it can be read later
40+
}
41+
42+
public InputStream consumeCapturedData() {
43+
return new ConsumableInputStream(source);
5644
}
5745
}

0 commit comments

Comments
 (0)