@@ -1013,6 +1013,37 @@ pub mod api {
10131013 }
10141014 #[ derive( Clone , Debug ) ]
10151015 #[ non_exhaustive]
1016+ pub struct StreamDecryptPacket {
1017+ #[ doc = " Did we decrypt the packet in place, or were we able to merge the copy and decrypt?" ]
1018+ pub decrypted_in_place : bool ,
1019+ #[ doc = " The number of bytes we were forced to copy after decrypting in the packet buffer." ]
1020+ #[ doc = "" ]
1021+ #[ doc = " This means that the application buffer was insufficiently large to allow us to directly" ]
1022+ #[ doc = " copy as part of the decrypt. This can be non-zero even with decrypted_in_place=false, if we" ]
1023+ #[ doc = " decrypted into the reassembly buffer. Right now it doesn't take into account zero-copy" ]
1024+ #[ doc = " reads from the reassembly buffer (e.g., with specialized Bytes)." ]
1025+ pub forced_copy : usize ,
1026+ #[ doc = " The application buffer size that would avoid copies." ]
1027+ pub required_application_buffer : usize ,
1028+ }
1029+ #[ cfg( any( test, feature = "testing" ) ) ]
1030+ impl crate :: event:: snapshot:: Fmt for StreamDecryptPacket {
1031+ fn fmt ( & self , fmt : & mut core:: fmt:: Formatter ) -> core:: fmt:: Result {
1032+ let mut fmt = fmt. debug_struct ( "StreamDecryptPacket" ) ;
1033+ fmt. field ( "decrypted_in_place" , & self . decrypted_in_place ) ;
1034+ fmt. field ( "forced_copy" , & self . forced_copy ) ;
1035+ fmt. field (
1036+ "required_application_buffer" ,
1037+ & self . required_application_buffer ,
1038+ ) ;
1039+ fmt. finish ( )
1040+ }
1041+ }
1042+ impl Event for StreamDecryptPacket {
1043+ const NAME : & ' static str = "stream:decrypt_packet" ;
1044+ }
1045+ #[ derive( Clone , Debug ) ]
1046+ #[ non_exhaustive]
10161047 #[ doc = " Tracks stream connect where dcQUIC owns the TCP connect()." ]
10171048 pub struct StreamTcpConnect {
10181049 pub error : bool ,
@@ -2416,6 +2447,21 @@ pub mod tracing {
24162447 tracing :: event ! ( target : "stream_read_socket_errored" , parent : id , tracing :: Level :: DEBUG , { capacity = tracing :: field :: debug ( capacity) , errno = tracing :: field :: debug ( errno) } ) ;
24172448 }
24182449 #[ inline]
2450+ fn on_stream_decrypt_packet (
2451+ & self ,
2452+ context : & Self :: ConnectionContext ,
2453+ _meta : & api:: ConnectionMeta ,
2454+ event : & api:: StreamDecryptPacket ,
2455+ ) {
2456+ let id = context. id ( ) ;
2457+ let api:: StreamDecryptPacket {
2458+ decrypted_in_place,
2459+ forced_copy,
2460+ required_application_buffer,
2461+ } = event;
2462+ tracing :: event ! ( target : "stream_decrypt_packet" , parent : id , tracing :: Level :: DEBUG , { decrypted_in_place = tracing :: field :: debug ( decrypted_in_place) , forced_copy = tracing :: field :: debug ( forced_copy) , required_application_buffer = tracing :: field :: debug ( required_application_buffer) } ) ;
2463+ }
2464+ #[ inline]
24192465 fn on_stream_tcp_connect ( & self , meta : & api:: EndpointMeta , event : & api:: StreamTcpConnect ) {
24202466 let parent = self . parent ( meta) ;
24212467 let api:: StreamTcpConnect { error, latency } = event;
@@ -3820,6 +3866,35 @@ pub mod builder {
38203866 }
38213867 }
38223868 #[ derive( Clone , Debug ) ]
3869+ pub struct StreamDecryptPacket {
3870+ #[ doc = " Did we decrypt the packet in place, or were we able to merge the copy and decrypt?" ]
3871+ pub decrypted_in_place : bool ,
3872+ #[ doc = " The number of bytes we were forced to copy after decrypting in the packet buffer." ]
3873+ #[ doc = "" ]
3874+ #[ doc = " This means that the application buffer was insufficiently large to allow us to directly" ]
3875+ #[ doc = " copy as part of the decrypt. This can be non-zero even with decrypted_in_place=false, if we" ]
3876+ #[ doc = " decrypted into the reassembly buffer. Right now it doesn't take into account zero-copy" ]
3877+ #[ doc = " reads from the reassembly buffer (e.g., with specialized Bytes)." ]
3878+ pub forced_copy : usize ,
3879+ #[ doc = " The application buffer size that would avoid copies." ]
3880+ pub required_application_buffer : usize ,
3881+ }
3882+ impl IntoEvent < api:: StreamDecryptPacket > for StreamDecryptPacket {
3883+ #[ inline]
3884+ fn into_event ( self ) -> api:: StreamDecryptPacket {
3885+ let StreamDecryptPacket {
3886+ decrypted_in_place,
3887+ forced_copy,
3888+ required_application_buffer,
3889+ } = self ;
3890+ api:: StreamDecryptPacket {
3891+ decrypted_in_place : decrypted_in_place. into_event ( ) ,
3892+ forced_copy : forced_copy. into_event ( ) ,
3893+ required_application_buffer : required_application_buffer. into_event ( ) ,
3894+ }
3895+ }
3896+ }
3897+ #[ derive( Clone , Debug ) ]
38233898 #[ doc = " Tracks stream connect where dcQUIC owns the TCP connect()." ]
38243899 pub struct StreamTcpConnect {
38253900 pub error : bool ,
@@ -5102,6 +5177,18 @@ mod traits {
51025177 let _ = meta;
51035178 let _ = event;
51045179 }
5180+ #[ doc = "Called when the `StreamDecryptPacket` event is triggered" ]
5181+ #[ inline]
5182+ fn on_stream_decrypt_packet (
5183+ & self ,
5184+ context : & Self :: ConnectionContext ,
5185+ meta : & api:: ConnectionMeta ,
5186+ event : & api:: StreamDecryptPacket ,
5187+ ) {
5188+ let _ = context;
5189+ let _ = meta;
5190+ let _ = event;
5191+ }
51055192 #[ doc = "Called when the `StreamTcpConnect` event is triggered" ]
51065193 #[ inline]
51075194 fn on_stream_tcp_connect ( & self , meta : & api:: EndpointMeta , event : & api:: StreamTcpConnect ) {
@@ -5818,6 +5905,15 @@ mod traits {
58185905 . on_stream_read_socket_errored ( context, meta, event) ;
58195906 }
58205907 #[ inline]
5908+ fn on_stream_decrypt_packet (
5909+ & self ,
5910+ context : & Self :: ConnectionContext ,
5911+ meta : & api:: ConnectionMeta ,
5912+ event : & api:: StreamDecryptPacket ,
5913+ ) {
5914+ self . as_ref ( ) . on_stream_decrypt_packet ( context, meta, event) ;
5915+ }
5916+ #[ inline]
58215917 fn on_stream_tcp_connect ( & self , meta : & api:: EndpointMeta , event : & api:: StreamTcpConnect ) {
58225918 self . as_ref ( ) . on_stream_tcp_connect ( meta, event) ;
58235919 }
@@ -6495,6 +6591,16 @@ mod traits {
64956591 ( self . 1 ) . on_stream_read_socket_errored ( & context. 1 , meta, event) ;
64966592 }
64976593 #[ inline]
6594+ fn on_stream_decrypt_packet (
6595+ & self ,
6596+ context : & Self :: ConnectionContext ,
6597+ meta : & api:: ConnectionMeta ,
6598+ event : & api:: StreamDecryptPacket ,
6599+ ) {
6600+ ( self . 0 ) . on_stream_decrypt_packet ( & context. 0 , meta, event) ;
6601+ ( self . 1 ) . on_stream_decrypt_packet ( & context. 1 , meta, event) ;
6602+ }
6603+ #[ inline]
64986604 fn on_stream_tcp_connect ( & self , meta : & api:: EndpointMeta , event : & api:: StreamTcpConnect ) {
64996605 ( self . 0 ) . on_stream_tcp_connect ( meta, event) ;
65006606 ( self . 1 ) . on_stream_tcp_connect ( meta, event) ;
@@ -7448,6 +7554,8 @@ mod traits {
74487554 fn on_stream_read_socket_blocked ( & self , event : builder:: StreamReadSocketBlocked ) ;
74497555 #[ doc = "Publishes a `StreamReadSocketErrored` event to the publisher's subscriber" ]
74507556 fn on_stream_read_socket_errored ( & self , event : builder:: StreamReadSocketErrored ) ;
7557+ #[ doc = "Publishes a `StreamDecryptPacket` event to the publisher's subscriber" ]
7558+ fn on_stream_decrypt_packet ( & self , event : builder:: StreamDecryptPacket ) ;
74517559 #[ doc = "Publishes a `ConnectionClosed` event to the publisher's subscriber" ]
74527560 fn on_connection_closed ( & self , event : builder:: ConnectionClosed ) ;
74537561 #[ doc = r" Returns the QUIC version negotiated for the current connection, if any" ]
@@ -7658,6 +7766,15 @@ mod traits {
76587766 self . subscriber . on_event ( & self . meta , & event) ;
76597767 }
76607768 #[ inline]
7769+ fn on_stream_decrypt_packet ( & self , event : builder:: StreamDecryptPacket ) {
7770+ let event = event. into_event ( ) ;
7771+ self . subscriber
7772+ . on_stream_decrypt_packet ( self . context , & self . meta , & event) ;
7773+ self . subscriber
7774+ . on_connection_event ( self . context , & self . meta , & event) ;
7775+ self . subscriber . on_event ( & self . meta , & event) ;
7776+ }
7777+ #[ inline]
76617778 fn on_connection_closed ( & self , event : builder:: ConnectionClosed ) {
76627779 let event = event. into_event ( ) ;
76637780 self . subscriber
@@ -8495,6 +8612,7 @@ pub mod testing {
84958612 pub stream_read_socket_flushed : AtomicU64 ,
84968613 pub stream_read_socket_blocked : AtomicU64 ,
84978614 pub stream_read_socket_errored : AtomicU64 ,
8615+ pub stream_decrypt_packet : AtomicU64 ,
84988616 pub stream_tcp_connect : AtomicU64 ,
84998617 pub stream_connect : AtomicU64 ,
85008618 pub stream_connect_error : AtomicU64 ,
@@ -8599,6 +8717,7 @@ pub mod testing {
85998717 stream_read_socket_flushed : AtomicU64 :: new ( 0 ) ,
86008718 stream_read_socket_blocked : AtomicU64 :: new ( 0 ) ,
86018719 stream_read_socket_errored : AtomicU64 :: new ( 0 ) ,
8720+ stream_decrypt_packet : AtomicU64 :: new ( 0 ) ,
86028721 stream_tcp_connect : AtomicU64 :: new ( 0 ) ,
86038722 stream_connect : AtomicU64 :: new ( 0 ) ,
86048723 stream_connect_error : AtomicU64 :: new ( 0 ) ,
@@ -9131,6 +9250,20 @@ pub mod testing {
91319250 self . output . lock ( ) . unwrap ( ) . push ( out) ;
91329251 }
91339252 }
9253+ fn on_stream_decrypt_packet (
9254+ & self ,
9255+ _context : & Self :: ConnectionContext ,
9256+ meta : & api:: ConnectionMeta ,
9257+ event : & api:: StreamDecryptPacket ,
9258+ ) {
9259+ self . stream_decrypt_packet . fetch_add ( 1 , Ordering :: Relaxed ) ;
9260+ if self . location . is_some ( ) {
9261+ let meta = crate :: event:: snapshot:: Fmt :: to_snapshot ( meta) ;
9262+ let event = crate :: event:: snapshot:: Fmt :: to_snapshot ( event) ;
9263+ let out = format ! ( "{meta:?} {event:?}" ) ;
9264+ self . output . lock ( ) . unwrap ( ) . push ( out) ;
9265+ }
9266+ }
91349267 fn on_stream_tcp_connect ( & self , meta : & api:: EndpointMeta , event : & api:: StreamTcpConnect ) {
91359268 self . stream_tcp_connect . fetch_add ( 1 , Ordering :: Relaxed ) ;
91369269 let meta = crate :: event:: snapshot:: Fmt :: to_snapshot ( meta) ;
@@ -9589,6 +9722,7 @@ pub mod testing {
95899722 pub stream_read_socket_flushed : AtomicU64 ,
95909723 pub stream_read_socket_blocked : AtomicU64 ,
95919724 pub stream_read_socket_errored : AtomicU64 ,
9725+ pub stream_decrypt_packet : AtomicU64 ,
95929726 pub stream_tcp_connect : AtomicU64 ,
95939727 pub stream_connect : AtomicU64 ,
95949728 pub stream_connect_error : AtomicU64 ,
@@ -9683,6 +9817,7 @@ pub mod testing {
96839817 stream_read_socket_flushed : AtomicU64 :: new ( 0 ) ,
96849818 stream_read_socket_blocked : AtomicU64 :: new ( 0 ) ,
96859819 stream_read_socket_errored : AtomicU64 :: new ( 0 ) ,
9820+ stream_decrypt_packet : AtomicU64 :: new ( 0 ) ,
96869821 stream_tcp_connect : AtomicU64 :: new ( 0 ) ,
96879822 stream_connect : AtomicU64 :: new ( 0 ) ,
96889823 stream_connect_error : AtomicU64 :: new ( 0 ) ,
@@ -10356,6 +10491,15 @@ pub mod testing {
1035610491 self . output . lock ( ) . unwrap ( ) . push ( out) ;
1035710492 }
1035810493 }
10494+ fn on_stream_decrypt_packet ( & self , event : builder:: StreamDecryptPacket ) {
10495+ self . stream_decrypt_packet . fetch_add ( 1 , Ordering :: Relaxed ) ;
10496+ let event = event. into_event ( ) ;
10497+ if self . location . is_some ( ) {
10498+ let event = crate :: event:: snapshot:: Fmt :: to_snapshot ( & event) ;
10499+ let out = format ! ( "{event:?}" ) ;
10500+ self . output . lock ( ) . unwrap ( ) . push ( out) ;
10501+ }
10502+ }
1035910503 fn on_connection_closed ( & self , event : builder:: ConnectionClosed ) {
1036010504 self . connection_closed . fetch_add ( 1 , Ordering :: Relaxed ) ;
1036110505 let event = event. into_event ( ) ;
0 commit comments