Skip to content

Commit d2c0794

Browse files
feat(s2n-quic-dc): Add metric for improving decrypt performance (#2785)
This will help optimize the buffers workloads provide to s2n-quic-dc. Also add more clarity to a comment on buffer management in interposer.
1 parent 110a491 commit d2c0794

File tree

8 files changed

+751
-457
lines changed

8 files changed

+751
-457
lines changed

dc/s2n-quic-dc/events/connection.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,26 @@ pub struct StreamReadSocketErrored {
259259
errno: Option<i32>,
260260
}
261261

262+
#[event("stream:decrypt_packet")]
263+
pub struct StreamDecryptPacket {
264+
/// Did we decrypt the packet in place, or were we able to merge the copy and decrypt?
265+
#[bool_counter("decrypted_in_place")]
266+
decrypted_in_place: bool,
267+
268+
/// The number of bytes we were forced to copy after decrypting in the packet buffer.
269+
///
270+
/// This means that the application buffer was insufficiently large to allow us to directly
271+
/// copy as part of the decrypt. This can be non-zero even with decrypted_in_place=false, if we
272+
/// decrypted into the reassembly buffer. Right now it doesn't take into account zero-copy
273+
/// reads from the reassembly buffer (e.g., with specialized Bytes).
274+
#[measure("forced_copy", Bytes)]
275+
forced_copy: usize,
276+
277+
/// The application buffer size that would avoid copies.
278+
#[measure("required_application_buffer", Bytes)]
279+
required_application_buffer: usize,
280+
}
281+
262282
/// Tracks stream connect where dcQUIC owns the TCP connect().
263283
#[event("stream:tcp_connect")]
264284
#[subject(endpoint)]

dc/s2n-quic-dc/src/event/generated.rs

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,6 +1013,37 @@ pub mod api {
10131013
}
10141014
#[derive(Clone, Debug)]
10151015
#[non_exhaustive]
1016+
pub struct StreamDecryptPacket {
1017+
#[doc = " Did we decrypt the packet in place, or were we able to merge the copy and decrypt?"]
1018+
pub decrypted_in_place: bool,
1019+
#[doc = " The number of bytes we were forced to copy after decrypting in the packet buffer."]
1020+
#[doc = ""]
1021+
#[doc = " This means that the application buffer was insufficiently large to allow us to directly"]
1022+
#[doc = " copy as part of the decrypt. This can be non-zero even with decrypted_in_place=false, if we"]
1023+
#[doc = " decrypted into the reassembly buffer. Right now it doesn't take into account zero-copy"]
1024+
#[doc = " reads from the reassembly buffer (e.g., with specialized Bytes)."]
1025+
pub forced_copy: usize,
1026+
#[doc = " The application buffer size that would avoid copies."]
1027+
pub required_application_buffer: usize,
1028+
}
1029+
#[cfg(any(test, feature = "testing"))]
1030+
impl crate::event::snapshot::Fmt for StreamDecryptPacket {
1031+
fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
1032+
let mut fmt = fmt.debug_struct("StreamDecryptPacket");
1033+
fmt.field("decrypted_in_place", &self.decrypted_in_place);
1034+
fmt.field("forced_copy", &self.forced_copy);
1035+
fmt.field(
1036+
"required_application_buffer",
1037+
&self.required_application_buffer,
1038+
);
1039+
fmt.finish()
1040+
}
1041+
}
1042+
impl Event for StreamDecryptPacket {
1043+
const NAME: &'static str = "stream:decrypt_packet";
1044+
}
1045+
#[derive(Clone, Debug)]
1046+
#[non_exhaustive]
10161047
#[doc = " Tracks stream connect where dcQUIC owns the TCP connect()."]
10171048
pub struct StreamTcpConnect {
10181049
pub error: bool,
@@ -2416,6 +2447,21 @@ pub mod tracing {
24162447
tracing :: event ! (target : "stream_read_socket_errored" , parent : id , tracing :: Level :: DEBUG , { capacity = tracing :: field :: debug (capacity) , errno = tracing :: field :: debug (errno) });
24172448
}
24182449
#[inline]
2450+
fn on_stream_decrypt_packet(
2451+
&self,
2452+
context: &Self::ConnectionContext,
2453+
_meta: &api::ConnectionMeta,
2454+
event: &api::StreamDecryptPacket,
2455+
) {
2456+
let id = context.id();
2457+
let api::StreamDecryptPacket {
2458+
decrypted_in_place,
2459+
forced_copy,
2460+
required_application_buffer,
2461+
} = event;
2462+
tracing :: event ! (target : "stream_decrypt_packet" , parent : id , tracing :: Level :: DEBUG , { decrypted_in_place = tracing :: field :: debug (decrypted_in_place) , forced_copy = tracing :: field :: debug (forced_copy) , required_application_buffer = tracing :: field :: debug (required_application_buffer) });
2463+
}
2464+
#[inline]
24192465
fn on_stream_tcp_connect(&self, meta: &api::EndpointMeta, event: &api::StreamTcpConnect) {
24202466
let parent = self.parent(meta);
24212467
let api::StreamTcpConnect { error, latency } = event;
@@ -3820,6 +3866,35 @@ pub mod builder {
38203866
}
38213867
}
38223868
#[derive(Clone, Debug)]
3869+
pub struct StreamDecryptPacket {
3870+
#[doc = " Did we decrypt the packet in place, or were we able to merge the copy and decrypt?"]
3871+
pub decrypted_in_place: bool,
3872+
#[doc = " The number of bytes we were forced to copy after decrypting in the packet buffer."]
3873+
#[doc = ""]
3874+
#[doc = " This means that the application buffer was insufficiently large to allow us to directly"]
3875+
#[doc = " copy as part of the decrypt. This can be non-zero even with decrypted_in_place=false, if we"]
3876+
#[doc = " decrypted into the reassembly buffer. Right now it doesn't take into account zero-copy"]
3877+
#[doc = " reads from the reassembly buffer (e.g., with specialized Bytes)."]
3878+
pub forced_copy: usize,
3879+
#[doc = " The application buffer size that would avoid copies."]
3880+
pub required_application_buffer: usize,
3881+
}
3882+
impl IntoEvent<api::StreamDecryptPacket> for StreamDecryptPacket {
3883+
#[inline]
3884+
fn into_event(self) -> api::StreamDecryptPacket {
3885+
let StreamDecryptPacket {
3886+
decrypted_in_place,
3887+
forced_copy,
3888+
required_application_buffer,
3889+
} = self;
3890+
api::StreamDecryptPacket {
3891+
decrypted_in_place: decrypted_in_place.into_event(),
3892+
forced_copy: forced_copy.into_event(),
3893+
required_application_buffer: required_application_buffer.into_event(),
3894+
}
3895+
}
3896+
}
3897+
#[derive(Clone, Debug)]
38233898
#[doc = " Tracks stream connect where dcQUIC owns the TCP connect()."]
38243899
pub struct StreamTcpConnect {
38253900
pub error: bool,
@@ -5102,6 +5177,18 @@ mod traits {
51025177
let _ = meta;
51035178
let _ = event;
51045179
}
5180+
#[doc = "Called when the `StreamDecryptPacket` event is triggered"]
5181+
#[inline]
5182+
fn on_stream_decrypt_packet(
5183+
&self,
5184+
context: &Self::ConnectionContext,
5185+
meta: &api::ConnectionMeta,
5186+
event: &api::StreamDecryptPacket,
5187+
) {
5188+
let _ = context;
5189+
let _ = meta;
5190+
let _ = event;
5191+
}
51055192
#[doc = "Called when the `StreamTcpConnect` event is triggered"]
51065193
#[inline]
51075194
fn on_stream_tcp_connect(&self, meta: &api::EndpointMeta, event: &api::StreamTcpConnect) {
@@ -5818,6 +5905,15 @@ mod traits {
58185905
.on_stream_read_socket_errored(context, meta, event);
58195906
}
58205907
#[inline]
5908+
fn on_stream_decrypt_packet(
5909+
&self,
5910+
context: &Self::ConnectionContext,
5911+
meta: &api::ConnectionMeta,
5912+
event: &api::StreamDecryptPacket,
5913+
) {
5914+
self.as_ref().on_stream_decrypt_packet(context, meta, event);
5915+
}
5916+
#[inline]
58215917
fn on_stream_tcp_connect(&self, meta: &api::EndpointMeta, event: &api::StreamTcpConnect) {
58225918
self.as_ref().on_stream_tcp_connect(meta, event);
58235919
}
@@ -6495,6 +6591,16 @@ mod traits {
64956591
(self.1).on_stream_read_socket_errored(&context.1, meta, event);
64966592
}
64976593
#[inline]
6594+
fn on_stream_decrypt_packet(
6595+
&self,
6596+
context: &Self::ConnectionContext,
6597+
meta: &api::ConnectionMeta,
6598+
event: &api::StreamDecryptPacket,
6599+
) {
6600+
(self.0).on_stream_decrypt_packet(&context.0, meta, event);
6601+
(self.1).on_stream_decrypt_packet(&context.1, meta, event);
6602+
}
6603+
#[inline]
64986604
fn on_stream_tcp_connect(&self, meta: &api::EndpointMeta, event: &api::StreamTcpConnect) {
64996605
(self.0).on_stream_tcp_connect(meta, event);
65006606
(self.1).on_stream_tcp_connect(meta, event);
@@ -7448,6 +7554,8 @@ mod traits {
74487554
fn on_stream_read_socket_blocked(&self, event: builder::StreamReadSocketBlocked);
74497555
#[doc = "Publishes a `StreamReadSocketErrored` event to the publisher's subscriber"]
74507556
fn on_stream_read_socket_errored(&self, event: builder::StreamReadSocketErrored);
7557+
#[doc = "Publishes a `StreamDecryptPacket` event to the publisher's subscriber"]
7558+
fn on_stream_decrypt_packet(&self, event: builder::StreamDecryptPacket);
74517559
#[doc = "Publishes a `ConnectionClosed` event to the publisher's subscriber"]
74527560
fn on_connection_closed(&self, event: builder::ConnectionClosed);
74537561
#[doc = r" Returns the QUIC version negotiated for the current connection, if any"]
@@ -7658,6 +7766,15 @@ mod traits {
76587766
self.subscriber.on_event(&self.meta, &event);
76597767
}
76607768
#[inline]
7769+
fn on_stream_decrypt_packet(&self, event: builder::StreamDecryptPacket) {
7770+
let event = event.into_event();
7771+
self.subscriber
7772+
.on_stream_decrypt_packet(self.context, &self.meta, &event);
7773+
self.subscriber
7774+
.on_connection_event(self.context, &self.meta, &event);
7775+
self.subscriber.on_event(&self.meta, &event);
7776+
}
7777+
#[inline]
76617778
fn on_connection_closed(&self, event: builder::ConnectionClosed) {
76627779
let event = event.into_event();
76637780
self.subscriber
@@ -8495,6 +8612,7 @@ pub mod testing {
84958612
pub stream_read_socket_flushed: AtomicU64,
84968613
pub stream_read_socket_blocked: AtomicU64,
84978614
pub stream_read_socket_errored: AtomicU64,
8615+
pub stream_decrypt_packet: AtomicU64,
84988616
pub stream_tcp_connect: AtomicU64,
84998617
pub stream_connect: AtomicU64,
85008618
pub stream_connect_error: AtomicU64,
@@ -8599,6 +8717,7 @@ pub mod testing {
85998717
stream_read_socket_flushed: AtomicU64::new(0),
86008718
stream_read_socket_blocked: AtomicU64::new(0),
86018719
stream_read_socket_errored: AtomicU64::new(0),
8720+
stream_decrypt_packet: AtomicU64::new(0),
86028721
stream_tcp_connect: AtomicU64::new(0),
86038722
stream_connect: AtomicU64::new(0),
86048723
stream_connect_error: AtomicU64::new(0),
@@ -9131,6 +9250,20 @@ pub mod testing {
91319250
self.output.lock().unwrap().push(out);
91329251
}
91339252
}
9253+
fn on_stream_decrypt_packet(
9254+
&self,
9255+
_context: &Self::ConnectionContext,
9256+
meta: &api::ConnectionMeta,
9257+
event: &api::StreamDecryptPacket,
9258+
) {
9259+
self.stream_decrypt_packet.fetch_add(1, Ordering::Relaxed);
9260+
if self.location.is_some() {
9261+
let meta = crate::event::snapshot::Fmt::to_snapshot(meta);
9262+
let event = crate::event::snapshot::Fmt::to_snapshot(event);
9263+
let out = format!("{meta:?} {event:?}");
9264+
self.output.lock().unwrap().push(out);
9265+
}
9266+
}
91349267
fn on_stream_tcp_connect(&self, meta: &api::EndpointMeta, event: &api::StreamTcpConnect) {
91359268
self.stream_tcp_connect.fetch_add(1, Ordering::Relaxed);
91369269
let meta = crate::event::snapshot::Fmt::to_snapshot(meta);
@@ -9589,6 +9722,7 @@ pub mod testing {
95899722
pub stream_read_socket_flushed: AtomicU64,
95909723
pub stream_read_socket_blocked: AtomicU64,
95919724
pub stream_read_socket_errored: AtomicU64,
9725+
pub stream_decrypt_packet: AtomicU64,
95929726
pub stream_tcp_connect: AtomicU64,
95939727
pub stream_connect: AtomicU64,
95949728
pub stream_connect_error: AtomicU64,
@@ -9683,6 +9817,7 @@ pub mod testing {
96839817
stream_read_socket_flushed: AtomicU64::new(0),
96849818
stream_read_socket_blocked: AtomicU64::new(0),
96859819
stream_read_socket_errored: AtomicU64::new(0),
9820+
stream_decrypt_packet: AtomicU64::new(0),
96869821
stream_tcp_connect: AtomicU64::new(0),
96879822
stream_connect: AtomicU64::new(0),
96889823
stream_connect_error: AtomicU64::new(0),
@@ -10356,6 +10491,15 @@ pub mod testing {
1035610491
self.output.lock().unwrap().push(out);
1035710492
}
1035810493
}
10494+
fn on_stream_decrypt_packet(&self, event: builder::StreamDecryptPacket) {
10495+
self.stream_decrypt_packet.fetch_add(1, Ordering::Relaxed);
10496+
let event = event.into_event();
10497+
if self.location.is_some() {
10498+
let event = crate::event::snapshot::Fmt::to_snapshot(&event);
10499+
let out = format!("{event:?}");
10500+
self.output.lock().unwrap().push(out);
10501+
}
10502+
}
1035910503
fn on_connection_closed(&self, event: builder::ConnectionClosed) {
1036010504
self.connection_closed.fetch_add(1, Ordering::Relaxed);
1036110505
let event = event.into_event();

dc/s2n-quic-dc/src/event/generated/metrics.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub struct Context<R: Recorder> {
4545
stream_read_socket_flushed: AtomicU64,
4646
stream_read_socket_blocked: AtomicU64,
4747
stream_read_socket_errored: AtomicU64,
48+
stream_decrypt_packet: AtomicU64,
4849
connection_closed: AtomicU64,
4950
}
5051
impl<R: Recorder> Context<R> {
@@ -86,6 +87,7 @@ where
8687
stream_read_socket_flushed: AtomicU64::new(0),
8788
stream_read_socket_blocked: AtomicU64::new(0),
8889
stream_read_socket_errored: AtomicU64::new(0),
90+
stream_decrypt_packet: AtomicU64::new(0),
8991
connection_closed: AtomicU64::new(0),
9092
}
9193
}
@@ -323,6 +325,19 @@ where
323325
.on_stream_read_socket_errored(&context.recorder, meta, event);
324326
}
325327
#[inline]
328+
fn on_stream_decrypt_packet(
329+
&self,
330+
context: &Self::ConnectionContext,
331+
meta: &api::ConnectionMeta,
332+
event: &api::StreamDecryptPacket,
333+
) {
334+
context
335+
.stream_decrypt_packet
336+
.fetch_add(1, Ordering::Relaxed);
337+
self.subscriber
338+
.on_stream_decrypt_packet(&context.recorder, meta, event);
339+
}
340+
#[inline]
326341
fn on_connection_closed(
327342
&self,
328343
context: &Self::ConnectionContext,
@@ -412,6 +427,10 @@ impl<R: Recorder> Drop for Context<R> {
412427
"stream_read_socket_errored",
413428
self.stream_read_socket_errored.load(Ordering::Relaxed) as _,
414429
);
430+
self.recorder.increment_counter(
431+
"stream_decrypt_packet",
432+
self.stream_decrypt_packet.load(Ordering::Relaxed) as _,
433+
);
415434
self.recorder.increment_counter(
416435
"connection_closed",
417436
self.connection_closed.load(Ordering::Relaxed) as _,

0 commit comments

Comments
 (0)