99import datadog .trace .util .TagsHelper ;
1010import java .nio .charset .StandardCharsets ;
1111import java .util .ArrayList ;
12- import java .util .Collections ;
1312import java .util .List ;
1413import java .util .concurrent .ArrayBlockingQueue ;
1514import java .util .concurrent .BlockingQueue ;
@@ -35,7 +34,7 @@ public class SymbolSink {
3534 private final String version ;
3635 private final BatchUploader symbolUploader ;
3736 private final BatchUploader .MultiPartContent event ;
38- private final BlockingQueue <ServiceVersion > scopes = new ArrayBlockingQueue <>(CAPACITY );
37+ private final BlockingQueue <Scope > scopes = new ArrayBlockingQueue <>(CAPACITY );
3938 private final Stats stats = new Stats ();
4039
4140 public SymbolSink (Config config ) {
@@ -59,14 +58,12 @@ public void stop() {
5958 }
6059
6160 public void addScope (Scope jarScope ) {
62- ServiceVersion serviceVersion =
63- new ServiceVersion (serviceName , env , version , "JAVA" , Collections .singletonList (jarScope ));
64- boolean added = scopes .offer (serviceVersion );
61+ boolean added = scopes .offer (jarScope );
6562 int retries = 10 ;
6663 while (!added ) {
6764 // Q is full, flushing synchronously
6865 flush ();
69- added = scopes .offer (serviceVersion );
66+ added = scopes .offer (jarScope );
7067 retries --;
7168 if (retries < 0 ) {
7269 throw new IllegalStateException ("Scope cannot be enqueued after 10 retries" + jarScope );
@@ -78,18 +75,18 @@ public void flush() {
7875 if (scopes .isEmpty ()) {
7976 return ;
8077 }
81- List <ServiceVersion > scopesToSerialize = new ArrayList <>();
78+ List <Scope > scopesToSerialize = new ArrayList <>();
8279 // ArrayBlockingQueue makes drainTo atomic, so it is safe to call flush from different and
8380 // concurrent threads
8481 scopes .drainTo (scopesToSerialize );
85- List < Scope > allScopes = new ArrayList <>();
86- for ( ServiceVersion serviceVersion : scopesToSerialize ) {
87- allScopes . addAll ( serviceVersion . getScopes ()) ;
82+ // concurrent calls to flush can result in empty scope to send, we don't want to send empty
83+ if ( scopesToSerialize . isEmpty () ) {
84+ return ;
8885 }
8986 String json =
9087 SERVICE_VERSION_ADAPTER .toJson (
91- new ServiceVersion (serviceName , env , version , "JAVA" , allScopes ));
92- LOGGER .debug ("Sending {} jar scopes size={}" , allScopes .size (), json .length ());
88+ new ServiceVersion (serviceName , env , version , "JAVA" , scopesToSerialize ));
89+ LOGGER .debug ("Sending {} jar scopes size={}" , scopesToSerialize .size (), json .length ());
9390 updateStats (scopesToSerialize , json );
9491 symbolUploader .uploadAsMultipart (
9592 "" ,
@@ -98,11 +95,9 @@ public void flush() {
9895 json .getBytes (StandardCharsets .UTF_8 ), "file" , "file.json" ));
9996 }
10097
101- private void updateStats (List <ServiceVersion > scopesToSerialize , String json ) {
102- for (ServiceVersion serviceVersion : scopesToSerialize ) {
103- List <Scope > classScopes = serviceVersion .getScopes ().get (0 ).getScopes ();
104- int classScopeCount = classScopes != null ? classScopes .size () : 0 ;
105- stats .updateStats (classScopeCount , json .length ());
98+ private void updateStats (List <Scope > scopesToSerialize , String json ) {
99+ for (Scope scope : scopesToSerialize ) {
100+ stats .updateStats (scope .getScopes () != null ? scope .getScopes ().size () : 0 , json .length ());
106101 }
107102 }
108103
0 commit comments