Skip to content

Process UDP requests concurrently #611

@josecelano

Description

@josecelano

Relates to: #565, #596
Depends on: #565

The current version of the UDP server processes one request at a time.

// Listener future of the current UDP server: it receives one datagram, fully
// handles it and sends the response before it reads the next datagram.
let listen = async move {
    debug!(target: "UDP Tracker", "Waiting for packets on socket address: udp://{address} ...");

    loop {
        // A fresh receive buffer is created for every datagram.
        let mut data = [0; MAX_PACKET_SIZE];
        // NOTE(review): `socket` is presumably an `Arc<UdpSocket>`, making this clone cheap — confirm.
        let socket_clone = socket.clone();

        match socket_clone.recv_from(&mut data).await {
            Ok((valid_bytes, remote_addr)) => {
                // Keep only the bytes that were actually received.
                let payload = data[..valid_bytes].to_vec();

                debug!(target: "UDP Tracker", "Received {} bytes", payload.len());
                debug!(target: "UDP Tracker", "From: {}", &remote_addr);
                debug!(target: "UDP Tracker", "Payload: {:?}", payload);

                let response_fut = handle_packet(remote_addr, payload, &tracker);

                // The request is awaited right here (bounded by a 5-second timeout), so the
                // loop cannot call `recv_from` again until this request has been handled.
                match tokio::time::timeout(Duration::from_secs(5), response_fut).await {
                    Ok(response) => {
                        Udp::send_response(socket_clone, remote_addr, response).await;
                    }
                    Err(_) => {
                        // The request timed out: it is logged and no response is sent.
                        error!("Timeout occurred while processing the UDP request.");
                    }
                }
            }
            Err(err) => {
                // A failed read is logged and the loop keeps listening.
                error!("Error reading UDP datagram from socket. Error: {:?}", err);
            }
        }
    }
};

For the time being, that's not a bottleneck because we also write sequentially into the Tracker repository. @WarmBeer is working on the issue:

That would allow concurrent writes/reads, but to take advantage of that change, I think we need to change the UDP server.

One implementation proposal could be using workers. I think @da2ce7 was working on a refactor where he was using a pool of active requests handled by spawned tasks (when he was working on the graceful shutdown).

This is the draft ChatGPT implementation:

// ... [existing imports] ...

impl Udp {
    // ... [other methods] ...

    /// It starts the UDP server instance with graceful shutdown.
    async fn start_with_graceful_shutdown(
        tracker: Arc<Tracker>,
        bind_to: SocketAddr,
        tx_start: Sender<Started>,
        rx_halt: Receiver<Halted>,
    ) -> JoinHandle<()> {
        let socket = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to UDP socket."));
        let address = socket.local_addr().expect("Could not get local_addr from binding.");

        info!(target: "UDP Tracker", "Starting on: udp://{}", address);

        let (tx, rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(100);
        let rx = Arc::new(Mutex::new(rx));

        let n_workers = 10; // Set the number of workers as needed
        for _ in 0..n_workers {
            let worker_rx = rx.clone();
            let worker_socket = socket.clone();
            let worker_tracker = tracker.clone();
            tokio::spawn(async move {
                worker_process(worker_rx, worker_socket, worker_tracker).await;
            });
        }

        let running = tokio::task::spawn(async move {
            let mut halt = tokio::task::spawn(async move {
                shutdown_signal_with_message(
                    rx_halt,
                    format!("Shutting down UDP server on socket address: udp://{address}"),
                )
                .await;
            });

            loop {
                tokio::select! {
                    Ok(_) = &mut halt => {
                        debug!(target: "UDP Tracker", "Halt signal received, stopping listening for new requests.");
                        break;
                    }
                    result = Udp::receive_packet(&socket) => {
                        if let Ok((payload, remote_addr)) = result {
                            if tx.send((payload, remote_addr)).await.is_err() {
                                error!("Failed to send request to worker");
                            }
                        } else if let Err(e) = result {
                            error!("Error reading UDP datagram from socket. Error: {:?}", e);
                        }
                    }
                }
            }

            debug!(target: "UDP Tracker", "Main listener stopped, waiting for worker tasks to complete.");
        });

        info!(target: "UDP Tracker", "Started on: udp://{}", address);

        running
    }

    async fn receive_packet(socket: &Arc<UdpSocket>) -> std::io::Result<(Vec<u8>, SocketAddr)> {
        let mut data = [0; MAX_PACKET_SIZE];
        let (valid_bytes, remote_addr) = socket.recv_from(&mut data).await?;
        let payload = data[..valid_bytes].to_vec();

        Ok((payload, remote_addr))
    }

    // ... [existing Udp methods] ...
}

/// Processes queued UDP requests until the request channel is closed.
///
/// Each request is taken from the shared receiver `rx`, handled with
/// `handle_packet` under a 5-second timeout, and answered through `socket`.
/// A request that times out is logged and gets no response.
///
/// The receiver lock is held only while waiting for the next request, never
/// while a request is being handled, so several workers sharing the same `rx`
/// can handle requests at the same time.
async fn worker_process(
    rx: Arc<Mutex<mpsc::Receiver<(Vec<u8>, SocketAddr)>>>,
    socket: Arc<UdpSocket>,
    tracker: Arc<Tracker>,
) {
    loop {
        // Receive in a separate statement: the lock guard is a temporary that is dropped
        // at the end of this `let`. Inside a `while let` condition it would stay alive for
        // the whole loop body and serialize all the workers.
        let next_request = rx.lock().await.recv().await;

        let (payload, remote_addr) = match next_request {
            Some(request) => request,
            // The channel is closed and drained: nothing is left to process.
            None => break,
        };

        let response_fut = handle_packet(remote_addr, payload, &tracker);

        // Implementing a 5-second timeout for each request
        match tokio::time::timeout(Duration::from_secs(5), response_fut).await {
            Ok(response) => {
                Udp::send_response(Arc::clone(&socket), remote_addr, response).await;
            }
            Err(_) => {
                // Optionally, a timeout response or error message could be sent back to the client.
                error!("Timeout occurred while processing the UDP request from {}", remote_addr);
            }
        }
    }
}

// ... [existing tests and other code] ...

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    Status

    Done

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions