@@ -2,21 +2,16 @@ use std::{thread::sleep, time::Duration};
22
33use log:: { debug, info, warn} ;
44
5- #[ allow( unused_imports) ]
6- use neli:: {
7- consts:: socket:: { Msg , NlFamily } ,
8- router:: synchronous:: NlRouter ,
9- socket:: NlSocket ,
10- utils:: Groups ,
11- } ;
5+ use netlink_sys:: { Socket , SocketAddr , protocols:: NETLINK_GENERIC } ;
126use tokio:: sync:: mpsc:: Sender ;
137
148use std:: io;
159
1610use super :: super :: message:: netlink:: NetlinkCommand ;
11+ use super :: netlink_utils;
1712
1813#[ cfg( not( test) ) ]
19- type SocketType = NlSocket ;
14+ type SocketType = Socket ;
2015#[ cfg( test) ]
2116type SocketType = test:: MockSocket ;
2217
@@ -42,6 +37,8 @@ const CTRL_CMD_DELFAMILY: u8 = 2;
4237const CTRL_ATTR_FAMILY_NAME : u16 = 2 ;
4338/// Size of generic netlink header in bytes
4439const GENL_HEADER_SIZE : usize = 20 ;
40+ /// Netlink control notify multicast group ID
41+ const NLCTRL_NOTIFY_GROUP_ID : u32 = 1 ;
4542
4643/// Actor responsible for monitoring netlink family registration/unregistration.
4744///
@@ -58,9 +55,9 @@ pub struct ControlNetlinkActor {
5855 command_sender : Sender < NetlinkCommand > ,
5956 /// Last time we checked if the family exists
6057 last_family_check : std:: time:: Instant ,
61- /// Reusable netlink resolver for family existence checks
58+ /// Reusable netlink socket for family existence checks
6259 #[ cfg( not( test) ) ]
63- resolver : Option < NlRouter > ,
60+ resolver : Option < Socket > ,
6461 #[ cfg( test) ]
6562 #[ allow( dead_code) ]
6663 resolver : Option < ( ) > ,
@@ -99,42 +96,37 @@ impl ControlNetlinkActor {
9996 actor
10097 }
10198
102- /// Establishes a connection to the netlink control socket (legacy interface) .
99+ /// Establishes a connection to the netlink control socket.
103100 #[ cfg( not( test) ) ]
104101 fn connect_control_socket ( ) -> Option < SocketType > {
105- // Create a router to resolve the control group
106- let ( router , _ ) = match NlRouter :: connect ( NlFamily :: Generic , Some ( 0 ) , Groups :: empty ( ) ) {
107- Ok ( result ) => result ,
102+ // Create a raw netlink socket
103+ let mut socket = match Socket :: new ( NETLINK_GENERIC ) {
104+ Ok ( s ) => s ,
108105 Err ( e) => {
109- warn ! ( "Failed to connect control router : {:?}" , e) ;
106+ warn ! ( "Failed to create netlink socket : {:?}" , e) ;
110107 return None ;
111108 }
112109 } ;
113110
114- // Resolve the "notify" multicast group for nlctrl family
115- let notify_group_id = match router. resolve_nl_mcast_group ( "nlctrl" , "notify" ) {
116- Ok ( group_id) => {
117- debug ! ( "Resolved nlctrl notify group ID: {}" , group_id) ;
118- group_id
119- }
120- Err ( e) => {
121- warn ! ( "Failed to resolve nlctrl notify group: {:?}" , e) ;
122- return None ;
123- }
124- } ;
111+ // Bind the socket with automatic port assignment
112+ let addr = SocketAddr :: new ( 0 , 0 ) ;
113+ if let Err ( e) = socket. bind ( & addr) {
114+ warn ! ( "Failed to bind control socket: {:?}" , e) ;
115+ return None ;
116+ }
125117
126- // Connect to NETLINK_GENERIC with the notify group
127- let socket = match SocketType :: connect (
128- NlFamily :: Generic ,
129- Some ( 0 ) ,
130- Groups :: new_groups ( & [ notify_group_id ] ) ,
131- ) {
132- Ok ( socket ) => socket ,
133- Err ( e ) => {
134- warn ! ( "Failed to connect control socket: {:?}" , e ) ;
135- return None ;
136- }
137- } ;
118+ // Subscribe to nlctrl notify group (group ID 1 for nlctrl notify)
119+ // The nlctrl family uses a well-known multicast group ID
120+ if let Err ( e ) = socket . add_membership ( NLCTRL_NOTIFY_GROUP_ID ) {
121+ warn ! ( "Failed to add multicast membership: {:?}" , e ) ;
122+ return None ;
123+ }
124+
125+ // Set non-blocking mode
126+ if let Err ( e ) = socket. set_non_blocking ( true ) {
127+ warn ! ( "Failed to set non-blocking mode: {:?}" , e ) ;
128+ return None ;
129+ }
138130
139131 debug ! ( "Successfully connected control socket and subscribed to nlctrl notifications" ) ;
140132 Some ( socket)
@@ -147,30 +139,17 @@ impl ControlNetlinkActor {
147139 None
148140 }
149141
150- /// Creates a netlink resolver for family/group resolution.
151- ///
152- /// # Returns
153- ///
154- /// Some(router) if creation is successful, None otherwise
142+ /// Creates a netlink socket for family/group resolution.
143+ /// Now delegates to netlink_utils module.
155144 #[ cfg( not( test) ) ]
156- fn create_nl_resolver ( ) -> Option < NlRouter > {
157- match NlRouter :: connect ( NlFamily :: Generic , Some ( 0 ) , Groups :: empty ( ) ) {
158- Ok ( ( router, _) ) => {
159- debug ! ( "Created netlink resolver for family/group resolution" ) ;
160- Some ( router)
161- }
162- Err ( e) => {
163- warn ! ( "Failed to create netlink resolver: {:?}" , e) ;
164- None
165- }
166- }
145+ fn create_nl_resolver ( ) -> Option < Socket > {
146+ netlink_utils:: create_nl_resolver ( )
167147 }
168148
169149 /// Mock netlink resolver for testing.
170150 #[ cfg( test) ]
171151 #[ allow( dead_code) ]
172- fn create_nl_resolver ( ) -> Option < NlRouter > {
173- // Return None for tests to avoid complexity
152+ fn create_nl_resolver ( ) -> Option < ( ) > {
174153 None
175154 }
176155
@@ -194,18 +173,18 @@ impl ControlNetlinkActor {
194173 }
195174 }
196175
197- if let Some ( ref resolver) = self . resolver {
198- match resolver . resolve_genl_family ( & self . family ) {
199- Ok ( family_info ) => {
200- debug ! ( "Family '{}' exists with ID: {}" , self . family, family_info ) ;
176+ if let Some ( ref mut resolver) = self . resolver {
177+ match netlink_utils :: resolve_family_id ( resolver , & self . family ) {
178+ Ok ( family_id ) => {
179+ debug ! ( "Family '{}' exists with ID: {}" , self . family, family_id ) ;
201180 true
202181 }
203182 Err ( e) => {
204183 debug ! ( "Family '{}' resolution failed: {:?}" , self . family, e) ;
205184 // Only clear resolver on specific errors that indicate it's stale
206- // For "family not found" errors, keep the resolver as it's still valid
207- if e . to_string ( ) . contains ( "No such file or directory" )
208- || e . to_string ( ) . contains ( "Connection refused" )
185+ let err_str = format ! ( "{:?}" , e ) ;
186+ if err_str . contains ( "No such file or directory" )
187+ || err_str . contains ( "Connection refused" )
209188 {
210189 debug ! ( "Clearing resolver due to connection error" ) ;
211190 self . resolver = None ;
@@ -233,13 +212,14 @@ impl ControlNetlinkActor {
233212 socket : Option < & mut SocketType > ,
234213 target_family : & str ,
235214 ) -> Result < bool , io:: Error > {
215+ debug ! ( "Attempting to receive control message" ) ;
236216 let socket = socket. ok_or_else ( || {
237217 io:: Error :: new ( io:: ErrorKind :: NotConnected , "No control socket available" )
238218 } ) ?;
239219
240220 let mut buffer = vec ! [ 0 ; BUFFER_SIZE ] ;
241- match socket. recv ( & mut buffer, Msg :: DONTWAIT ) {
242- Ok ( ( size, _ ) ) => {
221+ match socket. recv_from ( & mut buffer, 0 ) {
222+ Ok ( ( size, _addr ) ) => {
243223 if size == 0 {
244224 return Ok ( false ) ;
245225 }
@@ -410,8 +390,9 @@ impl ControlNetlinkActor {
410390 // Log heartbeat every minute to show the actor is running
411391 if heartbeat_counter % HEARTBEAT_LOG_INTERVAL == 0 {
412392 info ! (
413- "ControlNetlinkActor is running normally - monitoring family '{}'" ,
414- actor. family
393+ "ControlNetlinkActor is running normally - monitoring family '{}', family_was_available={}" ,
394+ actor. family,
395+ family_was_available,
415396 ) ;
416397 }
417398
@@ -470,16 +451,16 @@ impl ControlNetlinkActor {
470451 family_was_available = family_available;
471452 } else if family_available {
472453 // Family is available but we haven't sent a reconnect recently
473- // Send periodic reconnect commands to ensure DataNetlinkActor stays connected
454+ // Send periodic soft reconnect commands to ensure DataNetlinkActor stays connected
474455 // This handles cases where DataNetlinkActor disconnected due to socket errors
475- // Since DataNetlinkActor.connect() now skips unnecessary reconnects, we can be more conservative
456+ // SoftReconnect only reconnects if socket is unhealthy, avoiding unnecessary reconnections
476457 if heartbeat_counter - last_periodic_reconnect_counter
477458 >= PERIODIC_RECONNECT_INTERVAL
478459 {
479- debug ! ( "Sending periodic reconnect command to ensure data socket stays connected (counter: {}, last: {}, interval: {})" ,
460+ debug ! ( "Sending periodic soft reconnect command to check data socket health (counter: {}, last: {}, interval: {})" ,
480461 heartbeat_counter, last_periodic_reconnect_counter, PERIODIC_RECONNECT_INTERVAL ) ;
481- if let Err ( e) = actor. command_sender . send ( NetlinkCommand :: Reconnect ) . await {
482- warn ! ( "Failed to send periodic reconnect command: {:?}" , e) ;
462+ if let Err ( e) = actor. command_sender . send ( NetlinkCommand :: SoftReconnect ) . await {
463+ warn ! ( "Failed to send periodic soft reconnect command: {:?}" , e) ;
483464 break ; // Channel is closed, exit
484465 }
485466 last_periodic_reconnect_counter = heartbeat_counter;
@@ -512,7 +493,7 @@ pub mod test {
512493 pub struct MockSocket ;
513494
514495 impl MockSocket {
515- pub fn recv ( & mut self , _buf : & mut [ u8 ] , _flags : Msg ) -> Result < ( usize , Groups ) , io:: Error > {
496+ pub fn recv_from ( & mut self , _buf : & mut [ u8 ] , _flags : i32 ) -> Result < ( usize , SocketAddr ) , io:: Error > {
516497 // Always return WouldBlock to simulate no control messages
517498 Err ( io:: Error :: new (
518499 io:: ErrorKind :: WouldBlock ,
0 commit comments