Skip to content

Commit a1e763c

Browse files
feat(s2n-quic-dc): Track stream write buffer allocations (#2779)
This will help confirm that per-stream we only allocate a fixed number of buffers, and provide insight into what the common buffer sizes are to help guide tuning of allocator pool caches. This is minimally useful for TCP streams, but is more interesting for streams over UDP where the allocations are influenced by retransmit returning buffers to the pool.
1 parent 8d7fd3e commit a1e763c

File tree

9 files changed

+977
-737
lines changed

9 files changed

+977
-737
lines changed

dc/s2n-quic-dc/events/connection.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,15 @@ pub struct StreamWriteKeyUpdated {
9090
key_phase: u8,
9191
}
9292

93+
#[event("stream:write_allocated")]
94+
#[measure_counter("conn")]
95+
pub struct StreamWriteAllocated {
96+
/// The number of bytes that we allocated.
97+
#[measure("allocated_len", Bytes)]
98+
#[measure_counter("allocated_len.conn", Bytes)]
99+
allocated_len: usize,
100+
}
101+
93102
#[event("stream:write_shutdown")]
94103
#[checkpoint("latency")]
95104
pub struct StreamWriteShutdown {

dc/s2n-quic-dc/src/event/generated.rs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -735,6 +735,23 @@ pub mod api {
735735
}
736736
#[derive(Clone, Debug)]
737737
#[non_exhaustive]
738+
pub struct StreamWriteAllocated {
739+
#[doc = " The number of bytes that we allocated."]
740+
pub allocated_len: usize,
741+
}
742+
#[cfg(any(test, feature = "testing"))]
743+
impl crate::event::snapshot::Fmt for StreamWriteAllocated {
744+
fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
745+
let mut fmt = fmt.debug_struct("StreamWriteAllocated");
746+
fmt.field("allocated_len", &self.allocated_len);
747+
fmt.finish()
748+
}
749+
}
750+
impl Event for StreamWriteAllocated {
751+
const NAME: &'static str = "stream:write_allocated";
752+
}
753+
#[derive(Clone, Debug)]
754+
#[non_exhaustive]
738755
pub struct StreamWriteShutdown {
739756
#[doc = " The number of bytes in the send buffer at the time of shutdown"]
740757
pub buffer_len: usize,
@@ -2219,6 +2236,17 @@ pub mod tracing {
22192236
tracing :: event ! (target : "stream_write_key_updated" , parent : id , tracing :: Level :: DEBUG , { key_phase = tracing :: field :: debug (key_phase) });
22202237
}
22212238
#[inline]
2239+
fn on_stream_write_allocated(
2240+
&self,
2241+
context: &Self::ConnectionContext,
2242+
_meta: &api::ConnectionMeta,
2243+
event: &api::StreamWriteAllocated,
2244+
) {
2245+
let id = context.id();
2246+
let api::StreamWriteAllocated { allocated_len } = event;
2247+
tracing :: event ! (target : "stream_write_allocated" , parent : id , tracing :: Level :: DEBUG , { allocated_len = tracing :: field :: debug (allocated_len) });
2248+
}
2249+
#[inline]
22222250
fn on_stream_write_shutdown(
22232251
&self,
22242252
context: &Self::ConnectionContext,
@@ -3530,6 +3558,20 @@ pub mod builder {
35303558
}
35313559
}
35323560
#[derive(Clone, Debug)]
3561+
pub struct StreamWriteAllocated {
3562+
#[doc = " The number of bytes that we allocated."]
3563+
pub allocated_len: usize,
3564+
}
3565+
impl IntoEvent<api::StreamWriteAllocated> for StreamWriteAllocated {
3566+
#[inline]
3567+
fn into_event(self) -> api::StreamWriteAllocated {
3568+
let StreamWriteAllocated { allocated_len } = self;
3569+
api::StreamWriteAllocated {
3570+
allocated_len: allocated_len.into_event(),
3571+
}
3572+
}
3573+
}
3574+
#[derive(Clone, Debug)]
35333575
pub struct StreamWriteShutdown {
35343576
#[doc = " The number of bytes in the send buffer at the time of shutdown"]
35353577
pub buffer_len: usize,
@@ -4892,6 +4934,18 @@ mod traits {
48924934
let _ = meta;
48934935
let _ = event;
48944936
}
4937+
#[doc = "Called when the `StreamWriteAllocated` event is triggered"]
4938+
#[inline]
4939+
fn on_stream_write_allocated(
4940+
&self,
4941+
context: &Self::ConnectionContext,
4942+
meta: &api::ConnectionMeta,
4943+
event: &api::StreamWriteAllocated,
4944+
) {
4945+
let _ = context;
4946+
let _ = meta;
4947+
let _ = event;
4948+
}
48954949
#[doc = "Called when the `StreamWriteShutdown` event is triggered"]
48964950
#[inline]
48974951
fn on_stream_write_shutdown(
@@ -5629,6 +5683,16 @@ mod traits {
56295683
.on_stream_write_key_updated(context, meta, event);
56305684
}
56315685
#[inline]
5686+
fn on_stream_write_allocated(
5687+
&self,
5688+
context: &Self::ConnectionContext,
5689+
meta: &api::ConnectionMeta,
5690+
event: &api::StreamWriteAllocated,
5691+
) {
5692+
self.as_ref()
5693+
.on_stream_write_allocated(context, meta, event);
5694+
}
5695+
#[inline]
56325696
fn on_stream_write_shutdown(
56335697
&self,
56345698
context: &Self::ConnectionContext,
@@ -6291,6 +6355,16 @@ mod traits {
62916355
(self.1).on_stream_write_key_updated(&context.1, meta, event);
62926356
}
62936357
#[inline]
6358+
fn on_stream_write_allocated(
6359+
&self,
6360+
context: &Self::ConnectionContext,
6361+
meta: &api::ConnectionMeta,
6362+
event: &api::StreamWriteAllocated,
6363+
) {
6364+
(self.0).on_stream_write_allocated(&context.0, meta, event);
6365+
(self.1).on_stream_write_allocated(&context.1, meta, event);
6366+
}
6367+
#[inline]
62946368
fn on_stream_write_shutdown(
62956369
&self,
62966370
context: &Self::ConnectionContext,
@@ -7346,6 +7420,8 @@ mod traits {
73467420
fn on_stream_write_errored(&self, event: builder::StreamWriteErrored);
73477421
#[doc = "Publishes a `StreamWriteKeyUpdated` event to the publisher's subscriber"]
73487422
fn on_stream_write_key_updated(&self, event: builder::StreamWriteKeyUpdated);
7423+
#[doc = "Publishes a `StreamWriteAllocated` event to the publisher's subscriber"]
7424+
fn on_stream_write_allocated(&self, event: builder::StreamWriteAllocated);
73497425
#[doc = "Publishes a `StreamWriteShutdown` event to the publisher's subscriber"]
73507426
fn on_stream_write_shutdown(&self, event: builder::StreamWriteShutdown);
73517427
#[doc = "Publishes a `StreamWriteSocketFlushed` event to the publisher's subscriber"]
@@ -7456,6 +7532,15 @@ mod traits {
74567532
self.subscriber.on_event(&self.meta, &event);
74577533
}
74587534
#[inline]
7535+
fn on_stream_write_allocated(&self, event: builder::StreamWriteAllocated) {
7536+
let event = event.into_event();
7537+
self.subscriber
7538+
.on_stream_write_allocated(self.context, &self.meta, &event);
7539+
self.subscriber
7540+
.on_connection_event(self.context, &self.meta, &event);
7541+
self.subscriber.on_event(&self.meta, &event);
7542+
}
7543+
#[inline]
74597544
fn on_stream_write_shutdown(&self, event: builder::StreamWriteShutdown) {
74607545
let event = event.into_event();
74617546
self.subscriber
@@ -8396,6 +8481,7 @@ pub mod testing {
83968481
pub stream_write_blocked: AtomicU64,
83978482
pub stream_write_errored: AtomicU64,
83988483
pub stream_write_key_updated: AtomicU64,
8484+
pub stream_write_allocated: AtomicU64,
83998485
pub stream_write_shutdown: AtomicU64,
84008486
pub stream_write_socket_flushed: AtomicU64,
84018487
pub stream_write_socket_blocked: AtomicU64,
@@ -8499,6 +8585,7 @@ pub mod testing {
84998585
stream_write_blocked: AtomicU64::new(0),
85008586
stream_write_errored: AtomicU64::new(0),
85018587
stream_write_key_updated: AtomicU64::new(0),
8588+
stream_write_allocated: AtomicU64::new(0),
85028589
stream_write_shutdown: AtomicU64::new(0),
85038590
stream_write_socket_flushed: AtomicU64::new(0),
85048591
stream_write_socket_blocked: AtomicU64::new(0),
@@ -8842,6 +8929,20 @@ pub mod testing {
88428929
self.output.lock().unwrap().push(out);
88438930
}
88448931
}
8932+
fn on_stream_write_allocated(
8933+
&self,
8934+
_context: &Self::ConnectionContext,
8935+
meta: &api::ConnectionMeta,
8936+
event: &api::StreamWriteAllocated,
8937+
) {
8938+
self.stream_write_allocated.fetch_add(1, Ordering::Relaxed);
8939+
if self.location.is_some() {
8940+
let meta = crate::event::snapshot::Fmt::to_snapshot(meta);
8941+
let event = crate::event::snapshot::Fmt::to_snapshot(event);
8942+
let out = format!("{meta:?} {event:?}");
8943+
self.output.lock().unwrap().push(out);
8944+
}
8945+
}
88458946
fn on_stream_write_shutdown(
88468947
&self,
88478948
_context: &Self::ConnectionContext,
@@ -9474,6 +9575,7 @@ pub mod testing {
94749575
pub stream_write_blocked: AtomicU64,
94759576
pub stream_write_errored: AtomicU64,
94769577
pub stream_write_key_updated: AtomicU64,
9578+
pub stream_write_allocated: AtomicU64,
94779579
pub stream_write_shutdown: AtomicU64,
94789580
pub stream_write_socket_flushed: AtomicU64,
94799581
pub stream_write_socket_blocked: AtomicU64,
@@ -9567,6 +9669,7 @@ pub mod testing {
95679669
stream_write_blocked: AtomicU64::new(0),
95689670
stream_write_errored: AtomicU64::new(0),
95699671
stream_write_key_updated: AtomicU64::new(0),
9672+
stream_write_allocated: AtomicU64::new(0),
95709673
stream_write_shutdown: AtomicU64::new(0),
95719674
stream_write_socket_flushed: AtomicU64::new(0),
95729675
stream_write_socket_blocked: AtomicU64::new(0),
@@ -10121,6 +10224,15 @@ pub mod testing {
1012110224
self.output.lock().unwrap().push(out);
1012210225
}
1012310226
}
10227+
fn on_stream_write_allocated(&self, event: builder::StreamWriteAllocated) {
10228+
self.stream_write_allocated.fetch_add(1, Ordering::Relaxed);
10229+
let event = event.into_event();
10230+
if self.location.is_some() {
10231+
let event = crate::event::snapshot::Fmt::to_snapshot(&event);
10232+
let out = format!("{event:?}");
10233+
self.output.lock().unwrap().push(out);
10234+
}
10235+
}
1012410236
fn on_stream_write_shutdown(&self, event: builder::StreamWriteShutdown) {
1012510237
self.stream_write_shutdown.fetch_add(1, Ordering::Relaxed);
1012610238
let event = event.into_event();

dc/s2n-quic-dc/src/event/generated/metrics.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ pub struct Context<R: Recorder> {
3131
stream_write_blocked: AtomicU64,
3232
stream_write_errored: AtomicU64,
3333
stream_write_key_updated: AtomicU64,
34+
stream_write_allocated: AtomicU64,
3435
stream_write_shutdown: AtomicU64,
3536
stream_write_socket_flushed: AtomicU64,
3637
stream_write_socket_blocked: AtomicU64,
@@ -71,6 +72,7 @@ where
7172
stream_write_blocked: AtomicU64::new(0),
7273
stream_write_errored: AtomicU64::new(0),
7374
stream_write_key_updated: AtomicU64::new(0),
75+
stream_write_allocated: AtomicU64::new(0),
7476
stream_write_shutdown: AtomicU64::new(0),
7577
stream_write_socket_flushed: AtomicU64::new(0),
7678
stream_write_socket_blocked: AtomicU64::new(0),
@@ -147,6 +149,19 @@ where
147149
.on_stream_write_key_updated(&context.recorder, meta, event);
148150
}
149151
#[inline]
152+
fn on_stream_write_allocated(
153+
&self,
154+
context: &Self::ConnectionContext,
155+
meta: &api::ConnectionMeta,
156+
event: &api::StreamWriteAllocated,
157+
) {
158+
context
159+
.stream_write_allocated
160+
.fetch_add(1, Ordering::Relaxed);
161+
self.subscriber
162+
.on_stream_write_allocated(&context.recorder, meta, event);
163+
}
164+
#[inline]
150165
fn on_stream_write_shutdown(
151166
&self,
152167
context: &Self::ConnectionContext,
@@ -341,6 +356,10 @@ impl<R: Recorder> Drop for Context<R> {
341356
"stream_write_key_updated",
342357
self.stream_write_key_updated.load(Ordering::Relaxed) as _,
343358
);
359+
self.recorder.increment_counter(
360+
"stream_write_allocated",
361+
self.stream_write_allocated.load(Ordering::Relaxed) as _,
362+
);
344363
self.recorder.increment_counter(
345364
"stream_write_shutdown",
346365
self.stream_write_shutdown.load(Ordering::Relaxed) as _,

0 commit comments

Comments
 (0)