@@ -253,7 +253,7 @@ unsafe extern "C" fn observed_poll(fds: *mut libc::pollfd, nfds: u64, timeout: c
253253 } ) ;
254254 } else if ( * fds) . revents & 4 == 4 {
255255 // requested events contains writing
256- SOCKET_READ_TIME_PROFILING_STATS . with ( |cell| {
256+ SOCKET_WRITE_TIME_PROFILING_STATS . with ( |cell| {
257257 let mut io = cell. borrow_mut ( ) ;
258258 io. track ( duration. as_nanos ( ) as u64 )
259259 } ) ;
@@ -265,7 +265,7 @@ unsafe extern "C" fn observed_poll(fds: *mut libc::pollfd, nfds: u64, timeout: c
265265 } ) ;
266266 } else if ( * fds) . events & 4 == 4 {
267267 // socket became writeable
268- SOCKET_READ_TIME_PROFILING_STATS . with ( |cell| {
268+ SOCKET_WRITE_TIME_PROFILING_STATS . with ( |cell| {
269269 let mut io = cell. borrow_mut ( ) ;
270270 io. track ( duration. as_nanos ( ) as u64 )
271271 } ) ;
@@ -327,6 +327,41 @@ unsafe extern "C" fn observed_recvmsg(
327327 len
328328}
329329
330+ static mut ORIG_RECVFROM : unsafe extern "C" fn (
331+ c_int ,
332+ * mut c_void ,
333+ usize ,
334+ c_int ,
335+ * mut libc:: sockaddr ,
336+ * mut libc:: socklen_t ,
337+ ) -> isize = libc:: recvfrom;
338+
339+ unsafe extern "C" fn observed_recvfrom (
340+ socket : c_int ,
341+ buf : * mut c_void ,
342+ length : usize ,
343+ flags : c_int ,
344+ address : * mut libc:: sockaddr ,
345+ address_len : * mut libc:: socklen_t ,
346+ ) -> isize {
347+ let start = Instant :: now ( ) ;
348+ let len = ORIG_RECVFROM ( socket, buf, length, flags, address, address_len) ;
349+ let duration = start. elapsed ( ) ;
350+
351+ SOCKET_READ_TIME_PROFILING_STATS . with ( |cell| {
352+ let mut io = cell. borrow_mut ( ) ;
353+ io. track ( duration. as_nanos ( ) as u64 )
354+ } ) ;
355+ if len > 0 {
356+ SOCKET_READ_SIZE_PROFILING_STATS . with ( |cell| {
357+ let mut io = cell. borrow_mut ( ) ;
358+ io. track ( len as u64 )
359+ } ) ;
360+ }
361+
362+ len
363+ }
364+
330365static mut ORIG_SEND : unsafe extern "C" fn ( c_int , * const c_void , usize , c_int ) -> isize =
331366 libc:: send;
332367unsafe extern "C" fn observed_send (
@@ -414,17 +449,23 @@ unsafe extern "C" fn observed_write(fd: c_int, buf: *const c_void, count: usize)
414449 let len = ORIG_WRITE ( fd, buf, count) ;
415450 let duration = start. elapsed ( ) ;
416451
417- FILE_WRITE_TIME_PROFILING_STATS . with ( |cell| {
418- let mut io = cell . borrow_mut ( ) ;
419- io . track ( duration . as_nanos ( ) as u64 )
420- } ) ;
421- if len > 0 {
422- if fd_is_socket ( fd ) {
452+ if fd_is_socket ( fd ) {
453+ SOCKET_WRITE_TIME_PROFILING_STATS . with ( |cell| {
454+ let mut io = cell . borrow_mut ( ) ;
455+ io . track ( duration . as_nanos ( ) as u64 )
456+ } ) ;
457+ if len > 0 {
423458 SOCKET_WRITE_SIZE_PROFILING_STATS . with ( |cell| {
424459 let mut io = cell. borrow_mut ( ) ;
425460 io. track ( len as u64 )
426461 } ) ;
427- } else {
462+ }
463+ } else {
464+ FILE_WRITE_TIME_PROFILING_STATS . with ( |cell| {
465+ let mut io = cell. borrow_mut ( ) ;
466+ io. track ( duration. as_nanos ( ) as u64 )
467+ } ) ;
468+ if len > 0 {
428469 FILE_WRITE_SIZE_PROFILING_STATS . with ( |cell| {
429470 let mut io = cell. borrow_mut ( ) ;
430471 io. track ( len as u64 )
@@ -454,12 +495,10 @@ unsafe extern "C" fn observed_fread(
454495 let mut io = cell. borrow_mut ( ) ;
455496 io. track ( duration. as_nanos ( ) as u64 )
456497 } ) ;
457- if len > 0 {
458- FILE_READ_SIZE_PROFILING_STATS . with ( |cell| {
459- let mut io = cell. borrow_mut ( ) ;
460- io. track ( len as u64 )
461- } ) ;
462- }
498+ FILE_READ_SIZE_PROFILING_STATS . with ( |cell| {
499+ let mut io = cell. borrow_mut ( ) ;
500+ io. track ( len as u64 )
501+ } ) ;
463502
464503 len
465504}
@@ -470,17 +509,23 @@ unsafe extern "C" fn observed_read(fd: c_int, buf: *mut c_void, count: usize) ->
470509 let len = ORIG_READ ( fd, buf, count) ;
471510 let duration = start. elapsed ( ) ;
472511
473- FILE_READ_TIME_PROFILING_STATS . with ( |cell| {
474- let mut io = cell . borrow_mut ( ) ;
475- io . track ( duration . as_nanos ( ) as u64 )
476- } ) ;
477- if len > 0 {
478- if fd_is_socket ( fd ) {
512+ if fd_is_socket ( fd ) {
513+ SOCKET_READ_TIME_PROFILING_STATS . with ( |cell| {
514+ let mut io = cell . borrow_mut ( ) ;
515+ io . track ( duration . as_nanos ( ) as u64 )
516+ } ) ;
517+ if len > 0 {
479518 SOCKET_READ_SIZE_PROFILING_STATS . with ( |cell| {
480519 let mut io = cell. borrow_mut ( ) ;
481520 io. track ( len as u64 )
482521 } ) ;
483- } else {
522+ }
523+ } else {
524+ FILE_READ_TIME_PROFILING_STATS . with ( |cell| {
525+ let mut io = cell. borrow_mut ( ) ;
526+ io. track ( duration. as_nanos ( ) as u64 )
527+ } ) ;
528+ if len > 0 {
484529 FILE_READ_SIZE_PROFILING_STATS . with ( |cell| {
485530 let mut io = cell. borrow_mut ( ) ;
486531 io. track ( len as u64 )
@@ -539,19 +584,10 @@ pub static FILE_READ_TIME_PROFILING_INTERVAL: AtomicU64 =
539584 AtomicU64 :: new ( std:: time:: Duration :: from_millis ( 10 ) . as_nanos ( ) as u64 ) ;
540585pub static FILE_WRITE_TIME_PROFILING_INTERVAL : AtomicU64 =
541586 AtomicU64 :: new ( std:: time:: Duration :: from_millis ( 10 ) . as_nanos ( ) as u64 ) ;
542- pub static SOCKET_READ_SIZE_PROFILING_INTERVAL : AtomicU64 = AtomicU64 :: new ( 1024 * 10 ) ;
543- pub static SOCKET_WRITE_SIZE_PROFILING_INTERVAL : AtomicU64 = AtomicU64 :: new ( 1024 * 10 ) ;
544- pub static FILE_READ_SIZE_PROFILING_INTERVAL : AtomicU64 = AtomicU64 :: new ( 1024 * 10 ) ;
545- pub static FILE_WRITE_SIZE_PROFILING_INTERVAL : AtomicU64 = AtomicU64 :: new ( 1024 * 10 ) ;
546-
547- pub static SOCKET_READ_TIME_SAMPLE_COUNT : AtomicU64 = AtomicU64 :: new ( 0 ) ;
548- pub static SOCKET_WRITE_TIME_SAMPLE_COUNT : AtomicU64 = AtomicU64 :: new ( 0 ) ;
549- pub static FILE_READ_TIME_SAMPLE_COUNT : AtomicU64 = AtomicU64 :: new ( 0 ) ;
550- pub static FILE_WRITE_TIME_SAMPLE_COUNT : AtomicU64 = AtomicU64 :: new ( 0 ) ;
551- pub static SOCKET_READ_SIZE_SAMPLE_COUNT : AtomicU64 = AtomicU64 :: new ( 0 ) ;
552- pub static SOCKET_WRITE_SIZE_SAMPLE_COUNT : AtomicU64 = AtomicU64 :: new ( 0 ) ;
553- pub static FILE_READ_SIZE_SAMPLE_COUNT : AtomicU64 = AtomicU64 :: new ( 0 ) ;
554- pub static FILE_WRITE_SIZE_SAMPLE_COUNT : AtomicU64 = AtomicU64 :: new ( 0 ) ;
587+ pub static SOCKET_READ_SIZE_PROFILING_INTERVAL : AtomicU64 = AtomicU64 :: new ( 1024 * 100 ) ;
588+ pub static SOCKET_WRITE_SIZE_PROFILING_INTERVAL : AtomicU64 = AtomicU64 :: new ( 1024 * 100 ) ;
589+ pub static FILE_READ_SIZE_PROFILING_INTERVAL : AtomicU64 = AtomicU64 :: new ( 1024 * 100 ) ;
590+ pub static FILE_WRITE_SIZE_PROFILING_INTERVAL : AtomicU64 = AtomicU64 :: new ( 1024 * 100 ) ;
555591
556592pub trait IOCollector {
557593 fn collect ( & self , profiler : & Profiler , value : u64 ) ;
@@ -560,7 +596,6 @@ pub trait IOCollector {
560596pub struct SocketReadTimeCollector ;
561597impl IOCollector for SocketReadTimeCollector {
562598 fn collect ( & self , profiler : & Profiler , value : u64 ) {
563- SOCKET_READ_TIME_SAMPLE_COUNT . fetch_add ( 1 , Ordering :: SeqCst ) ;
564599 // Safety: execute_data was provided by the engine, and the profiler doesn't mutate it.
565600 unsafe {
566601 profiler. collect_socket_read_time (
@@ -574,7 +609,6 @@ impl IOCollector for SocketReadTimeCollector {
574609pub struct SocketWriteTimeCollector ;
575610impl IOCollector for SocketWriteTimeCollector {
576611 fn collect ( & self , profiler : & Profiler , value : u64 ) {
577- SOCKET_WRITE_TIME_SAMPLE_COUNT . fetch_add ( 1 , Ordering :: SeqCst ) ;
578612 // Safety: execute_data was provided by the engine, and the profiler doesn't mutate it.
579613 unsafe {
580614 profiler. collect_socket_write_time (
@@ -588,7 +622,6 @@ impl IOCollector for SocketWriteTimeCollector {
588622pub struct FileReadTimeCollector ;
589623impl IOCollector for FileReadTimeCollector {
590624 fn collect ( & self , profiler : & Profiler , value : u64 ) {
591- FILE_READ_TIME_SAMPLE_COUNT . fetch_add ( 1 , Ordering :: SeqCst ) ;
592625 // Safety: execute_data was provided by the engine, and the profiler doesn't mutate it.
593626 unsafe {
594627 profiler. collect_file_read_time (
@@ -602,7 +635,6 @@ impl IOCollector for FileReadTimeCollector {
602635pub struct FileWriteTimeCollector ;
603636impl IOCollector for FileWriteTimeCollector {
604637 fn collect ( & self , profiler : & Profiler , value : u64 ) {
605- FILE_WRITE_TIME_SAMPLE_COUNT . fetch_add ( 1 , Ordering :: SeqCst ) ;
606638 // Safety: execute_data was provided by the engine, and the profiler doesn't mutate it.
607639 unsafe {
608640 profiler. collect_file_write_time (
@@ -616,7 +648,6 @@ impl IOCollector for FileWriteTimeCollector {
616648pub struct SocketReadSizeCollector ;
617649impl IOCollector for SocketReadSizeCollector {
618650 fn collect ( & self , profiler : & Profiler , value : u64 ) {
619- SOCKET_READ_SIZE_SAMPLE_COUNT . fetch_add ( 1 , Ordering :: SeqCst ) ;
620651 // Safety: execute_data was provided by the engine, and the profiler doesn't mutate it.
621652 unsafe {
622653 profiler. collect_socket_read_size (
@@ -630,7 +661,6 @@ impl IOCollector for SocketReadSizeCollector {
630661pub struct SocketWriteSizeCollector ;
631662impl IOCollector for SocketWriteSizeCollector {
632663 fn collect ( & self , profiler : & Profiler , value : u64 ) {
633- SOCKET_WRITE_SIZE_SAMPLE_COUNT . fetch_add ( 1 , Ordering :: SeqCst ) ;
634664 // Safety: execute_data was provided by the engine, and the profiler doesn't mutate it.
635665 unsafe {
636666 profiler. collect_socket_write_size (
@@ -644,7 +674,6 @@ impl IOCollector for SocketWriteSizeCollector {
644674pub struct FileReadSizeCollector ;
645675impl IOCollector for FileReadSizeCollector {
646676 fn collect ( & self , profiler : & Profiler , value : u64 ) {
647- FILE_READ_SIZE_SAMPLE_COUNT . fetch_add ( 1 , Ordering :: SeqCst ) ;
648677 // Safety: execute_data was provided by the engine, and the profiler doesn't mutate it.
649678 unsafe {
650679 profiler. collect_file_read_size (
@@ -658,7 +687,6 @@ impl IOCollector for FileReadSizeCollector {
658687pub struct FileWriteSizeCollector ;
659688impl IOCollector for FileWriteSizeCollector {
660689 fn collect ( & self , profiler : & Profiler , value : u64 ) {
661- FILE_WRITE_SIZE_SAMPLE_COUNT . fetch_add ( 1 , Ordering :: SeqCst ) ;
662690 // Safety: execute_data was provided by the engine, and the profiler doesn't mutate it.
663691 unsafe {
664692 profiler. collect_file_write_size (
@@ -788,6 +816,11 @@ pub fn io_prof_first_rinit() {
788816 new_func: observed_recvmsg as * mut ( ) ,
789817 orig_func: ptr:: addr_of_mut!( ORIG_RECVMSG ) as * mut _ as * mut * mut ( ) ,
790818 } ,
819+ GotSymbolOverwrite {
820+ symbol_name: "recvfrom" ,
821+ new_func: observed_recvfrom as * mut ( ) ,
822+ orig_func: ptr:: addr_of_mut!( ORIG_RECVFROM ) as * mut _ as * mut * mut ( ) ,
823+ } ,
791824 GotSymbolOverwrite {
792825 symbol_name: "send" ,
793826 new_func: observed_send as * mut ( ) ,
0 commit comments