Skip to content

Commit a991f56

Browse files
committed
Fix leaking sessions
There was async access happening to sessions, causing sessions being re-created after them being destroyed (and with default config, too). Moving get_app() around, into the flushing logic to cleanly separate session access from async context. Signed-off-by: Bob Weinand <[email protected]>
1 parent 89f48d4 commit a991f56

1 file changed

Lines changed: 78 additions & 82 deletions

File tree

sidecar/src/service/sidecar_server.rs

Lines changed: 78 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use manual_future::{ManualFuture, ManualFutureCompleter};
2828
use std::borrow::Cow;
2929
use std::collections::hash_map::Entry;
3030
use std::collections::{HashMap, HashSet};
31-
3231
use std::pin::Pin;
3332
use std::sync::atomic::{AtomicU64, Ordering};
3433
use std::sync::{Arc, Mutex, MutexGuard};
@@ -249,71 +248,6 @@ impl SidecarServer {
249248
.expect("Unable to acquire lock on sessions")
250249
}
251250

252-
async fn get_app(
253-
&self,
254-
instance_id: &InstanceId,
255-
runtime_meta: &RuntimeMetadata,
256-
service_name: &str,
257-
env_name: &str,
258-
initial_actions: Vec<TelemetryActions>,
259-
) -> Option<AppInstance> {
260-
let rt_info = self.get_runtime(instance_id);
261-
262-
let manual_app_future = rt_info.get_app(service_name, env_name);
263-
264-
if manual_app_future.completer.is_none() {
265-
return manual_app_future.app_future.await;
266-
}
267-
268-
let mut builder = TelemetryWorkerBuilder::new_fetch_host(
269-
service_name.to_owned(),
270-
runtime_meta.language_name.to_owned(),
271-
runtime_meta.language_version.to_owned(),
272-
runtime_meta.tracer_version.to_owned(),
273-
);
274-
builder.runtime_id = Some(instance_id.runtime_id.to_owned());
275-
builder.application.env = Some(env_name.to_owned());
276-
let session_info = self.get_session(&instance_id.session_id);
277-
let mut config = session_info
278-
.session_config
279-
.lock()
280-
.expect("Unable to acquire lock on session_config")
281-
.clone()
282-
.unwrap_or_else(ddtelemetry::config::Config::from_env);
283-
config.restartable = true;
284-
285-
let instance_option = match builder.spawn_with_config(config.clone()).await {
286-
Ok((handle, worker_join)) => {
287-
info!("spawning telemetry worker {config:?}");
288-
289-
let instance = AppInstance {
290-
telemetry: handle,
291-
telemetry_worker_shutdown: worker_join.map(Result::ok).boxed().shared(),
292-
telemetry_metrics: Default::default(),
293-
};
294-
295-
instance.telemetry.send_msgs(initial_actions).await.ok();
296-
297-
instance
298-
.telemetry
299-
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
300-
.await
301-
.ok();
302-
Some(instance)
303-
}
304-
Err(e) => {
305-
error!("could not spawn telemetry worker {:?}", e);
306-
None
307-
}
308-
};
309-
manual_app_future
310-
.completer
311-
.expect("Completed expected Some ManualFuture for application instance, but found none")
312-
.complete(instance_option)
313-
.await;
314-
manual_app_future.app_future.await
315-
}
316-
317251
fn send_trace_v04(&self, headers: &SerializedTracerHeaderTags, data: &[u8], target: &Endpoint) {
318252
let headers: TracerHeaderTags = match headers.try_into() {
319253
Ok(headers) => headers,
@@ -588,20 +522,77 @@ impl SidecarInterface for SidecarServer {
588522
}
589523
};
590524
if let Some(AppOrQueue::Queue(mut enqueued_data)) = app_or_queue {
525+
let rt_info = self.get_runtime(&instance_id);
526+
let manual_app_future = rt_info.get_app(&service_name, &env_name);
527+
528+
let instance_future = if manual_app_future.completer.is_some() {
529+
let mut builder = TelemetryWorkerBuilder::new_fetch_host(
530+
service_name.to_owned(),
531+
runtime_meta.language_name.to_owned(),
532+
runtime_meta.language_version.to_owned(),
533+
runtime_meta.tracer_version.to_owned(),
534+
);
535+
builder.runtime_id = Some(instance_id.runtime_id.to_owned());
536+
builder.application.env = Some(env_name.to_owned());
537+
let session_info = self.get_session(&instance_id.session_id);
538+
let mut config = session_info
539+
.session_config
540+
.lock()
541+
.expect("Unable to acquire lock on session_config")
542+
.clone()
543+
.unwrap_or_else(ddtelemetry::config::Config::from_env);
544+
config.restartable = true;
545+
Some(
546+
builder
547+
.spawn_with_config(config.clone())
548+
.map(move |result| {
549+
if result.is_ok() {
550+
info!("spawning telemetry worker {config:?}");
551+
}
552+
result
553+
}),
554+
)
555+
} else {
556+
None
557+
};
558+
591559
tokio::spawn(async move {
592-
let mut actions: Vec<TelemetryActions> = vec![];
593-
enqueued_data.extract_telemetry_actions(&mut actions).await;
594-
595-
if let Some(mut app) = self
596-
.get_app(
597-
&instance_id,
598-
&runtime_meta,
599-
&service_name,
600-
&env_name,
601-
actions,
602-
)
603-
.await
604-
{
560+
if let Some(instance_future) = instance_future {
561+
let instance_option = match instance_future.await {
562+
Ok((handle, worker_join)) => {
563+
let instance = AppInstance {
564+
telemetry: handle,
565+
telemetry_worker_shutdown: worker_join
566+
.map(Result::ok)
567+
.boxed()
568+
.shared(),
569+
telemetry_metrics: Default::default(),
570+
};
571+
572+
let mut actions: Vec<TelemetryActions> = vec![];
573+
enqueued_data.extract_telemetry_actions(&mut actions).await;
574+
instance.telemetry.send_msgs(actions).await.ok();
575+
576+
instance
577+
.telemetry
578+
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
579+
.await
580+
.ok();
581+
Some(instance)
582+
}
583+
Err(e) => {
584+
error!("could not spawn telemetry worker {:?}", e);
585+
None
586+
}
587+
};
588+
manual_app_future
589+
.completer
590+
.expect("Completed expected Some ManualFuture for application instance, but found none")
591+
.complete(instance_option)
592+
.await;
593+
}
594+
595+
if let Some(mut app) = manual_app_future.app_future.await {
605596
// Register metrics
606597
for metric in std::mem::take(&mut enqueued_data.metrics).into_iter() {
607598
app.register_metric(metric);
@@ -618,9 +609,14 @@ impl SidecarInterface for SidecarServer {
618609
if actions.iter().any(|action| {
619610
matches!(action, TelemetryActions::Lifecycle(LifecycleAction::Stop))
620611
}) {
621-
self.get_runtime(&instance_id)
622-
.lock_applications()
623-
.remove(&queue_id);
612+
// Avoid self.get_runtime(), it could create a new one.
613+
if let Some(session) = self.lock_sessions().get(&instance_id.session_id) {
614+
if let Some(runtime) =
615+
session.lock_runtimes().get(&instance_id.runtime_id)
616+
{
617+
runtime.lock_applications().remove(&queue_id);
618+
}
619+
}
624620
}
625621

626622
app.telemetry.send_msgs(actions).await.ok();

0 commit comments

Comments
 (0)