Skip to content

Commit d154b2a

Browse files
committed
refactor: [#1358] clean Swarm type
1 parent b3b0b71 commit d154b2a

File tree

1 file changed

+148
-160
lines changed

1 file changed

+148
-160
lines changed

packages/torrent-repository/src/swarm.rs

Lines changed: 148 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -67,169 +67,20 @@ impl Swarm {
6767
AnnounceEvent::Started | AnnounceEvent::None | AnnounceEvent::Completed => {
6868
self.upsert_peer(Arc::new(*incoming_announce), &mut downloads_increased).await
6969
}
70-
AnnounceEvent::Stopped => self.remove(incoming_announce).await,
70+
AnnounceEvent::Stopped => self.remove_peer(&incoming_announce.peer_addr).await,
7171
};
7272

7373
downloads_increased
7474
}
7575

76-
async fn upsert_peer(
77-
&mut self,
78-
incoming_announce: Arc<PeerAnnouncement>,
79-
downloads_increased: &mut bool,
80-
) -> Option<Arc<Peer>> {
81-
let announcement = incoming_announce.clone();
82-
83-
if let Some(previous_announce) = self.peers.insert(incoming_announce.peer_addr, incoming_announce) {
84-
*downloads_increased = self.update_metadata(Some(&previous_announce), &announcement);
85-
86-
self.trigger_peer_updated_event(&previous_announce, &announcement, *downloads_increased)
87-
.await;
88-
89-
Some(previous_announce)
90-
} else {
91-
*downloads_increased = self.update_metadata(None, &announcement);
92-
93-
self.trigger_peer_added_event(&announcement).await;
94-
95-
None
96-
}
97-
}
98-
99-
fn update_metadata(
100-
&mut self,
101-
opt_previous_announce: Option<&Arc<PeerAnnouncement>>,
102-
new_announce: &Arc<PeerAnnouncement>,
103-
) -> bool {
104-
let mut downloads_increased = false;
105-
106-
if let Some(previous_announce) = opt_previous_announce {
107-
if previous_announce.role() != new_announce.role() {
108-
if new_announce.is_seeder() {
109-
self.metadata.complete += 1;
110-
self.metadata.incomplete -= 1;
111-
} else {
112-
self.metadata.complete -= 1;
113-
self.metadata.incomplete += 1;
114-
}
115-
}
116-
117-
if new_announce.is_completed() && !previous_announce.is_completed() {
118-
self.metadata.downloaded += 1;
119-
downloads_increased = true;
120-
}
121-
} else if new_announce.is_seeder() {
122-
self.metadata.complete += 1;
123-
} else {
124-
self.metadata.incomplete += 1;
125-
}
126-
127-
downloads_increased
128-
}
129-
130-
async fn trigger_peer_updated_event(
131-
&self,
132-
old_announce: &Arc<PeerAnnouncement>,
133-
new_announce: &Arc<PeerAnnouncement>,
134-
downloads_increased: bool,
135-
) {
136-
if let Some(event_sender) = self.event_sender.as_deref() {
137-
event_sender
138-
.send(Event::PeerUpdated {
139-
info_hash: self.info_hash,
140-
old_peer: *old_announce.clone(),
141-
new_peer: *new_announce.clone(),
142-
})
143-
.await;
144-
145-
if downloads_increased {
146-
event_sender
147-
.send(Event::PeerDownloadCompleted {
148-
info_hash: self.info_hash,
149-
peer: *new_announce.clone(),
150-
})
151-
.await;
152-
}
153-
}
154-
}
155-
156-
async fn trigger_peer_added_event(&self, announcement: &Arc<PeerAnnouncement>) {
157-
if let Some(event_sender) = self.event_sender.as_deref() {
158-
event_sender
159-
.send(Event::PeerAdded {
160-
info_hash: self.info_hash,
161-
peer: *announcement.clone(),
162-
})
163-
.await;
164-
}
165-
}
166-
167-
pub async fn remove(&mut self, peer_to_remove: &Peer) -> Option<Arc<Peer>> {
168-
match self.peers.remove(&peer_to_remove.peer_addr) {
169-
Some(old_peer) => {
170-
// A peer has been removed from the swarm.
171-
172-
// Check if the peer was a seeder or a leecher.
173-
if old_peer.is_seeder() {
174-
self.metadata.complete -= 1;
175-
} else {
176-
self.metadata.incomplete -= 1;
177-
}
178-
179-
if let Some(event_sender) = self.event_sender.as_deref() {
180-
event_sender
181-
.send(Event::PeerRemoved {
182-
info_hash: self.info_hash,
183-
peer: *old_peer.clone(),
184-
})
185-
.await;
186-
}
187-
188-
Some(old_peer)
189-
}
190-
None => None,
191-
}
192-
}
193-
19476
pub async fn remove_inactive(&mut self, current_cutoff: DurationSinceUnixEpoch) -> usize {
195-
let mut number_of_peers_removed = 0;
196-
let mut removed_peers = Vec::new();
197-
198-
self.peers.retain(|_key, peer| {
199-
let is_active = peer::ReadInfo::get_updated(peer) > current_cutoff;
200-
201-
if !is_active {
202-
// Update the metadata when removing a peer.
203-
if peer.is_seeder() {
204-
self.metadata.complete -= 1;
205-
} else {
206-
self.metadata.incomplete -= 1;
207-
}
208-
209-
number_of_peers_removed += 1;
210-
211-
if let Some(_event_sender) = self.event_sender.as_deref() {
212-
// Events can not be trigger here because retain does not allow
213-
// async closures.
214-
removed_peers.push(*peer.clone());
215-
}
216-
}
77+
let peers_to_remove = self.inactive_peers(current_cutoff);
21778

218-
is_active
219-
});
220-
221-
if let Some(event_sender) = self.event_sender.as_deref() {
222-
for peer in &removed_peers {
223-
event_sender
224-
.send(Event::PeerRemoved {
225-
info_hash: self.info_hash,
226-
peer: *peer,
227-
})
228-
.await;
229-
}
79+
for peer_addr in &peers_to_remove {
80+
self.remove_peer(peer_addr).await;
23081
}
23182

232-
number_of_peers_removed
83+
peers_to_remove.len()
23384
}
23485

23586
#[must_use]
@@ -316,6 +167,57 @@ impl Swarm {
316167
!self.should_be_removed(policy)
317168
}
318169

170+
async fn upsert_peer(
171+
&mut self,
172+
incoming_announce: Arc<PeerAnnouncement>,
173+
downloads_increased: &mut bool,
174+
) -> Option<Arc<Peer>> {
175+
let announcement = incoming_announce.clone();
176+
177+
if let Some(previous_announce) = self.peers.insert(incoming_announce.peer_addr, incoming_announce) {
178+
*downloads_increased = self.update_metadata_on_update(&previous_announce, &announcement);
179+
180+
self.trigger_peer_updated_event(&previous_announce, &announcement).await;
181+
182+
if *downloads_increased {
183+
self.trigger_peer_download_completed_event(&announcement).await;
184+
}
185+
186+
Some(previous_announce)
187+
} else {
188+
*downloads_increased = false;
189+
190+
self.update_metadata_on_insert(&announcement);
191+
192+
self.trigger_peer_added_event(&announcement).await;
193+
194+
None
195+
}
196+
}
197+
198+
async fn remove_peer(&mut self, peer_addr: &SocketAddr) -> Option<Arc<Peer>> {
199+
if let Some(old_peer) = self.peers.remove(peer_addr) {
200+
self.update_metadata_on_removal(&old_peer);
201+
202+
self.trigger_peer_removed_event(&old_peer).await;
203+
204+
Some(old_peer)
205+
} else {
206+
None
207+
}
208+
}
209+
210+
#[must_use]
211+
fn inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> Vec<SocketAddr> {
212+
self.peers
213+
.iter()
214+
.filter(|(_, peer)| peer::ReadInfo::get_updated(&**peer) <= current_cutoff)
215+
.map(|(addr, _)| *addr)
216+
.collect()
217+
}
218+
219+
/// Returns true if the swarm should be removed according to the retention
220+
/// policy.
319221
fn should_be_removed(&self, policy: &TrackerPolicy) -> bool {
320222
// If the policy is to remove peerless torrents and the swarm is empty (no peers),
321223
(policy.remove_peerless_torrents && self.is_empty())
@@ -325,6 +227,92 @@ impl Swarm {
325227
// See https://github.com/torrust/torrust-tracker/issues/1502)
326228
&& !(policy.persistent_torrent_completed_stat && self.metadata().downloaded > 0)
327229
}
230+
231+
fn update_metadata_on_insert(&mut self, added_peer: &Arc<PeerAnnouncement>) {
232+
if added_peer.is_seeder() {
233+
self.metadata.complete += 1;
234+
} else {
235+
self.metadata.incomplete += 1;
236+
}
237+
}
238+
239+
fn update_metadata_on_removal(&mut self, removed_peer: &Arc<Peer>) {
240+
if removed_peer.is_seeder() {
241+
self.metadata.complete -= 1;
242+
} else {
243+
self.metadata.incomplete -= 1;
244+
}
245+
}
246+
247+
fn update_metadata_on_update(
248+
&mut self,
249+
previous_announce: &Arc<PeerAnnouncement>,
250+
new_announce: &Arc<PeerAnnouncement>,
251+
) -> bool {
252+
let mut downloads_increased = false;
253+
254+
if previous_announce.role() != new_announce.role() {
255+
if new_announce.is_seeder() {
256+
self.metadata.complete += 1;
257+
self.metadata.incomplete -= 1;
258+
} else {
259+
self.metadata.complete -= 1;
260+
self.metadata.incomplete += 1;
261+
}
262+
}
263+
264+
if new_announce.is_completed() && !previous_announce.is_completed() {
265+
self.metadata.downloaded += 1;
266+
downloads_increased = true;
267+
}
268+
269+
downloads_increased
270+
}
271+
272+
async fn trigger_peer_added_event(&self, announcement: &Arc<PeerAnnouncement>) {
273+
if let Some(event_sender) = self.event_sender.as_deref() {
274+
event_sender
275+
.send(Event::PeerAdded {
276+
info_hash: self.info_hash,
277+
peer: *announcement.clone(),
278+
})
279+
.await;
280+
}
281+
}
282+
283+
async fn trigger_peer_removed_event(&self, old_peer: &Arc<Peer>) {
284+
if let Some(event_sender) = self.event_sender.as_deref() {
285+
event_sender
286+
.send(Event::PeerRemoved {
287+
info_hash: self.info_hash,
288+
peer: *old_peer.clone(),
289+
})
290+
.await;
291+
}
292+
}
293+
294+
async fn trigger_peer_updated_event(&self, old_announce: &Arc<PeerAnnouncement>, new_announce: &Arc<PeerAnnouncement>) {
295+
if let Some(event_sender) = self.event_sender.as_deref() {
296+
event_sender
297+
.send(Event::PeerUpdated {
298+
info_hash: self.info_hash,
299+
old_peer: *old_announce.clone(),
300+
new_peer: *new_announce.clone(),
301+
})
302+
.await;
303+
}
304+
}
305+
306+
async fn trigger_peer_download_completed_event(&self, new_announce: &Arc<PeerAnnouncement>) {
307+
if let Some(event_sender) = self.event_sender.as_deref() {
308+
event_sender
309+
.send(Event::PeerDownloadCompleted {
310+
info_hash: self.info_hash,
311+
peer: *new_announce.clone(),
312+
})
313+
.await;
314+
}
315+
}
328316
}
329317

330318
#[cfg(test)]
@@ -435,7 +423,7 @@ mod tests {
435423

436424
swarm.upsert_peer(peer.into(), &mut downloads_increased).await;
437425

438-
swarm.remove(&peer).await;
426+
swarm.remove_peer(&peer.peer_addr).await;
439427

440428
assert!(swarm.is_empty());
441429
}
@@ -449,7 +437,7 @@ mod tests {
449437

450438
swarm.upsert_peer(peer.into(), &mut downloads_increased).await;
451439

452-
let old = swarm.remove(&peer).await;
440+
let old = swarm.remove_peer(&peer.peer_addr).await;
453441

454442
assert_eq!(old, Some(Arc::new(peer)));
455443
assert_eq!(swarm.get(&peer.peer_addr), None);
@@ -461,7 +449,7 @@ mod tests {
461449

462450
let peer = PeerBuilder::default().build();
463451

464-
assert_eq!(swarm.remove(&peer).await, None);
452+
assert_eq!(swarm.remove_peer(&peer.peer_addr).await, None);
465453
}
466454

467455
#[tokio::test]
@@ -787,7 +775,7 @@ mod tests {
787775

788776
let leechers = swarm.metadata().leechers();
789777

790-
swarm.remove(&leecher).await;
778+
swarm.remove_peer(&leecher.peer_addr).await;
791779

792780
assert_eq!(swarm.metadata().leechers(), leechers - 1);
793781
}
@@ -803,7 +791,7 @@ mod tests {
803791

804792
let seeders = swarm.metadata().seeders();
805793

806-
swarm.remove(&seeder).await;
794+
swarm.remove_peer(&seeder.peer_addr).await;
807795

808796
assert_eq!(swarm.metadata().seeders(), seeders - 1);
809797
}
@@ -983,7 +971,7 @@ mod tests {
983971
let mut downloads_increased = false;
984972
swarm.upsert_peer(peer.into(), &mut downloads_increased).await;
985973

986-
swarm.remove(&peer).await;
974+
swarm.remove_peer(&peer.peer_addr).await;
987975
}
988976

989977
#[tokio::test]

0 commit comments

Comments
 (0)