Skip to content

Commit 90b3231

Browse files
committed
Handle backpressure more gracefully
Only drop some operations on the floor, but never those that set the state of the connection. Otherwise, under load, the connection may end up in an unexpected state. Also, do not leak file descriptors on partially received messages. Signed-off-by: Bob Weinand <[email protected]>
1 parent 0f3d46b commit 90b3231

File tree

6 files changed

+134
-29
lines changed

6 files changed

+134
-29
lines changed

datadog-ipc/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ tarpc = { path = "./tarpc", default-features = false, features = ["serde-transpo
2525

2626
libdd-common = { path = "../libdd-common" }
2727
datadog-ipc-macros = { path = "../datadog-ipc-macros" }
28+
tracing = { version = "0.1", default-features = false }
2829

2930
[dev-dependencies]
3031
criterion = "0.5"

datadog-ipc/src/platform/unix/channel/metadata.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use std::{
55
collections::VecDeque,
66
io,
7+
os::fd::IntoRawFd,
78
os::unix::prelude::{AsRawFd, FromRawFd, RawFd},
89
};
910

@@ -17,8 +18,8 @@ use crate::{
1718
#[derive(Debug)]
1819
pub struct ChannelMetadata {
1920
fds_to_send: Vec<PlatformHandle<OwnedFd>>,
20-
fds_received: VecDeque<RawFd>,
21-
pid: libc::pid_t, // must always be set to current Process ID
21+
fds_received: VecDeque<OwnedFd>, // Store as OwnedFd to prevent leaking them
22+
pid: libc::pid_t, // must always be set to current Process ID
2223
}
2324

2425
impl Default for ChannelMetadata {
@@ -80,7 +81,13 @@ impl ChannelMetadata {
8081
}
8182

8283
pub(crate) fn receive_fds(&mut self, fds: &[RawFd]) {
83-
self.fds_received.append(&mut fds.to_vec().into());
84+
self.fds_received.append(
85+
&mut fds
86+
.iter()
87+
.map(|fd| unsafe { OwnedFd::from_raw_fd(*fd) })
88+
.collect::<Vec<_>>()
89+
.into(),
90+
);
8491
}
8592

8693
pub(crate) fn find_handle<T>(&mut self, hint: &PlatformHandle<T>) -> Option<PlatformHandle<T>> {
@@ -90,6 +97,6 @@ impl ChannelMetadata {
9097

9198
let fd = self.fds_received.pop_front();
9299

93-
fd.map(|fd| unsafe { PlatformHandle::from_raw_fd(fd) })
100+
fd.map(|fd| unsafe { PlatformHandle::from_raw_fd(fd.into_raw_fd()) })
94101
}
95102
}

datadog-ipc/src/sequential.rs

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,28 @@
11
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use futures::{ready, Future, Stream};
4+
use futures::{Future, Stream};
55
use std::fmt::Debug;
66
use std::{
77
pin::Pin,
88
task::{Context, Poll},
99
};
1010
use tarpc::server::{Channel, InFlightRequest, Requests, Serve};
11+
use tokio::sync::mpsc::error::SendError;
12+
use tokio::sync::mpsc::OwnedPermit;
1113

1214
#[allow(type_alias_bounds)]
1315
type Request<S, C: Channel> = (S, InFlightRequest<C::Req, C::Resp>);
1416

17+
type PendingPermit<S, C> = Pin<
18+
Box<dyn Future<Output = Result<OwnedPermit<Request<S, C>>, SendError<()>>> + Send + 'static>,
19+
>;
20+
1521
/// Replaces tarpc::server::Channel::execute which spawns one task per message with an executor
1622
/// that spawns a single worker and queues requests for this task.
1723
///
18-
/// If the queue is full, the requests is dropped and will be cancelled by tarpc.
24+
/// If the queue is full, the request is dropped and will be cancelled by tarpc unless
25+
/// `with_backpressure` is configured for that request type.
1926
pub fn execute_sequential<C, S>(
2027
reqs: Requests<C>,
2128
serve: S,
@@ -43,6 +50,8 @@ where
4350
inner: reqs,
4451
serve,
4552
tx,
53+
backpressure: |_| false,
54+
pending: None,
4655
}
4756
}
4857

@@ -55,6 +64,11 @@ where
5564
inner: Requests<C>,
5665
serve: S,
5766
tx: tokio::sync::mpsc::Sender<Request<S, C>>,
67+
/// Returns true for requests that must not be dropped when the queue is full.
68+
/// The executor will pause reading new requests and wait for channel space instead.
69+
backpressure: fn(&C::Req) -> bool,
70+
/// Pending channel-space reservation for a backpressure request.
71+
pending: Option<(PendingPermit<S, C>, Request<S, C>)>,
5872
}
5973

6074
impl<C, S> Future for SequentialExecutor<C, S>
@@ -68,21 +82,50 @@ where
6882
type Output = anyhow::Result<()>;
6983

7084
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71-
while let Some(response_handler) = ready!(self.as_mut().project().inner.poll_next(cx)) {
72-
match response_handler {
73-
Ok(resp) => {
74-
let server = self.serve.clone();
75-
if let Err(_err) = self.as_ref().tx.try_send((server, resp)) {
76-
// TODO: should we log something in case we drop the request on the floor?
85+
loop {
86+
// First flush any pending backpressure send before reading new requests.
87+
{
88+
let this = self.as_mut().project();
89+
if let Some((fut, _)) = this.pending.as_mut() {
90+
match fut.as_mut().poll(cx) {
91+
Poll::Pending => return Poll::Pending,
92+
Poll::Ready(Err(_)) => return Poll::Ready(Ok(())), // worker dropped
93+
Poll::Ready(Ok(permit)) => {
94+
#[allow(clippy::unwrap_used)] // we've just checked this
95+
let (_, item) = this.pending.take().unwrap();
96+
permit.send(item);
97+
// fall through to poll next request
98+
}
7799
}
78100
}
79-
Err(e) => {
80-
// TODO: should we log something in case we drop the request on the floor?
81-
return Poll::Ready(Err(e.into()));
101+
}
102+
103+
// Read the next request off the transport.
104+
match self.as_mut().project().inner.poll_next(cx) {
105+
Poll::Ready(Some(Ok(resp))) => {
106+
let backpressured = (self.backpressure)(&resp.get().message);
107+
match self.as_ref().tx.try_send((self.serve.clone(), resp)) {
108+
Ok(()) => {} // loop to pick up the next request
109+
Err(err) => {
110+
let (_, resp) = err.into_inner();
111+
if backpressured {
112+
let fut = Box::pin(self.as_ref().tx.clone().reserve_owned());
113+
*self.as_mut().project().pending =
114+
Some((fut, (self.serve.clone(), resp)));
115+
} else {
116+
tracing::warn!(
117+
"Dropping {:?}: sequential executor queue is full",
118+
resp.get().message
119+
);
120+
}
121+
}
122+
}
82123
}
124+
Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e.into())),
125+
Poll::Ready(None) => return Poll::Ready(Ok(())),
126+
Poll::Pending => return Poll::Pending,
83127
}
84128
}
85-
Poll::Ready(Ok(()))
86129
}
87130
}
88131

@@ -97,4 +140,11 @@ where
97140
std::mem::swap(&mut self.tx, &mut sender);
98141
sender
99142
}
143+
144+
/// Configures a predicate that identifies requests which must not be dropped when the queue
145+
/// is full. For those requests the executor will pause reading and wait for channel space.
146+
pub fn with_backpressure(mut self, backpressure: fn(&C::Req) -> bool) -> Self {
147+
self.backpressure = backpressure;
148+
self
149+
}
100150
}

datadog-sidecar-macros/src/lib.rs

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,46 @@ fn snake_to_camel(ident_str: &str) -> String {
2828
}
2929

3030
#[proc_macro_attribute]
31-
pub fn extract_request_id(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
32-
let item: ItemTrait = syn::parse(input.clone()).unwrap();
31+
pub fn extract_request_id(_attr: TokenStream, input: TokenStream) -> TokenStream {
32+
let mut item: ItemTrait = syn::parse(input).unwrap();
3333
let name = &format_ident!("{}Request", item.ident);
3434
let mut arms: Vec<Arm> = vec![];
35-
for inner in item.items {
35+
let mut backpressure_variants: Vec<Ident> = vec![];
36+
37+
for inner in item.items.iter_mut() {
3638
if let TraitItem::Fn(func) = inner {
37-
for any_arg in func.sig.inputs {
39+
// Strip #[force_backpressure] and record which methods carry it.
40+
let had_force_backpressure = func.attrs.iter().any(|attr| {
41+
attr.meta
42+
.path()
43+
.get_ident()
44+
.is_some_and(|i| i == "force_backpressure")
45+
});
46+
func.attrs.retain(|attr| {
47+
attr.meta
48+
.path()
49+
.get_ident()
50+
.is_none_or(|i| i != "force_backpressure")
51+
});
52+
53+
let method = Ident::new(
54+
&snake_to_camel(&func.sig.ident.to_string()),
55+
Span::mixed_site(),
56+
);
57+
58+
if had_force_backpressure {
59+
backpressure_variants.push(method.clone());
60+
}
61+
62+
for any_arg in &func.sig.inputs {
3863
if let Typed(arg) = any_arg {
39-
if let Pat::Ident(ident) = *arg.pat {
64+
if let Pat::Ident(ident) = &*arg.pat {
4065
let matched_enum_type = match ident.ident.to_string().as_str() {
4166
"session_id" => Some(format_ident!("SessionId")),
4267
"instance_id" => Some(format_ident!("InstanceId")),
4368
_ => None,
4469
};
4570
if let Some(enum_type) = matched_enum_type {
46-
let method = Ident::new(
47-
&snake_to_camel(&func.sig.ident.to_string()),
48-
Span::mixed_site(),
49-
);
5071
arms.push(parse_quote! {
5172
#name::#method { #ident, .. } => RequestIdentifier::#enum_type(#ident.clone())
5273
});
@@ -56,7 +77,16 @@ pub fn extract_request_id(_attr: TokenStream, mut input: TokenStream) -> TokenSt
5677
}
5778
}
5879
}
59-
input.extend(TokenStream::from(quote! {
80+
81+
let backpressure_body = if backpressure_variants.is_empty() {
82+
quote! { false }
83+
} else {
84+
quote! { matches!(self, #(#name::#backpressure_variants { .. })|*) }
85+
};
86+
87+
TokenStream::from(quote! {
88+
#item
89+
6090
impl RequestIdentification for tarpc::Request<#name> {
6191
fn extract_identifier(&self) -> RequestIdentifier {
6292
match &self.message {
@@ -67,8 +97,14 @@ pub fn extract_request_id(_attr: TokenStream, mut input: TokenStream) -> TokenSt
6797
}
6898
}
6999
}
70-
}));
71-
input
100+
101+
impl #name {
102+
/// Returns true if this request variant was annotated with `#[force_backpressure]`.
103+
pub fn requires_backpressure(&self) -> bool {
104+
#backpressure_body
105+
}
106+
}
107+
})
72108
}
73109

74110
struct EnvOrDefault {

datadog-sidecar/src/service/sidecar_interface.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ pub trait SidecarInterface {
6060
/// * `session_id` - The ID of the session.
6161
/// * `pid` - The pid of the sidecar client.
6262
/// * `config` - The configuration to be set.
63+
#[force_backpressure]
6364
async fn set_session_config(
6465
session_id: String,
6566
remote_config_notify_target: RemoteConfigNotifyTarget,
@@ -73,20 +74,23 @@ pub trait SidecarInterface {
7374
///
7475
/// * `session_id` - The ID of the session.
7576
/// * `process_tags` - The process tags string.
77+
#[force_backpressure]
7678
async fn set_session_process_tags(session_id: String, process_tags: String);
7779

7880
/// Shuts down a runtime.
7981
///
8082
/// # Arguments
8183
///
8284
/// * `instance_id` - The ID of the instance.
85+
#[force_backpressure]
8386
async fn shutdown_runtime(instance_id: InstanceId);
8487

8588
/// Shuts down a session.
8689
///
8790
/// # Arguments
8891
///
8992
/// * `session_id` - The ID of the session.
93+
#[force_backpressure]
9094
async fn shutdown_session(session_id: String);
9195

9296
/// Sends a trace via shared memory.
@@ -165,6 +169,7 @@ pub trait SidecarInterface {
165169
/// * `global_tags` - Global tags which need to be propagated.
166170
/// * `dynamic_instrumentation_state` - Whether dynamic instrumentation is enabled, disabled or
167171
/// not set.
172+
#[force_backpressure]
168173
async fn set_universal_service_tags(
169174
instance_id: InstanceId,
170175
queue_id: QueueId,
@@ -182,6 +187,7 @@ pub trait SidecarInterface {
182187
/// * `queue_id` - The unique identifier for the trace context.
183188
/// * `dynamic_instrumentation_state` - Whether dynamic instrumentation is enabled, disabled or
184189
/// not set.
190+
#[force_backpressure]
185191
async fn set_request_config(
186192
instance_id: InstanceId,
187193
queue_id: QueueId,
@@ -197,6 +203,7 @@ pub trait SidecarInterface {
197203
async fn send_dogstatsd_actions(instance_id: InstanceId, actions: Vec<DogStatsDActionOwned>);
198204

199205
/// Flushes any outstanding traces queued for sending.
206+
#[force_backpressure]
200207
async fn flush_traces();
201208

202209
/// Sets x-datadog-test-session-token on all requests for the given session.
@@ -208,19 +215,22 @@ pub trait SidecarInterface {
208215
async fn set_test_session_token(session_id: String, token: String);
209216

210217
/// Sends a ping to the service.
218+
#[force_backpressure]
211219
async fn ping();
212220

213221
/// Dumps the current state of the service.
214222
///
215223
/// # Returns
216224
///
217225
/// A string representation of the current state of the service.
226+
#[force_backpressure]
218227
async fn dump() -> String;
219228

220229
/// Retrieves the current statistics of the service.
221230
///
222231
/// # Returns
223232
///
224233
/// A string representation of the current statistics of the service.
234+
#[force_backpressure]
225235
async fn stats() -> String;
226236
}

datadog-sidecar/src/service/sidecar_server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@ impl SidecarServer {
152152
server.requests(),
153153
self.clone().serve(),
154154
500,
155-
);
155+
)
156+
.with_backpressure(SidecarInterfaceRequest::requires_backpressure);
156157
let (tx, rx) = tokio::sync::mpsc::channel::<_>(100);
157158
let tx = executor.swap_sender(tx);
158159

0 commit comments

Comments
 (0)