Skip to content

Commit 0c1f389

Browse files
committed
fix: [#591] panicking after starting UDP server due to close halt channel
1 parent 49c961c commit 0c1f389

File tree

5 files changed

+75
-33
lines changed

5 files changed

+75
-33
lines changed

cSpell.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"Containerfile",
3333
"curr",
3434
"Cyberneering",
35+
"datagram",
3536
"datetime",
3637
"Dijke",
3738
"distroless",
@@ -79,6 +80,7 @@
7980
"nonroot",
8081
"Norberg",
8182
"numwant",
83+
"nvCFlJCq7fz7Qx6KoKTDiMZvns8l5Kw7",
8284
"oneshot",
8385
"ostr",
8486
"Pando",
@@ -129,8 +131,7 @@
129131
"Xtorrent",
130132
"Xunlei",
131133
"xxxxxxxxxxxxxxxxxxxxd",
132-
"yyyyyyyyyyyyyyyyyyyyd",
133-
"nvCFlJCq7fz7Qx6KoKTDiMZvns8l5Kw7"
134+
"yyyyyyyyyyyyyyyyyyyyd"
134135
],
135136
"enableFiletypes": [
136137
"dockerfile",

src/bootstrap/jobs/udp_tracker.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
//! for the configuration options.
99
use std::sync::Arc;
1010

11+
use log::debug;
1112
use tokio::task::JoinHandle;
1213
use torrust_tracker_configuration::UdpTracker;
1314

@@ -36,10 +37,20 @@ pub async fn start_job(config: &UdpTracker, tracker: Arc<core::Tracker>) -> Join
3637
.expect("it should be able to start the udp tracker");
3738

3839
tokio::spawn(async move {
40+
debug!(target: "UDP Tracker", "Wait for launcher (UDP service) to finish ...");
41+
debug!(target: "UDP Tracker", "Is halt channel closed before waiting?: {}", server.state.halt_task.is_closed());
42+
43+
assert!(
44+
!server.state.halt_task.is_closed(),
45+
"Halt channel for UDP tracker should be open"
46+
);
47+
3948
server
4049
.state
4150
.task
4251
.await
4352
.expect("it should be able to join to the udp tracker task");
53+
54+
debug!(target: "UDP Tracker", "Is halt channel closed after finishing the server?: {}", server.state.halt_task.is_closed());
4455
})
4556
}

src/servers/apis/server.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ impl Launcher {
175175
let tls = self.tls.clone();
176176
let protocol = if tls.is_some() { "https" } else { "http" };
177177

178+
info!(target: "API", "Starting on {protocol}://{}", address);
179+
178180
let running = Box::pin(async {
179181
match tls {
180182
Some(tls) => axum_server::from_tcp_rustls(socket, tls)
@@ -190,7 +192,7 @@ impl Launcher {
190192
}
191193
});
192194

193-
info!(target: "API", "API server started on {protocol}://{}", address);
195+
info!(target: "API", "Started on {protocol}://{}", address);
194196

195197
tx_start
196198
.send(Started { address })

src/servers/http/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl Launcher {
4848
tokio::task::spawn(graceful_shutdown(
4949
handle.clone(),
5050
rx_halt,
51-
format!("Shutting down http server on socket address: {address}"),
51+
format!("Shutting down HTTP server on socket address: {address}"),
5252
));
5353

5454
let tls = self.tls.clone();

src/servers/udp/server.rs

Lines changed: 57 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -120,23 +120,30 @@ impl UdpServer<Stopped> {
120120
let (tx_start, rx_start) = tokio::sync::oneshot::channel::<Started>();
121121
let (tx_halt, rx_halt) = tokio::sync::oneshot::channel::<Halted>();
122122

123+
assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open");
124+
123125
let launcher = self.state.launcher;
124126

125127
let task = tokio::spawn(async move {
126-
launcher.start(tracker, tx_start, rx_halt).await;
128+
debug!(target: "UDP Tracker", "Launcher starting ...");
129+
130+
let starting = launcher.start(tracker, tx_start, rx_halt).await;
131+
132+
starting.await.expect("UDP server should have started running");
133+
127134
launcher
128135
});
129136

137+
let binding = rx_start.await.expect("unable to start service").address;
138+
130139
let running_udp_server: UdpServer<Running> = UdpServer {
131140
state: Running {
132-
binding: rx_start.await.expect("unable to start service").address,
141+
binding,
133142
halt_task: tx_halt,
134143
task,
135144
},
136145
};
137146

138-
info!("Running UDP Tracker on Socket: {}", running_udp_server.state.binding);
139-
140147
Ok(running_udp_server)
141148
}
142149
}
@@ -202,41 +209,62 @@ impl Udp {
202209
tx_start: Sender<Started>,
203210
rx_halt: Receiver<Halted>,
204211
) -> JoinHandle<()> {
205-
let binding = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}."));
206-
let address = binding.local_addr().expect("Could not get local_addr from {binding}.");
212+
let socket = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}."));
213+
let address = socket.local_addr().expect("Could not get local_addr from {binding}.");
214+
215+
info!(target: "UDP Tracker", "Starting on: udp://{}", address);
207216

208217
let running = tokio::task::spawn(async move {
209-
let halt = async move {
210-
shutdown_signal_with_message(rx_halt, format!("Halting Http Service Bound to Socket: {address}")).await;
218+
let halt = tokio::task::spawn(async move {
219+
debug!(target: "UDP Tracker", "Waiting for halt signal for socket address: udp://{address} ...");
220+
221+
shutdown_signal_with_message(
222+
rx_halt,
223+
format!("Shutting down UDP server on socket address: udp://{address}"),
224+
)
225+
.await;
226+
});
227+
228+
let listen = async move {
229+
debug!(target: "UDP Tracker", "Waiting for packets on socket address: udp://{address} ...");
230+
231+
loop {
232+
let mut data = [0; MAX_PACKET_SIZE];
233+
let socket_clone = socket.clone();
234+
235+
match socket_clone.recv_from(&mut data).await {
236+
Ok((valid_bytes, remote_addr)) => {
237+
let payload = data[..valid_bytes].to_vec();
238+
239+
debug!(target: "UDP Tracker", "Received {} bytes", payload.len());
240+
debug!(target: "UDP Tracker", "From: {}", &remote_addr);
241+
debug!(target: "UDP Tracker", "Payload: {:?}", payload);
242+
243+
let response = handle_packet(remote_addr, payload, &tracker).await;
244+
245+
Udp::send_response(socket_clone, remote_addr, response).await;
246+
}
247+
Err(err) => {
248+
error!("Error reading UDP datagram from socket. Error: {:?}", err);
249+
}
250+
}
251+
}
211252
};
212253

213254
pin_mut!(halt);
255+
pin_mut!(listen);
214256

215-
loop {
216-
let mut data = [0; MAX_PACKET_SIZE];
217-
let binding = binding.clone();
218-
219-
tokio::select! {
220-
() = & mut halt => {},
221-
222-
Ok((valid_bytes, remote_addr)) = binding.recv_from(&mut data) => {
223-
let payload = data[..valid_bytes].to_vec();
224-
225-
debug!("Received {} bytes", payload.len());
226-
debug!("From: {}", &remote_addr);
227-
debug!("Payload: {:?}", payload);
257+
tx_start
258+
.send(Started { address })
259+
.expect("the UDP Tracker service should not be dropped");
228260

229-
let response = handle_packet(remote_addr, payload, &tracker).await;
230-
231-
Udp::send_response(binding, remote_addr, response).await;
232-
}
233-
}
261+
tokio::select! {
262+
_ = & mut halt => { debug!(target: "UDP Tracker", "Halt signal spawned task stopped on address: udp://{address}"); },
263+
() = & mut listen => { debug!(target: "UDP Tracker", "Socket listener stopped on address: udp://{address}"); },
234264
}
235265
});
236266

237-
tx_start
238-
.send(Started { address })
239-
.expect("the UDP Tracker service should not be dropped");
267+
info!(target: "UDP Tracker", "Started on: udp://{}", address);
240268

241269
running
242270
}

0 commit comments

Comments
 (0)