-
Notifications
You must be signed in to change notification settings - Fork 51
Process UDP requests concurrently #611
Copy link
Copy link
Closed
Labels
Enhancement / Feature Request (Something New)
Milestone
Description
Relates to: #565, #596
Depends on: #565
The current version of the UDP server processes one request at a time.
// Listener future: receives UDP datagrams in a loop and handles each one to
// completion before reading the next, i.e. one request at a time.
let listen = async move {
debug!(target: "UDP Tracker", "Waiting for packets on socket address: udp://{address} ...");
loop {
// A fresh zeroed buffer of MAX_PACKET_SIZE bytes is used for every datagram.
let mut data = [0; MAX_PACKET_SIZE];
let socket_clone = socket.clone();
match socket_clone.recv_from(&mut data).await {
Ok((valid_bytes, remote_addr)) => {
// Keep only the bytes that were actually received.
let payload = data[..valid_bytes].to_vec();
debug!(target: "UDP Tracker", "Received {} bytes", payload.len());
debug!(target: "UDP Tracker", "From: {}", &remote_addr);
debug!(target: "UDP Tracker", "Payload: {:?}", payload);
let response_fut = handle_packet(remote_addr, payload, &tracker);
// The handler is awaited inline with a 5-second cap, so the loop cannot
// call `recv_from` again until this request finishes or times out.
match tokio::time::timeout(Duration::from_secs(5), response_fut).await {
Ok(response) => {
Udp::send_response(socket_clone, remote_addr, response).await;
}
Err(_) => {
// Timed out: the request is dropped and no response is sent.
error!("Timeout occurred while processing the UDP request.");
}
}
}
Err(err) => {
// A receive error is only logged; the loop keeps listening.
error!("Error reading UDP datagram from socket. Error: {:?}", err);
}
}
}
};

For the time being, that's not a bottleneck because we also write sequentially into the Tracker repository. @WarmBeer is working on that issue.
That would allow concurrent writes/reads, but to take advantage of that change, I think we need to change the UDP server.
One implementation proposal could be using workers. I think @da2ce7 was working on a refactor where he was using a pool of active requests handled by spawned tasks (when he was working on the graceful shutdown).
This is the draft ChatGPT implementation:
// ... [existing imports] ...
impl Udp {
// ... [other methods] ...
/// It starts the UDP server instance with graceful shutdown.
async fn start_with_graceful_shutdown(
tracker: Arc<Tracker>,
bind_to: SocketAddr,
tx_start: Sender<Started>,
rx_halt: Receiver<Halted>,
) -> JoinHandle<()> {
let socket = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to UDP socket."));
let address = socket.local_addr().expect("Could not get local_addr from binding.");
info!(target: "UDP Tracker", "Starting on: udp://{}", address);
let (tx, rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(100);
let rx = Arc::new(Mutex::new(rx));
let n_workers = 10; // Set the number of workers as needed
for _ in 0..n_workers {
let worker_rx = rx.clone();
let worker_socket = socket.clone();
let worker_tracker = tracker.clone();
tokio::spawn(async move {
worker_process(worker_rx, worker_socket, worker_tracker).await;
});
}
let running = tokio::task::spawn(async move {
let mut halt = tokio::task::spawn(async move {
shutdown_signal_with_message(
rx_halt,
format!("Shutting down UDP server on socket address: udp://{address}"),
)
.await;
});
loop {
tokio::select! {
Ok(_) = &mut halt => {
debug!(target: "UDP Tracker", "Halt signal received, stopping listening for new requests.");
break;
}
result = Udp::receive_packet(&socket) => {
if let Ok((payload, remote_addr)) = result {
if tx.send((payload, remote_addr)).await.is_err() {
error!("Failed to send request to worker");
}
} else if let Err(e) = result {
error!("Error reading UDP datagram from socket. Error: {:?}", e);
}
}
}
}
debug!(target: "UDP Tracker", "Main listener stopped, waiting for worker tasks to complete.");
});
info!(target: "UDP Tracker", "Started on: udp://{}", address);
running
}
async fn receive_packet(socket: &Arc<UdpSocket>) -> std::io::Result<(Vec<u8>, SocketAddr)> {
let mut data = [0; MAX_PACKET_SIZE];
let (valid_bytes, remote_addr) = socket.recv_from(&mut data).await?;
let payload = data[..valid_bytes].to_vec();
Ok((payload, remote_addr))
}
// ... [existing Udp methods] ...
}
/// Worker loop: takes queued requests from the shared receiver and handles
/// them until the channel is closed and empty.
///
/// * `rx` - receiver shared by all workers, guarded by an async mutex.
/// * `socket` - socket used to send the responses.
/// * `tracker` - tracker passed to `handle_packet` for every request.
///
/// Each request is handled with a 5-second timeout. On timeout an error is
/// logged and no response is sent.
async fn worker_process(
    rx: Arc<Mutex<mpsc::Receiver<(Vec<u8>, SocketAddr)>>>,
    socket: Arc<UdpSocket>,
    tracker: Arc<Tracker>,
) {
    loop {
        // Fetch the next request in its own statement so the mutex guard is
        // dropped as soon as `recv` returns. Used as a `while let` scrutinee,
        // the guard would stay alive for the whole loop body, so only one
        // worker at a time could handle a request.
        let next_request = rx.lock().await.recv().await;

        let (payload, remote_addr) = match next_request {
            Some(request) => request,
            // Channel closed and drained: nothing more to process.
            None => break,
        };

        let response_fut = handle_packet(remote_addr, payload, &tracker);

        // Implementing a 5-second timeout for each request
        match tokio::time::timeout(Duration::from_secs(5), response_fut).await {
            Ok(response) => {
                Udp::send_response(socket.clone(), remote_addr, response).await;
            }
            Err(_) => {
                error!("Timeout occurred while processing the UDP request from {}", remote_addr);
                // Optionally, send a timeout response or error message back to the client
                // Udp::send_timeout_response(socket.clone(), remote_addr).await;
            }
        }
    }
}
// ... [existing tests and other code] ...
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
Enhancement / Feature Request (Something New)
Type
Projects
Status
Done