Skip to content

Commit 0ab8bc2

Browse files
committed
Change queue type, use directly Scope
1 parent 879253c commit 0ab8bc2

1 file changed

Lines changed: 12 additions & 17 deletions

File tree

  • dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import datadog.trace.util.TagsHelper;
1010
import java.nio.charset.StandardCharsets;
1111
import java.util.ArrayList;
12-
import java.util.Collections;
1312
import java.util.List;
1413
import java.util.concurrent.ArrayBlockingQueue;
1514
import java.util.concurrent.BlockingQueue;
@@ -35,7 +34,7 @@ public class SymbolSink {
3534
private final String version;
3635
private final BatchUploader symbolUploader;
3736
private final BatchUploader.MultiPartContent event;
38-
private final BlockingQueue<ServiceVersion> scopes = new ArrayBlockingQueue<>(CAPACITY);
37+
private final BlockingQueue<Scope> scopes = new ArrayBlockingQueue<>(CAPACITY);
3938
private final Stats stats = new Stats();
4039

4140
public SymbolSink(Config config) {
@@ -59,14 +58,12 @@ public void stop() {
5958
}
6059

6160
public void addScope(Scope jarScope) {
62-
ServiceVersion serviceVersion =
63-
new ServiceVersion(serviceName, env, version, "JAVA", Collections.singletonList(jarScope));
64-
boolean added = scopes.offer(serviceVersion);
61+
boolean added = scopes.offer(jarScope);
6562
int retries = 10;
6663
while (!added) {
6764
// Q is full, flushing synchronously
6865
flush();
69-
added = scopes.offer(serviceVersion);
66+
added = scopes.offer(jarScope);
7067
retries--;
7168
if (retries < 0) {
7269
throw new IllegalStateException("Scope cannot be enqueued after 10 retries" + jarScope);
@@ -78,18 +75,18 @@ public void flush() {
7875
if (scopes.isEmpty()) {
7976
return;
8077
}
81-
List<ServiceVersion> scopesToSerialize = new ArrayList<>();
78+
List<Scope> scopesToSerialize = new ArrayList<>();
8279
// ArrayBlockingQueue makes drainTo atomic, so it is safe to call flush from different and
8380
// concurrent threads
8481
scopes.drainTo(scopesToSerialize);
85-
List<Scope> allScopes = new ArrayList<>();
86-
for (ServiceVersion serviceVersion : scopesToSerialize) {
87-
allScopes.addAll(serviceVersion.getScopes());
82+
// concurrent calls to flush can result in empty scope to send, we don't want to send empty
83+
if (scopesToSerialize.isEmpty()) {
84+
return;
8885
}
8986
String json =
9087
SERVICE_VERSION_ADAPTER.toJson(
91-
new ServiceVersion(serviceName, env, version, "JAVA", allScopes));
92-
LOGGER.debug("Sending {} jar scopes size={}", allScopes.size(), json.length());
88+
new ServiceVersion(serviceName, env, version, "JAVA", scopesToSerialize));
89+
LOGGER.debug("Sending {} jar scopes size={}", scopesToSerialize.size(), json.length());
9390
updateStats(scopesToSerialize, json);
9491
symbolUploader.uploadAsMultipart(
9592
"",
@@ -98,11 +95,9 @@ public void flush() {
9895
json.getBytes(StandardCharsets.UTF_8), "file", "file.json"));
9996
}
10097

101-
private void updateStats(List<ServiceVersion> scopesToSerialize, String json) {
102-
for (ServiceVersion serviceVersion : scopesToSerialize) {
103-
List<Scope> classScopes = serviceVersion.getScopes().get(0).getScopes();
104-
int classScopeCount = classScopes != null ? classScopes.size() : 0;
105-
stats.updateStats(classScopeCount, json.length());
98+
private void updateStats(List<Scope> scopesToSerialize, String json) {
99+
for (Scope scope : scopesToSerialize) {
100+
stats.updateStats(scope.getScopes() != null ? scope.getScopes().size() : 0, json.length());
106101
}
107102
}
108103

0 commit comments

Comments
 (0)