Skip to content

Commit c330c45

Browse files
authored
[countersyncd]: Fix netlink fd leakage and deadlock issue (sonic-net#4043)
This PR replaces the neli netlink library with genetlink and netlink-* libraries to fix deadlock and file descriptor leakage issues in the countersyncd component. The changes involve significant refactoring of socket management, introduction of a shared utilities module, and updates to the reconnection strategy.
1 parent ff58550 commit c330c45

File tree

8 files changed

+768
-341
lines changed

8 files changed

+768
-341
lines changed

Cargo.lock

Lines changed: 134 additions & 114 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@ tokio = { version = "1.37", features = ["full"] }
2626
tokio-util = { version = "0.7", features = ["rt"] }
2727
tokio-stream = "0.1"
2828

29-
# Netlink for network operations
30-
neli = { git = "https://github.com/jbaublitz/neli.git", tag = "neli-v0.7.0-rc2" }
29+
# Using genetlink and netlink-packet-generic for netlink support
30+
genetlink = "0.2"
31+
netlink-packet-core = "0.7"
32+
netlink-packet-generic = "0.3"
33+
netlink-sys = "0.8"
3134

3235
# IPFIX parser for traffic flow analysis
3336
ipfixrw = "0.1.0"
3437
ahash = "0.8.11"
35-
binrw = "0.14.1"
38+
binrw = "0.15.0"
3639
byteorder = "1.5.0"
3740

3841
# Configuration and serialization
@@ -75,4 +78,4 @@ pretty_assertions = "1"
7578

7679
# Build dependencies
7780
tonic-build = "0.12"
78-
vergen = { version = "8.2", features = ["build", "git", "gitoxide", "cargo", "rustc", "si"] }
81+
vergen = { version = "8.2", features = ["build", "git", "gitoxide", "cargo", "rustc", "si"] }

crates/countersyncd/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@ tokio = { workspace = true }
1616
yaml-rust = { workspace = true }
1717

1818
# Netlink for network operations
19-
neli = { workspace = true }
19+
genetlink = { workspace = true }
20+
netlink-packet-core = { workspace = true }
21+
netlink-packet-generic = { workspace = true }
22+
netlink-sys = { workspace = true }
23+
libc = "0.2"
2024

2125
# IPFIX parser for traffic flow analysis
2226
ipfixrw = { workspace = true }

crates/countersyncd/src/actor/control_netlink.rs

Lines changed: 54 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,16 @@ use std::{thread::sleep, time::Duration};
22

33
use log::{debug, info, warn};
44

5-
#[allow(unused_imports)]
6-
use neli::{
7-
consts::socket::{Msg, NlFamily},
8-
router::synchronous::NlRouter,
9-
socket::NlSocket,
10-
utils::Groups,
11-
};
5+
use netlink_sys::{Socket, SocketAddr, protocols::NETLINK_GENERIC};
126
use tokio::sync::mpsc::Sender;
137

148
use std::io;
159

1610
use super::super::message::netlink::NetlinkCommand;
11+
use super::netlink_utils;
1712

1813
#[cfg(not(test))]
19-
type SocketType = NlSocket;
14+
type SocketType = Socket;
2015
#[cfg(test)]
2116
type SocketType = test::MockSocket;
2217

@@ -42,6 +37,8 @@ const CTRL_CMD_DELFAMILY: u8 = 2;
4237
const CTRL_ATTR_FAMILY_NAME: u16 = 2;
4338
/// Size in bytes of the netlink message header (16) plus the generic netlink header (4)
4439
const GENL_HEADER_SIZE: usize = 20;
40+
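/// Multicast group ID of the netlink control (`nlctrl`) family's "notify" group.
/// NOTE(review): the kernel assigns this group the ID GENL_ID_CTRL (0x10) — confirm that 1 is the intended value.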
41+
const NLCTRL_NOTIFY_GROUP_ID: u32 = 1;
4542

4643
/// Actor responsible for monitoring netlink family registration/unregistration.
4744
///
@@ -58,9 +55,9 @@ pub struct ControlNetlinkActor {
5855
command_sender: Sender<NetlinkCommand>,
5956
/// Last time we checked if the family exists
6057
last_family_check: std::time::Instant,
61-
/// Reusable netlink resolver for family existence checks
58+
/// Reusable netlink socket for family existence checks
6259
#[cfg(not(test))]
63-
resolver: Option<NlRouter>,
60+
resolver: Option<Socket>,
6461
#[cfg(test)]
6562
#[allow(dead_code)]
6663
resolver: Option<()>,
@@ -99,42 +96,37 @@ impl ControlNetlinkActor {
9996
actor
10097
}
10198

102-
/// Establishes a connection to the netlink control socket (legacy interface).
99+
/// Establishes a connection to the netlink control socket.
103100
#[cfg(not(test))]
104101
fn connect_control_socket() -> Option<SocketType> {
105-
// Create a router to resolve the control group
106-
let (router, _) = match NlRouter::connect(NlFamily::Generic, Some(0), Groups::empty()) {
107-
Ok(result) => result,
102+
// Create a raw netlink socket
103+
let mut socket = match Socket::new(NETLINK_GENERIC) {
104+
Ok(s) => s,
108105
Err(e) => {
109-
warn!("Failed to connect control router: {:?}", e);
106+
warn!("Failed to create netlink socket: {:?}", e);
110107
return None;
111108
}
112109
};
113110

114-
// Resolve the "notify" multicast group for nlctrl family
115-
let notify_group_id = match router.resolve_nl_mcast_group("nlctrl", "notify") {
116-
Ok(group_id) => {
117-
debug!("Resolved nlctrl notify group ID: {}", group_id);
118-
group_id
119-
}
120-
Err(e) => {
121-
warn!("Failed to resolve nlctrl notify group: {:?}", e);
122-
return None;
123-
}
124-
};
111+
// Bind the socket with automatic port assignment
112+
let addr = SocketAddr::new(0, 0);
113+
if let Err(e) = socket.bind(&addr) {
114+
warn!("Failed to bind control socket: {:?}", e);
115+
return None;
116+
}
125117

126-
// Connect to NETLINK_GENERIC with the notify group
127-
let socket = match SocketType::connect(
128-
NlFamily::Generic,
129-
Some(0),
130-
Groups::new_groups(&[notify_group_id]),
131-
) {
132-
Ok(socket) => socket,
133-
Err(e) => {
134-
warn!("Failed to connect control socket: {:?}", e);
135-
return None;
136-
}
137-
};
118+
// Subscribe to the nlctrl "notify" multicast group (ID given by NLCTRL_NOTIFY_GROUP_ID)
119+
// The nlctrl family uses a well-known multicast group ID
120+
if let Err(e) = socket.add_membership(NLCTRL_NOTIFY_GROUP_ID) {
121+
warn!("Failed to add multicast membership: {:?}", e);
122+
return None;
123+
}
124+
125+
// Set non-blocking mode
126+
if let Err(e) = socket.set_non_blocking(true) {
127+
warn!("Failed to set non-blocking mode: {:?}", e);
128+
return None;
129+
}
138130

139131
debug!("Successfully connected control socket and subscribed to nlctrl notifications");
140132
Some(socket)
@@ -147,30 +139,17 @@ impl ControlNetlinkActor {
147139
None
148140
}
149141

150-
/// Creates a netlink resolver for family/group resolution.
151-
///
152-
/// # Returns
153-
///
154-
/// Some(router) if creation is successful, None otherwise
142+
/// Creates a netlink socket for family/group resolution.
143+
/// Now delegates to netlink_utils module.
155144
#[cfg(not(test))]
156-
fn create_nl_resolver() -> Option<NlRouter> {
157-
match NlRouter::connect(NlFamily::Generic, Some(0), Groups::empty()) {
158-
Ok((router, _)) => {
159-
debug!("Created netlink resolver for family/group resolution");
160-
Some(router)
161-
}
162-
Err(e) => {
163-
warn!("Failed to create netlink resolver: {:?}", e);
164-
None
165-
}
166-
}
145+
fn create_nl_resolver() -> Option<Socket> {
146+
netlink_utils::create_nl_resolver()
167147
}
168148

169149
/// Mock netlink resolver for testing.
170150
#[cfg(test)]
171151
#[allow(dead_code)]
172-
fn create_nl_resolver() -> Option<NlRouter> {
173-
// Return None for tests to avoid complexity
152+
fn create_nl_resolver() -> Option<()> {
174153
None
175154
}
176155

@@ -194,18 +173,18 @@ impl ControlNetlinkActor {
194173
}
195174
}
196175

197-
if let Some(ref resolver) = self.resolver {
198-
match resolver.resolve_genl_family(&self.family) {
199-
Ok(family_info) => {
200-
debug!("Family '{}' exists with ID: {}", self.family, family_info);
176+
if let Some(ref mut resolver) = self.resolver {
177+
match netlink_utils::resolve_family_id(resolver, &self.family) {
178+
Ok(family_id) => {
179+
debug!("Family '{}' exists with ID: {}", self.family, family_id);
201180
true
202181
}
203182
Err(e) => {
204183
debug!("Family '{}' resolution failed: {:?}", self.family, e);
205184
// Only clear resolver on specific errors that indicate it's stale
206-
// For "family not found" errors, keep the resolver as it's still valid
207-
if e.to_string().contains("No such file or directory")
208-
|| e.to_string().contains("Connection refused")
185+
let err_str = format!("{:?}", e);
186+
if err_str.contains("No such file or directory")
187+
|| err_str.contains("Connection refused")
209188
{
210189
debug!("Clearing resolver due to connection error");
211190
self.resolver = None;
@@ -233,13 +212,14 @@ impl ControlNetlinkActor {
233212
socket: Option<&mut SocketType>,
234213
target_family: &str,
235214
) -> Result<bool, io::Error> {
215+
debug!("Attempting to receive control message");
236216
let socket = socket.ok_or_else(|| {
237217
io::Error::new(io::ErrorKind::NotConnected, "No control socket available")
238218
})?;
239219

240220
let mut buffer = vec![0; BUFFER_SIZE];
241-
match socket.recv(&mut buffer, Msg::DONTWAIT) {
242-
Ok((size, _)) => {
221+
match socket.recv_from(&mut buffer, 0) {
222+
Ok((size, _addr)) => {
243223
if size == 0 {
244224
return Ok(false);
245225
}
@@ -410,8 +390,9 @@ impl ControlNetlinkActor {
410390
// Log heartbeat every minute to show the actor is running
411391
if heartbeat_counter % HEARTBEAT_LOG_INTERVAL == 0 {
412392
info!(
413-
"ControlNetlinkActor is running normally - monitoring family '{}'",
414-
actor.family
393+
"ControlNetlinkActor is running normally - monitoring family '{}', family_was_available={}",
394+
actor.family,
395+
family_was_available,
415396
);
416397
}
417398

@@ -470,16 +451,16 @@ impl ControlNetlinkActor {
470451
family_was_available = family_available;
471452
} else if family_available {
472453
// Family is available but we haven't sent a reconnect recently
473-
// Send periodic reconnect commands to ensure DataNetlinkActor stays connected
454+
// Send periodic soft reconnect commands to ensure DataNetlinkActor stays connected
474455
// This handles cases where DataNetlinkActor disconnected due to socket errors
475-
// Since DataNetlinkActor.connect() now skips unnecessary reconnects, we can be more conservative
456+
// SoftReconnect only reconnects if socket is unhealthy, avoiding unnecessary reconnections
476457
if heartbeat_counter - last_periodic_reconnect_counter
477458
>= PERIODIC_RECONNECT_INTERVAL
478459
{
479-
debug!("Sending periodic reconnect command to ensure data socket stays connected (counter: {}, last: {}, interval: {})",
460+
debug!("Sending periodic soft reconnect command to check data socket health (counter: {}, last: {}, interval: {})",
480461
heartbeat_counter, last_periodic_reconnect_counter, PERIODIC_RECONNECT_INTERVAL);
481-
if let Err(e) = actor.command_sender.send(NetlinkCommand::Reconnect).await {
482-
warn!("Failed to send periodic reconnect command: {:?}", e);
462+
if let Err(e) = actor.command_sender.send(NetlinkCommand::SoftReconnect).await {
463+
warn!("Failed to send periodic soft reconnect command: {:?}", e);
483464
break; // Channel is closed, exit
484465
}
485466
last_periodic_reconnect_counter = heartbeat_counter;
@@ -512,7 +493,7 @@ pub mod test {
512493
pub struct MockSocket;
513494

514495
impl MockSocket {
515-
pub fn recv(&mut self, _buf: &mut [u8], _flags: Msg) -> Result<(usize, Groups), io::Error> {
496+
pub fn recv_from(&mut self, _buf: &mut [u8], _flags: i32) -> Result<(usize, SocketAddr), io::Error> {
516497
// Always return WouldBlock to simulate no control messages
517498
Err(io::Error::new(
518499
io::ErrorKind::WouldBlock,

0 commit comments

Comments
 (0)