Skip to content

Commit b3b0b71

Browse files
committed
refactor: [#1358] Swarm, cleaning upsert_peer method
1 parent 47d1eab commit b3b0b71

File tree

2 files changed

+67
-49
lines changed

2 files changed

+67
-49
lines changed

packages/primitives/src/peer.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,11 @@ impl Peer {
270270
!self.is_seeder()
271271
}
272272

273+
#[must_use]
274+
pub fn is_completed(&self) -> bool {
275+
self.event == AnnounceEvent::Completed
276+
}
277+
273278
#[must_use]
274279
pub fn role(&self) -> PeerRole {
275280
if self.is_seeder() {

packages/torrent-repository/src/swarm.rs

Lines changed: 62 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -73,21 +73,39 @@ impl Swarm {
7373
downloads_increased
7474
}
7575

76-
pub async fn upsert_peer(
76+
async fn upsert_peer(
7777
&mut self,
7878
incoming_announce: Arc<PeerAnnouncement>,
7979
downloads_increased: &mut bool,
8080
) -> Option<Arc<Peer>> {
81-
let is_now_seeder = incoming_announce.is_seeder();
82-
let has_completed = incoming_announce.event == AnnounceEvent::Completed;
8381
let announcement = incoming_announce.clone();
8482

85-
if let Some(old_announce) = self.peers.insert(incoming_announce.peer_addr, incoming_announce) {
86-
// A peer has been updated in the swarm.
83+
if let Some(previous_announce) = self.peers.insert(incoming_announce.peer_addr, incoming_announce) {
84+
*downloads_increased = self.update_metadata(Some(&previous_announce), &announcement);
8785

88-
// Check if the peer has changed from leecher to seeder or vice versa.
89-
if old_announce.is_seeder() != is_now_seeder {
90-
if is_now_seeder {
86+
self.trigger_peer_updated_event(&previous_announce, &announcement, *downloads_increased)
87+
.await;
88+
89+
Some(previous_announce)
90+
} else {
91+
*downloads_increased = self.update_metadata(None, &announcement);
92+
93+
self.trigger_peer_added_event(&announcement).await;
94+
95+
None
96+
}
97+
}
98+
99+
fn update_metadata(
100+
&mut self,
101+
opt_previous_announce: Option<&Arc<PeerAnnouncement>>,
102+
new_announce: &Arc<PeerAnnouncement>,
103+
) -> bool {
104+
let mut downloads_increased = false;
105+
106+
if let Some(previous_announce) = opt_previous_announce {
107+
if previous_announce.role() != new_announce.role() {
108+
if new_announce.is_seeder() {
91109
self.metadata.complete += 1;
92110
self.metadata.incomplete -= 1;
93111
} else {
@@ -96,58 +114,53 @@ impl Swarm {
96114
}
97115
}
98116

99-
// Check if the peer has completed downloading the torrent.
100-
if has_completed && old_announce.event != AnnounceEvent::Completed {
117+
if new_announce.is_completed() && !previous_announce.is_completed() {
101118
self.metadata.downloaded += 1;
102-
*downloads_increased = true;
119+
downloads_increased = true;
103120
}
104-
105-
if let Some(event_sender) = self.event_sender.as_deref() {
106-
event_sender
107-
.send(Event::PeerUpdated {
108-
info_hash: self.info_hash,
109-
old_peer: *old_announce,
110-
new_peer: *announcement,
111-
})
112-
.await;
113-
114-
if *downloads_increased {
115-
event_sender
116-
.send(Event::PeerDownloadCompleted {
117-
info_hash: self.info_hash,
118-
peer: *announcement,
119-
})
120-
.await;
121-
}
122-
}
123-
124-
Some(old_announce)
121+
} else if new_announce.is_seeder() {
122+
self.metadata.complete += 1;
125123
} else {
126-
// A new peer has been added to the swarm.
127-
128-
// Check if the peer is a seeder or a leecher.
129-
if is_now_seeder {
130-
self.metadata.complete += 1;
131-
} else {
132-
self.metadata.incomplete += 1;
133-
}
124+
self.metadata.incomplete += 1;
125+
}
134126

135-
// Check if the peer has completed downloading the torrent.
136-
if has_completed {
137-
// Don't increment `downloaded` here: we only count transitions
138-
// from a known peer
139-
}
127+
downloads_increased
128+
}
140129

141-
if let Some(event_sender) = self.event_sender.as_deref() {
130+
async fn trigger_peer_updated_event(
131+
&self,
132+
old_announce: &Arc<PeerAnnouncement>,
133+
new_announce: &Arc<PeerAnnouncement>,
134+
downloads_increased: bool,
135+
) {
136+
if let Some(event_sender) = self.event_sender.as_deref() {
137+
event_sender
138+
.send(Event::PeerUpdated {
139+
info_hash: self.info_hash,
140+
old_peer: *old_announce.clone(),
141+
new_peer: *new_announce.clone(),
142+
})
143+
.await;
144+
145+
if downloads_increased {
142146
event_sender
143-
.send(Event::PeerAdded {
147+
.send(Event::PeerDownloadCompleted {
144148
info_hash: self.info_hash,
145-
peer: *announcement,
149+
peer: *new_announce.clone(),
146150
})
147151
.await;
148152
}
153+
}
154+
}
149155

150-
None
156+
async fn trigger_peer_added_event(&self, announcement: &Arc<PeerAnnouncement>) {
157+
if let Some(event_sender) = self.event_sender.as_deref() {
158+
event_sender
159+
.send(Event::PeerAdded {
160+
info_hash: self.info_hash,
161+
peer: *announcement.clone(),
162+
})
163+
.await;
151164
}
152165
}
153166

0 commit comments

Comments
 (0)