Skip to content

Commit 49c8c68

Browse files
authored
Merge pull request #681 from ionosnetworks/feat/linux-poll-api
linux: use poll api instead of select inorder to support fd > 1024. Fixes #612 and #639
2 parents 07526a7 + 48f2109 commit 49c8c68

File tree

1 file changed

+76
-52
lines changed

1 file changed

+76
-52
lines changed

pnet_datalink/src/linux.rs

+76-52
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@ use crate::{DataLinkReceiver, DataLinkSender, MacAddr, NetworkInterface};
1515

1616
use pnet_sys;
1717

18-
use std::cmp;
1918
use std::io;
2019
use std::mem;
21-
use std::ptr;
2220
use std::sync::Arc;
2321
use std::time::Duration;
2422

@@ -210,7 +208,6 @@ pub fn channel(
210208
let fd = Arc::new(pnet_sys::FileDesc { fd: socket });
211209
let sender = Box::new(DataLinkSenderImpl {
212210
socket: fd.clone(),
213-
fd_set: unsafe { mem::zeroed() },
214211
write_buffer: vec![0; config.write_buffer_size],
215212
_channel_type: config.channel_type,
216213
send_addr: unsafe { *(send_addr as *const libc::sockaddr_ll) },
@@ -221,7 +218,6 @@ pub fn channel(
221218
});
222219
let receiver = Box::new(DataLinkReceiverImpl {
223220
socket: fd.clone(),
224-
fd_set: unsafe { mem::zeroed() },
225221
read_buffer: vec![0; config.read_buffer_size],
226222
_channel_type: config.channel_type,
227223
timeout: config
@@ -234,7 +230,6 @@ pub fn channel(
234230

235231
struct DataLinkSenderImpl {
236232
socket: Arc<pnet_sys::FileDesc>,
237-
fd_set: libc::fd_set,
238233
write_buffer: Vec<u8>,
239234
_channel_type: super::ChannelType,
240235
send_addr: libc::sockaddr_ll,
@@ -253,35 +248,40 @@ impl DataLinkSender for DataLinkSenderImpl {
253248
) -> Option<io::Result<()>> {
254249
let len = num_packets * packet_size;
255250
if len <= self.write_buffer.len() {
256-
let min = cmp::min(self.write_buffer[..].len(), len);
251+
let min = std::cmp::min(self.write_buffer.len(), len);
257252
let mut_slice = &mut self.write_buffer;
253+
254+
let mut pollfd = libc::pollfd {
255+
fd: self.socket.fd,
256+
events: libc::POLLOUT, // Monitoring for write ability
257+
revents: 0, // Will be filled by poll to indicate the events that occurred
258+
};
259+
260+
// Convert timeout to milliseconds as required by poll
261+
let timeout_ms = self
262+
.timeout
263+
.as_ref()
264+
.map(|to| (to.tv_sec as i64 * 1000) + (to.tv_nsec as i64 / 1_000_000))
265+
.unwrap_or(-1); // -1 means wait indefinitely
266+
258267
for chunk in mut_slice[..min].chunks_mut(packet_size) {
259268
func(chunk);
260269
let send_addr =
261270
(&self.send_addr as *const libc::sockaddr_ll) as *const libc::sockaddr;
262271

263-
unsafe {
264-
libc::FD_ZERO(&mut self.fd_set as *mut libc::fd_set);
265-
libc::FD_SET(self.socket.fd, &mut self.fd_set as *mut libc::fd_set);
266-
}
267272
let ret = unsafe {
268-
libc::pselect(
269-
self.socket.fd + 1,
270-
ptr::null_mut(),
271-
&mut self.fd_set as *mut libc::fd_set,
272-
ptr::null_mut(),
273-
self.timeout
274-
.as_ref()
275-
.map(|to| to as *const libc::timespec)
276-
.unwrap_or(ptr::null()),
277-
ptr::null(),
273+
libc::poll(
274+
&mut pollfd as *mut libc::pollfd,
275+
1,
276+
timeout_ms as libc::c_int,
278277
)
279278
};
279+
280280
if ret == -1 {
281281
return Some(Err(io::Error::last_os_error()));
282282
} else if ret == 0 {
283283
return Some(Err(io::Error::new(io::ErrorKind::TimedOut, "Timed out")));
284-
} else {
284+
} else if pollfd.revents & libc::POLLOUT != 0 {
285285
if let Err(e) = pnet_sys::send_to(
286286
self.socket.fd,
287287
chunk,
@@ -290,6 +290,11 @@ impl DataLinkSender for DataLinkSenderImpl {
290290
) {
291291
return Some(Err(e));
292292
}
293+
} else {
294+
return Some(Err(io::Error::new(
295+
io::ErrorKind::Other,
296+
"Unexpected poll event",
297+
)));
293298
}
294299
}
295300

@@ -301,28 +306,33 @@ impl DataLinkSender for DataLinkSenderImpl {
301306

302307
#[inline]
303308
fn send_to(&mut self, packet: &[u8], _dst: Option<NetworkInterface>) -> Option<io::Result<()>> {
304-
unsafe {
305-
libc::FD_ZERO(&mut self.fd_set as *mut libc::fd_set);
306-
libc::FD_SET(self.socket.fd, &mut self.fd_set as *mut libc::fd_set);
307-
}
309+
let mut pollfd = libc::pollfd {
310+
fd: self.socket.fd,
311+
events: libc::POLLOUT, // Monitoring for write ability
312+
revents: 0, // Will be filled by poll to indicate the events that occurred
313+
};
314+
315+
// Convert timeout to milliseconds as required by poll
316+
let timeout_ms = self
317+
.timeout
318+
.as_ref()
319+
.map(|to| (to.tv_sec as i64 * 1000) + (to.tv_nsec as i64 / 1_000_000))
320+
.unwrap_or(-1); // -1 means wait indefinitely
321+
308322
let ret = unsafe {
309-
libc::pselect(
310-
self.socket.fd + 1,
311-
ptr::null_mut(),
312-
&mut self.fd_set as *mut libc::fd_set,
313-
ptr::null_mut(),
314-
self.timeout
315-
.as_ref()
316-
.map(|to| to as *const libc::timespec)
317-
.unwrap_or(ptr::null()),
318-
ptr::null(),
323+
libc::poll(
324+
&mut pollfd as *mut libc::pollfd,
325+
1,
326+
timeout_ms as libc::c_int,
319327
)
320328
};
329+
321330
if ret == -1 {
322331
Some(Err(io::Error::last_os_error()))
323332
} else if ret == 0 {
324333
Some(Err(io::Error::new(io::ErrorKind::TimedOut, "Timed out")))
325-
} else {
334+
} else if pollfd.revents & libc::POLLOUT != 0 {
335+
// POLLOUT is set, meaning the socket is ready for writing
326336
match pnet_sys::send_to(
327337
self.socket.fd,
328338
packet,
@@ -332,13 +342,17 @@ impl DataLinkSender for DataLinkSenderImpl {
332342
Err(e) => Some(Err(e)),
333343
Ok(_) => Some(Ok(())),
334344
}
345+
} else {
346+
Some(Err(io::Error::new(
347+
io::ErrorKind::Other,
348+
"Unexpected poll event",
349+
)))
335350
}
336351
}
337352
}
338353

339354
struct DataLinkReceiverImpl {
340355
socket: Arc<pnet_sys::FileDesc>,
341-
fd_set: libc::fd_set,
342356
read_buffer: Vec<u8>,
343357
_channel_type: super::ChannelType,
344358
timeout: Option<libc::timespec>,
@@ -347,33 +361,43 @@ struct DataLinkReceiverImpl {
347361
impl DataLinkReceiver for DataLinkReceiverImpl {
348362
fn next(&mut self) -> io::Result<&[u8]> {
349363
let mut caddr: libc::sockaddr_storage = unsafe { mem::zeroed() };
350-
unsafe {
351-
libc::FD_ZERO(&mut self.fd_set as *mut libc::fd_set);
352-
libc::FD_SET(self.socket.fd, &mut self.fd_set as *mut libc::fd_set);
353-
}
364+
let mut pollfd = libc::pollfd {
365+
fd: self.socket.fd,
366+
events: libc::POLLIN, // Monitoring for read availability
367+
revents: 0,
368+
};
369+
370+
// Convert timeout to milliseconds as required by poll
371+
let timeout_ms = self
372+
.timeout
373+
.as_ref()
374+
.map(|to| (to.tv_sec as i64 * 1000) + (to.tv_nsec as i64 / 1_000_000))
375+
.unwrap_or(-1); // -1 means wait indefinitely
376+
354377
let ret = unsafe {
355-
libc::pselect(
356-
self.socket.fd + 1,
357-
&mut self.fd_set as *mut libc::fd_set,
358-
ptr::null_mut(),
359-
ptr::null_mut(),
360-
self.timeout
361-
.as_ref()
362-
.map(|to| to as *const libc::timespec)
363-
.unwrap_or(ptr::null()),
364-
ptr::null(),
378+
libc::poll(
379+
&mut pollfd as *mut libc::pollfd,
380+
1,
381+
timeout_ms as libc::c_int,
365382
)
366383
};
384+
367385
if ret == -1 {
368386
Err(io::Error::last_os_error())
369387
} else if ret == 0 {
370388
Err(io::Error::new(io::ErrorKind::TimedOut, "Timed out"))
371-
} else {
389+
} else if pollfd.revents & libc::POLLIN != 0 {
390+
// POLLIN is set, meaning the socket has data to be read
372391
let res = pnet_sys::recv_from(self.socket.fd, &mut self.read_buffer, &mut caddr);
373392
match res {
374393
Ok(len) => Ok(&self.read_buffer[0..len]),
375394
Err(e) => Err(e),
376395
}
396+
} else {
397+
Err(io::Error::new(
398+
io::ErrorKind::Other,
399+
"Unexpected poll event",
400+
))
377401
}
378402
}
379403
}

0 commit comments

Comments
 (0)