@@ -67,169 +67,20 @@ impl Swarm {
6767 AnnounceEvent :: Started | AnnounceEvent :: None | AnnounceEvent :: Completed => {
6868 self . upsert_peer ( Arc :: new ( * incoming_announce) , & mut downloads_increased) . await
6969 }
70- AnnounceEvent :: Stopped => self . remove ( incoming_announce) . await ,
70+ AnnounceEvent :: Stopped => self . remove_peer ( & incoming_announce. peer_addr ) . await ,
7171 } ;
7272
7373 downloads_increased
7474 }
7575
76- async fn upsert_peer (
77- & mut self ,
78- incoming_announce : Arc < PeerAnnouncement > ,
79- downloads_increased : & mut bool ,
80- ) -> Option < Arc < Peer > > {
81- let announcement = incoming_announce. clone ( ) ;
82-
83- if let Some ( previous_announce) = self . peers . insert ( incoming_announce. peer_addr , incoming_announce) {
84- * downloads_increased = self . update_metadata ( Some ( & previous_announce) , & announcement) ;
85-
86- self . trigger_peer_updated_event ( & previous_announce, & announcement, * downloads_increased)
87- . await ;
88-
89- Some ( previous_announce)
90- } else {
91- * downloads_increased = self . update_metadata ( None , & announcement) ;
92-
93- self . trigger_peer_added_event ( & announcement) . await ;
94-
95- None
96- }
97- }
98-
99- fn update_metadata (
100- & mut self ,
101- opt_previous_announce : Option < & Arc < PeerAnnouncement > > ,
102- new_announce : & Arc < PeerAnnouncement > ,
103- ) -> bool {
104- let mut downloads_increased = false ;
105-
106- if let Some ( previous_announce) = opt_previous_announce {
107- if previous_announce. role ( ) != new_announce. role ( ) {
108- if new_announce. is_seeder ( ) {
109- self . metadata . complete += 1 ;
110- self . metadata . incomplete -= 1 ;
111- } else {
112- self . metadata . complete -= 1 ;
113- self . metadata . incomplete += 1 ;
114- }
115- }
116-
117- if new_announce. is_completed ( ) && !previous_announce. is_completed ( ) {
118- self . metadata . downloaded += 1 ;
119- downloads_increased = true ;
120- }
121- } else if new_announce. is_seeder ( ) {
122- self . metadata . complete += 1 ;
123- } else {
124- self . metadata . incomplete += 1 ;
125- }
126-
127- downloads_increased
128- }
129-
130- async fn trigger_peer_updated_event (
131- & self ,
132- old_announce : & Arc < PeerAnnouncement > ,
133- new_announce : & Arc < PeerAnnouncement > ,
134- downloads_increased : bool ,
135- ) {
136- if let Some ( event_sender) = self . event_sender . as_deref ( ) {
137- event_sender
138- . send ( Event :: PeerUpdated {
139- info_hash : self . info_hash ,
140- old_peer : * old_announce. clone ( ) ,
141- new_peer : * new_announce. clone ( ) ,
142- } )
143- . await ;
144-
145- if downloads_increased {
146- event_sender
147- . send ( Event :: PeerDownloadCompleted {
148- info_hash : self . info_hash ,
149- peer : * new_announce. clone ( ) ,
150- } )
151- . await ;
152- }
153- }
154- }
155-
156- async fn trigger_peer_added_event ( & self , announcement : & Arc < PeerAnnouncement > ) {
157- if let Some ( event_sender) = self . event_sender . as_deref ( ) {
158- event_sender
159- . send ( Event :: PeerAdded {
160- info_hash : self . info_hash ,
161- peer : * announcement. clone ( ) ,
162- } )
163- . await ;
164- }
165- }
166-
167- pub async fn remove ( & mut self , peer_to_remove : & Peer ) -> Option < Arc < Peer > > {
168- match self . peers . remove ( & peer_to_remove. peer_addr ) {
169- Some ( old_peer) => {
170- // A peer has been removed from the swarm.
171-
172- // Check if the peer was a seeder or a leecher.
173- if old_peer. is_seeder ( ) {
174- self . metadata . complete -= 1 ;
175- } else {
176- self . metadata . incomplete -= 1 ;
177- }
178-
179- if let Some ( event_sender) = self . event_sender . as_deref ( ) {
180- event_sender
181- . send ( Event :: PeerRemoved {
182- info_hash : self . info_hash ,
183- peer : * old_peer. clone ( ) ,
184- } )
185- . await ;
186- }
187-
188- Some ( old_peer)
189- }
190- None => None ,
191- }
192- }
193-
19476 pub async fn remove_inactive ( & mut self , current_cutoff : DurationSinceUnixEpoch ) -> usize {
195- let mut number_of_peers_removed = 0 ;
196- let mut removed_peers = Vec :: new ( ) ;
197-
198- self . peers . retain ( |_key, peer| {
199- let is_active = peer:: ReadInfo :: get_updated ( peer) > current_cutoff;
200-
201- if !is_active {
202- // Update the metadata when removing a peer.
203- if peer. is_seeder ( ) {
204- self . metadata . complete -= 1 ;
205- } else {
206- self . metadata . incomplete -= 1 ;
207- }
208-
209- number_of_peers_removed += 1 ;
210-
211- if let Some ( _event_sender) = self . event_sender . as_deref ( ) {
212- // Events can not be trigger here because retain does not allow
213- // async closures.
214- removed_peers. push ( * peer. clone ( ) ) ;
215- }
216- }
77+ let peers_to_remove = self . inactive_peers ( current_cutoff) ;
21778
218- is_active
219- } ) ;
220-
221- if let Some ( event_sender) = self . event_sender . as_deref ( ) {
222- for peer in & removed_peers {
223- event_sender
224- . send ( Event :: PeerRemoved {
225- info_hash : self . info_hash ,
226- peer : * peer,
227- } )
228- . await ;
229- }
79+ for peer_addr in & peers_to_remove {
80+ self . remove_peer ( peer_addr) . await ;
23081 }
23182
232- number_of_peers_removed
83+ peers_to_remove . len ( )
23384 }
23485
23586 #[ must_use]
@@ -316,6 +167,57 @@ impl Swarm {
316167 !self . should_be_removed ( policy)
317168 }
318169
170+ async fn upsert_peer (
171+ & mut self ,
172+ incoming_announce : Arc < PeerAnnouncement > ,
173+ downloads_increased : & mut bool ,
174+ ) -> Option < Arc < Peer > > {
175+ let announcement = incoming_announce. clone ( ) ;
176+
177+ if let Some ( previous_announce) = self . peers . insert ( incoming_announce. peer_addr , incoming_announce) {
178+ * downloads_increased = self . update_metadata_on_update ( & previous_announce, & announcement) ;
179+
180+ self . trigger_peer_updated_event ( & previous_announce, & announcement) . await ;
181+
182+ if * downloads_increased {
183+ self . trigger_peer_download_completed_event ( & announcement) . await ;
184+ }
185+
186+ Some ( previous_announce)
187+ } else {
188+ * downloads_increased = false ;
189+
190+ self . update_metadata_on_insert ( & announcement) ;
191+
192+ self . trigger_peer_added_event ( & announcement) . await ;
193+
194+ None
195+ }
196+ }
197+
198+ async fn remove_peer ( & mut self , peer_addr : & SocketAddr ) -> Option < Arc < Peer > > {
199+ if let Some ( old_peer) = self . peers . remove ( peer_addr) {
200+ self . update_metadata_on_removal ( & old_peer) ;
201+
202+ self . trigger_peer_removed_event ( & old_peer) . await ;
203+
204+ Some ( old_peer)
205+ } else {
206+ None
207+ }
208+ }
209+
210+ #[ must_use]
211+ fn inactive_peers ( & self , current_cutoff : DurationSinceUnixEpoch ) -> Vec < SocketAddr > {
212+ self . peers
213+ . iter ( )
214+ . filter ( |( _, peer) | peer:: ReadInfo :: get_updated ( & * * peer) <= current_cutoff)
215+ . map ( |( addr, _) | * addr)
216+ . collect ( )
217+ }
218+
219+ /// Returns true if the swarm should be removed according to the retention
220+ /// policy.
319221 fn should_be_removed ( & self , policy : & TrackerPolicy ) -> bool {
320222 // If the policy is to remove peerless torrents and the swarm is empty (no peers),
321223 ( policy. remove_peerless_torrents && self . is_empty ( ) )
@@ -325,6 +227,92 @@ impl Swarm {
325227 // See https://github.com/torrust/torrust-tracker/issues/1502)
326228 && !( policy. persistent_torrent_completed_stat && self . metadata ( ) . downloaded > 0 )
327229 }
230+
231+ fn update_metadata_on_insert ( & mut self , added_peer : & Arc < PeerAnnouncement > ) {
232+ if added_peer. is_seeder ( ) {
233+ self . metadata . complete += 1 ;
234+ } else {
235+ self . metadata . incomplete += 1 ;
236+ }
237+ }
238+
239+ fn update_metadata_on_removal ( & mut self , removed_peer : & Arc < Peer > ) {
240+ if removed_peer. is_seeder ( ) {
241+ self . metadata . complete -= 1 ;
242+ } else {
243+ self . metadata . incomplete -= 1 ;
244+ }
245+ }
246+
247+ fn update_metadata_on_update (
248+ & mut self ,
249+ previous_announce : & Arc < PeerAnnouncement > ,
250+ new_announce : & Arc < PeerAnnouncement > ,
251+ ) -> bool {
252+ let mut downloads_increased = false ;
253+
254+ if previous_announce. role ( ) != new_announce. role ( ) {
255+ if new_announce. is_seeder ( ) {
256+ self . metadata . complete += 1 ;
257+ self . metadata . incomplete -= 1 ;
258+ } else {
259+ self . metadata . complete -= 1 ;
260+ self . metadata . incomplete += 1 ;
261+ }
262+ }
263+
264+ if new_announce. is_completed ( ) && !previous_announce. is_completed ( ) {
265+ self . metadata . downloaded += 1 ;
266+ downloads_increased = true ;
267+ }
268+
269+ downloads_increased
270+ }
271+
272+ async fn trigger_peer_added_event ( & self , announcement : & Arc < PeerAnnouncement > ) {
273+ if let Some ( event_sender) = self . event_sender . as_deref ( ) {
274+ event_sender
275+ . send ( Event :: PeerAdded {
276+ info_hash : self . info_hash ,
277+ peer : * announcement. clone ( ) ,
278+ } )
279+ . await ;
280+ }
281+ }
282+
283+ async fn trigger_peer_removed_event ( & self , old_peer : & Arc < Peer > ) {
284+ if let Some ( event_sender) = self . event_sender . as_deref ( ) {
285+ event_sender
286+ . send ( Event :: PeerRemoved {
287+ info_hash : self . info_hash ,
288+ peer : * old_peer. clone ( ) ,
289+ } )
290+ . await ;
291+ }
292+ }
293+
294+ async fn trigger_peer_updated_event ( & self , old_announce : & Arc < PeerAnnouncement > , new_announce : & Arc < PeerAnnouncement > ) {
295+ if let Some ( event_sender) = self . event_sender . as_deref ( ) {
296+ event_sender
297+ . send ( Event :: PeerUpdated {
298+ info_hash : self . info_hash ,
299+ old_peer : * old_announce. clone ( ) ,
300+ new_peer : * new_announce. clone ( ) ,
301+ } )
302+ . await ;
303+ }
304+ }
305+
306+ async fn trigger_peer_download_completed_event ( & self , new_announce : & Arc < PeerAnnouncement > ) {
307+ if let Some ( event_sender) = self . event_sender . as_deref ( ) {
308+ event_sender
309+ . send ( Event :: PeerDownloadCompleted {
310+ info_hash : self . info_hash ,
311+ peer : * new_announce. clone ( ) ,
312+ } )
313+ . await ;
314+ }
315+ }
328316}
329317
330318#[ cfg( test) ]
@@ -435,7 +423,7 @@ mod tests {
435423
436424 swarm. upsert_peer ( peer. into ( ) , & mut downloads_increased) . await ;
437425
438- swarm. remove ( & peer) . await ;
426+ swarm. remove_peer ( & peer. peer_addr ) . await ;
439427
440428 assert ! ( swarm. is_empty( ) ) ;
441429 }
@@ -449,7 +437,7 @@ mod tests {
449437
450438 swarm. upsert_peer ( peer. into ( ) , & mut downloads_increased) . await ;
451439
452- let old = swarm. remove ( & peer) . await ;
440+ let old = swarm. remove_peer ( & peer. peer_addr ) . await ;
453441
454442 assert_eq ! ( old, Some ( Arc :: new( peer) ) ) ;
455443 assert_eq ! ( swarm. get( & peer. peer_addr) , None ) ;
@@ -461,7 +449,7 @@ mod tests {
461449
462450 let peer = PeerBuilder :: default ( ) . build ( ) ;
463451
464- assert_eq ! ( swarm. remove ( & peer) . await , None ) ;
452+ assert_eq ! ( swarm. remove_peer ( & peer. peer_addr ) . await , None ) ;
465453 }
466454
467455 #[ tokio:: test]
@@ -787,7 +775,7 @@ mod tests {
787775
788776 let leechers = swarm. metadata ( ) . leechers ( ) ;
789777
790- swarm. remove ( & leecher) . await ;
778+ swarm. remove_peer ( & leecher. peer_addr ) . await ;
791779
792780 assert_eq ! ( swarm. metadata( ) . leechers( ) , leechers - 1 ) ;
793781 }
@@ -803,7 +791,7 @@ mod tests {
803791
804792 let seeders = swarm. metadata ( ) . seeders ( ) ;
805793
806- swarm. remove ( & seeder) . await ;
794+ swarm. remove_peer ( & seeder. peer_addr ) . await ;
807795
808796 assert_eq ! ( swarm. metadata( ) . seeders( ) , seeders - 1 ) ;
809797 }
@@ -983,7 +971,7 @@ mod tests {
983971 let mut downloads_increased = false ;
984972 swarm. upsert_peer ( peer. into ( ) , & mut downloads_increased) . await ;
985973
986- swarm. remove ( & peer) . await ;
974+ swarm. remove_peer ( & peer. peer_addr ) . await ;
987975 }
988976
989977 #[ tokio:: test]
0 commit comments