@@ -120,23 +120,30 @@ impl UdpServer<Stopped> {
120120 let ( tx_start, rx_start) = tokio:: sync:: oneshot:: channel :: < Started > ( ) ;
121121 let ( tx_halt, rx_halt) = tokio:: sync:: oneshot:: channel :: < Halted > ( ) ;
122122
123+ assert ! ( !tx_halt. is_closed( ) , "Halt channel for UDP tracker should be open" ) ;
124+
123125 let launcher = self . state . launcher ;
124126
125127 let task = tokio:: spawn ( async move {
126- launcher. start ( tracker, tx_start, rx_halt) . await ;
128+ debug ! ( target: "UDP Tracker" , "Launcher starting ..." ) ;
129+
130+ let starting = launcher. start ( tracker, tx_start, rx_halt) . await ;
131+
132+ starting. await . expect ( "UDP server should have started running" ) ;
133+
127134 launcher
128135 } ) ;
129136
137+ let binding = rx_start. await . expect ( "unable to start service" ) . address ;
138+
130139 let running_udp_server: UdpServer < Running > = UdpServer {
131140 state : Running {
132- binding : rx_start . await . expect ( "unable to start service" ) . address ,
141+ binding,
133142 halt_task : tx_halt,
134143 task,
135144 } ,
136145 } ;
137146
138- info ! ( "Running UDP Tracker on Socket: {}" , running_udp_server. state. binding) ;
139-
140147 Ok ( running_udp_server)
141148 }
142149}
@@ -202,41 +209,62 @@ impl Udp {
202209 tx_start : Sender < Started > ,
203210 rx_halt : Receiver < Halted > ,
204211 ) -> JoinHandle < ( ) > {
205- let binding = Arc :: new ( UdpSocket :: bind ( bind_to) . await . expect ( "Could not bind to {self.socket}." ) ) ;
206- let address = binding. local_addr ( ) . expect ( "Could not get local_addr from {binding}." ) ;
212+ let socket = Arc :: new ( UdpSocket :: bind ( bind_to) . await . expect ( "Could not bind to {self.socket}." ) ) ;
213+ let address = socket. local_addr ( ) . expect ( "Could not get local_addr from {binding}." ) ;
214+
215+ info ! ( target: "UDP Tracker" , "Starting on: udp://{}" , address) ;
207216
208217 let running = tokio:: task:: spawn ( async move {
209- let halt = async move {
210- shutdown_signal_with_message ( rx_halt, format ! ( "Halting Http Service Bound to Socket: {address}" ) ) . await ;
218+ let halt = tokio:: task:: spawn ( async move {
219+ debug ! ( target: "UDP Tracker" , "Waiting for halt signal for socket address: udp://{address} ..." ) ;
220+
221+ shutdown_signal_with_message (
222+ rx_halt,
223+ format ! ( "Shutting down UDP server on socket address: udp://{address}" ) ,
224+ )
225+ . await ;
226+ } ) ;
227+
228+ let listen = async move {
229+ debug ! ( target: "UDP Tracker" , "Waiting for packets on socket address: udp://{address} ..." ) ;
230+
231+ loop {
232+ let mut data = [ 0 ; MAX_PACKET_SIZE ] ;
233+ let socket_clone = socket. clone ( ) ;
234+
235+ match socket_clone. recv_from ( & mut data) . await {
236+ Ok ( ( valid_bytes, remote_addr) ) => {
237+ let payload = data[ ..valid_bytes] . to_vec ( ) ;
238+
239+ debug ! ( target: "UDP Tracker" , "Received {} bytes" , payload. len( ) ) ;
240+ debug ! ( target: "UDP Tracker" , "From: {}" , & remote_addr) ;
241+ debug ! ( target: "UDP Tracker" , "Payload: {:?}" , payload) ;
242+
243+ let response = handle_packet ( remote_addr, payload, & tracker) . await ;
244+
245+ Udp :: send_response ( socket_clone, remote_addr, response) . await ;
246+ }
247+ Err ( err) => {
248+ error ! ( "Error reading UDP datagram from socket. Error: {:?}" , err) ;
249+ }
250+ }
251+ }
211252 } ;
212253
213254 pin_mut ! ( halt) ;
255+ pin_mut ! ( listen) ;
214256
215- loop {
216- let mut data = [ 0 ; MAX_PACKET_SIZE ] ;
217- let binding = binding. clone ( ) ;
218-
219- tokio:: select! {
220- ( ) = & mut halt => { } ,
221-
222- Ok ( ( valid_bytes, remote_addr) ) = binding. recv_from( & mut data) => {
223- let payload = data[ ..valid_bytes] . to_vec( ) ;
224-
225- debug!( "Received {} bytes" , payload. len( ) ) ;
226- debug!( "From: {}" , & remote_addr) ;
227- debug!( "Payload: {:?}" , payload) ;
257+ tx_start
258+ . send ( Started { address } )
259+ . expect ( "the UDP Tracker service should not be dropped" ) ;
228260
229- let response = handle_packet( remote_addr, payload, & tracker) . await ;
230-
231- Udp :: send_response( binding, remote_addr, response) . await ;
232- }
233- }
261+ tokio:: select! {
262+ _ = & mut halt => { debug!( target: "UDP Tracker" , "Halt signal spawned task stopped on address: udp://{address}" ) ; } ,
263+ ( ) = & mut listen => { debug!( target: "UDP Tracker" , "Socket listener stopped on address: udp://{address}" ) ; } ,
234264 }
235265 } ) ;
236266
237- tx_start
238- . send ( Started { address } )
239- . expect ( "the UDP Tracker service should not be dropped" ) ;
267+ info ! ( target: "UDP Tracker" , "Started on: udp://{}" , address) ;
240268
241269 running
242270 }
0 commit comments