@@ -28,7 +28,6 @@ use manual_future::{ManualFuture, ManualFutureCompleter};
2828use std:: borrow:: Cow ;
2929use std:: collections:: hash_map:: Entry ;
3030use std:: collections:: { HashMap , HashSet } ;
31-
3231use std:: pin:: Pin ;
3332use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
3433use std:: sync:: { Arc , Mutex , MutexGuard } ;
@@ -249,71 +248,6 @@ impl SidecarServer {
249248 . expect ( "Unable to acquire lock on sessions" )
250249 }
251250
252- async fn get_app (
253- & self ,
254- instance_id : & InstanceId ,
255- runtime_meta : & RuntimeMetadata ,
256- service_name : & str ,
257- env_name : & str ,
258- initial_actions : Vec < TelemetryActions > ,
259- ) -> Option < AppInstance > {
260- let rt_info = self . get_runtime ( instance_id) ;
261-
262- let manual_app_future = rt_info. get_app ( service_name, env_name) ;
263-
264- if manual_app_future. completer . is_none ( ) {
265- return manual_app_future. app_future . await ;
266- }
267-
268- let mut builder = TelemetryWorkerBuilder :: new_fetch_host (
269- service_name. to_owned ( ) ,
270- runtime_meta. language_name . to_owned ( ) ,
271- runtime_meta. language_version . to_owned ( ) ,
272- runtime_meta. tracer_version . to_owned ( ) ,
273- ) ;
274- builder. runtime_id = Some ( instance_id. runtime_id . to_owned ( ) ) ;
275- builder. application . env = Some ( env_name. to_owned ( ) ) ;
276- let session_info = self . get_session ( & instance_id. session_id ) ;
277- let mut config = session_info
278- . session_config
279- . lock ( )
280- . expect ( "Unable to acquire lock on session_config" )
281- . clone ( )
282- . unwrap_or_else ( ddtelemetry:: config:: Config :: from_env) ;
283- config. restartable = true ;
284-
285- let instance_option = match builder. spawn_with_config ( config. clone ( ) ) . await {
286- Ok ( ( handle, worker_join) ) => {
287- info ! ( "spawning telemetry worker {config:?}" ) ;
288-
289- let instance = AppInstance {
290- telemetry : handle,
291- telemetry_worker_shutdown : worker_join. map ( Result :: ok) . boxed ( ) . shared ( ) ,
292- telemetry_metrics : Default :: default ( ) ,
293- } ;
294-
295- instance. telemetry . send_msgs ( initial_actions) . await . ok ( ) ;
296-
297- instance
298- . telemetry
299- . send_msg ( TelemetryActions :: Lifecycle ( LifecycleAction :: Start ) )
300- . await
301- . ok ( ) ;
302- Some ( instance)
303- }
304- Err ( e) => {
305- error ! ( "could not spawn telemetry worker {:?}" , e) ;
306- None
307- }
308- } ;
309- manual_app_future
310- . completer
311- . expect ( "Completed expected Some ManualFuture for application instance, but found none" )
312- . complete ( instance_option)
313- . await ;
314- manual_app_future. app_future . await
315- }
316-
317251 fn send_trace_v04 ( & self , headers : & SerializedTracerHeaderTags , data : & [ u8 ] , target : & Endpoint ) {
318252 let headers: TracerHeaderTags = match headers. try_into ( ) {
319253 Ok ( headers) => headers,
@@ -588,20 +522,77 @@ impl SidecarInterface for SidecarServer {
588522 }
589523 } ;
590524 if let Some ( AppOrQueue :: Queue ( mut enqueued_data) ) = app_or_queue {
525+ let rt_info = self . get_runtime ( & instance_id) ;
526+ let manual_app_future = rt_info. get_app ( & service_name, & env_name) ;
527+
528+ let instance_future = if manual_app_future. completer . is_some ( ) {
529+ let mut builder = TelemetryWorkerBuilder :: new_fetch_host (
530+ service_name. to_owned ( ) ,
531+ runtime_meta. language_name . to_owned ( ) ,
532+ runtime_meta. language_version . to_owned ( ) ,
533+ runtime_meta. tracer_version . to_owned ( ) ,
534+ ) ;
535+ builder. runtime_id = Some ( instance_id. runtime_id . to_owned ( ) ) ;
536+ builder. application . env = Some ( env_name. to_owned ( ) ) ;
537+ let session_info = self . get_session ( & instance_id. session_id ) ;
538+ let mut config = session_info
539+ . session_config
540+ . lock ( )
541+ . expect ( "Unable to acquire lock on session_config" )
542+ . clone ( )
543+ . unwrap_or_else ( ddtelemetry:: config:: Config :: from_env) ;
544+ config. restartable = true ;
545+ Some (
546+ builder
547+ . spawn_with_config ( config. clone ( ) )
548+ . map ( move |result| {
549+ if result. is_ok ( ) {
550+ info ! ( "spawning telemetry worker {config:?}" ) ;
551+ }
552+ result
553+ } ) ,
554+ )
555+ } else {
556+ None
557+ } ;
558+
591559 tokio:: spawn ( async move {
592- let mut actions: Vec < TelemetryActions > = vec ! [ ] ;
593- enqueued_data. extract_telemetry_actions ( & mut actions) . await ;
594-
595- if let Some ( mut app) = self
596- . get_app (
597- & instance_id,
598- & runtime_meta,
599- & service_name,
600- & env_name,
601- actions,
602- )
603- . await
604- {
560+ if let Some ( instance_future) = instance_future {
561+ let instance_option = match instance_future. await {
562+ Ok ( ( handle, worker_join) ) => {
563+ let instance = AppInstance {
564+ telemetry : handle,
565+ telemetry_worker_shutdown : worker_join
566+ . map ( Result :: ok)
567+ . boxed ( )
568+ . shared ( ) ,
569+ telemetry_metrics : Default :: default ( ) ,
570+ } ;
571+
572+ let mut actions: Vec < TelemetryActions > = vec ! [ ] ;
573+ enqueued_data. extract_telemetry_actions ( & mut actions) . await ;
574+ instance. telemetry . send_msgs ( actions) . await . ok ( ) ;
575+
576+ instance
577+ . telemetry
578+ . send_msg ( TelemetryActions :: Lifecycle ( LifecycleAction :: Start ) )
579+ . await
580+ . ok ( ) ;
581+ Some ( instance)
582+ }
583+ Err ( e) => {
584+ error ! ( "could not spawn telemetry worker {:?}" , e) ;
585+ None
586+ }
587+ } ;
588+ manual_app_future
589+ . completer
590+ . expect ( "Completed expected Some ManualFuture for application instance, but found none" )
591+ . complete ( instance_option)
592+ . await ;
593+ }
594+
595+ if let Some ( mut app) = manual_app_future. app_future . await {
605596 // Register metrics
606597 for metric in std:: mem:: take ( & mut enqueued_data. metrics ) . into_iter ( ) {
607598 app. register_metric ( metric) ;
@@ -618,9 +609,14 @@ impl SidecarInterface for SidecarServer {
618609 if actions. iter ( ) . any ( |action| {
619610 matches ! ( action, TelemetryActions :: Lifecycle ( LifecycleAction :: Stop ) )
620611 } ) {
621- self . get_runtime ( & instance_id)
622- . lock_applications ( )
623- . remove ( & queue_id) ;
612+ // Avoid self.get_runtime(), it could create a new one.
613+ if let Some ( session) = self . lock_sessions ( ) . get ( & instance_id. session_id ) {
614+ if let Some ( runtime) =
615+ session. lock_runtimes ( ) . get ( & instance_id. runtime_id )
616+ {
617+ runtime. lock_applications ( ) . remove ( & queue_id) ;
618+ }
619+ }
624620 }
625621
626622 app. telemetry . send_msgs ( actions) . await . ok ( ) ;
0 commit comments