@@ -74,26 +74,16 @@ public TraceProcessingWorker(
7474 spanSamplingWorker .getSpanSamplingQueue (),
7575 droppingPolicy );
7676
77- boolean runAsDaemon = !Config .get ().isCiVisibilityEnabled ();
7877 this .serializingHandler =
79- runAsDaemon
80- ? new DaemonTraceSerializingHandler (
81- primaryQueue ,
82- secondaryQueue ,
83- healthMetrics ,
84- dispatcher ,
85- flushInterval ,
86- timeUnit ,
87- spanPostProcessor )
88- : new NonDaemonTraceSerializingHandler (
89- primaryQueue ,
90- secondaryQueue ,
91- healthMetrics ,
92- dispatcher ,
93- flushInterval ,
94- timeUnit ,
95- spanPostProcessor );
96- this .serializerThread = newAgentThread (TRACE_PROCESSOR , serializingHandler , runAsDaemon );
78+ new TraceSerializingHandler (
79+ primaryQueue ,
80+ secondaryQueue ,
81+ healthMetrics ,
82+ dispatcher ,
83+ flushInterval ,
84+ timeUnit ,
85+ spanPostProcessor );
86+ this .serializerThread = newAgentThread (TRACE_PROCESSOR , serializingHandler );
9787 }
9888
9989 public void start () {
@@ -144,91 +134,7 @@ private static MpscBlockingConsumerArrayQueue<Object> createQueue(int capacity)
144134 return new MpscBlockingConsumerArrayQueue <>(capacity );
145135 }
146136
147- private static class DaemonTraceSerializingHandler extends TraceSerializingHandler {
148- public DaemonTraceSerializingHandler (
149- MpscBlockingConsumerArrayQueue <Object > primaryQueue ,
150- MpscBlockingConsumerArrayQueue <Object > secondaryQueue ,
151- HealthMetrics healthMetrics ,
152- PayloadDispatcher payloadDispatcher ,
153- long flushInterval ,
154- TimeUnit timeUnit ,
155- SpanPostProcessor spanPostProcessor ) {
156- super (
157- primaryQueue ,
158- secondaryQueue ,
159- healthMetrics ,
160- payloadDispatcher ,
161- flushInterval ,
162- timeUnit ,
163- spanPostProcessor );
164- }
165-
166- @ Override
167- public void run () {
168- try {
169- runDutyCycle ();
170- } catch (InterruptedException e ) {
171- Thread .currentThread ().interrupt ();
172- }
173- log .debug ("Datadog trace processor exited. Publishing traces stopped" );
174- }
175-
176- private void runDutyCycle () throws InterruptedException {
177- Thread thread = Thread .currentThread ();
178- while (!thread .isInterrupted ()) {
179- consumeFromPrimaryQueue ();
180- consumeFromSecondaryQueue ();
181- flushIfNecessary ();
182- }
183- }
184- }
185-
186- private static class NonDaemonTraceSerializingHandler extends TraceSerializingHandler {
187- private static final double SHUTDOWN_TIMEOUT_MILLIS = 5_000 ;
188- private Long shutdownSignalTimestamp ;
189-
190- public NonDaemonTraceSerializingHandler (
191- MpscBlockingConsumerArrayQueue <Object > primaryQueue ,
192- MpscBlockingConsumerArrayQueue <Object > secondaryQueue ,
193- HealthMetrics healthMetrics ,
194- PayloadDispatcher payloadDispatcher ,
195- long flushInterval ,
196- TimeUnit timeUnit ,
197- SpanPostProcessor spanPostProcessor ) {
198- super (
199- primaryQueue ,
200- secondaryQueue ,
201- healthMetrics ,
202- payloadDispatcher ,
203- flushInterval ,
204- timeUnit ,
205- spanPostProcessor );
206- }
207-
208- @ Override
209- public void run () {
210- while (!shouldShutdown ()) {
211- try {
212- consumeFromPrimaryQueue ();
213- consumeFromSecondaryQueue ();
214- flushIfNecessary ();
215- } catch (InterruptedException e ) {
216- if (shutdownSignalTimestamp == null ) {
217- shutdownSignalTimestamp = System .currentTimeMillis ();
218- }
219- }
220- }
221- log .debug ("Datadog trace processor exited. Unpublished traces left: " + !queuesAreEmpty ());
222- }
223-
224- private boolean shouldShutdown () {
225- return shutdownSignalTimestamp != null
226- && (shutdownSignalTimestamp + SHUTDOWN_TIMEOUT_MILLIS <= System .currentTimeMillis ()
227- || queuesAreEmpty ());
228- }
229- }
230-
231- public abstract static class TraceSerializingHandler implements Runnable {
137+ public static class TraceSerializingHandler implements Runnable {
232138
233139 private final MpscBlockingConsumerArrayQueue <Object > primaryQueue ;
234140 private final MpscBlockingConsumerArrayQueue <Object > secondaryQueue ;
@@ -261,6 +167,27 @@ public TraceSerializingHandler(
261167 this .spanPostProcessor = spanPostProcessor ;
262168 }
263169
170+ @ Override
171+ public void run () {
172+ try {
173+ runDutyCycle ();
174+ } catch (InterruptedException e ) {
175+ Thread .currentThread ().interrupt ();
176+ }
177+ log .debug (
178+ "Datadog trace processor exited. Publishing traces stopped. Unpublished traces left: "
179+ + !queuesAreEmpty ());
180+ }
181+
182+ private void runDutyCycle () throws InterruptedException {
183+ Thread thread = Thread .currentThread ();
184+ while (!thread .isInterrupted ()) {
185+ consumeFromPrimaryQueue ();
186+ consumeFromSecondaryQueue ();
187+ flushIfNecessary ();
188+ }
189+ }
190+
264191 @ SuppressWarnings ("unchecked" )
265192 public void onEvent (Object event ) {
266193 // publish an incomplete batch if
0 commit comments