Skip to content

Commit 80fbfe0

Browse files
timviseecoszio
authored andcommitted
Tweak transfer selection
1 parent ace660e commit 80fbfe0

1 file changed

Lines changed: 22 additions & 11 deletions

File tree

src/common/telemetry_ops/distributed_telemetry.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![expect(dead_code)]
22

3-
use ahash::HashMap;
3+
use ahash::{HashMap, HashMapExt};
44
use collection::operations::types::{ReshardingInfo, ShardTransferInfo};
55
use schemars::JsonSchema;
66
use serde::Serialize;
@@ -76,13 +76,11 @@ pub struct DistributedPeerInfo {
7676

7777
impl DistributedTelemetryData {
7878
pub fn resolve_telemetries(
79-
mut telemetries: Vec<TelemetryData>,
79+
telemetries: Vec<TelemetryData>,
8080
missing_peers: Vec<PeerId>,
8181
) -> StorageResult<Self> {
8282
// Use the telemetry from node with highest term/commit
83-
sort_telemetries_by_term_and_commit(&mut telemetries);
84-
let base_telemetry = telemetries
85-
.first()
83+
let base_telemetry = newest_telemetry(&telemetries)
8684
.ok_or_else(|| StorageError::service_error("Could not get telemetry from cluster"))?;
8785

8886
// Create a map of peer_id -> TelemetryData for quick lookup
@@ -115,8 +113,8 @@ fn aggregate_collections(
115113
telemetry_by_peer: &HashMap<PeerId, &TelemetryData>,
116114
base_telemetry: &TelemetryData,
117115
) -> Option<HashMap<String, DistributedCollectionTelemetry>> {
118-
let mut collections = HashMap::default();
119116
let base_collections = base_telemetry.collections.collections.as_ref()?;
117+
let mut collections = HashMap::with_capacity(base_collections.len());
120118
let base_peer_id = base_telemetry.cluster.as_ref()?.status.as_ref()?.peer_id?;
121119

122120
for collection_enum in base_collections {
@@ -167,10 +165,23 @@ fn aggregate_shard_transfers(
167165
})
168166
};
169167

170-
let get_transfer_from_source = |from, to, shard_id| {
171-
get_transfers(from)?
172-
.iter()
173-
.find(|t| t.from == from && t.to == to && t.shard_id == shard_id)
168+
let get_transfer_from_source = |base_transfer: &ShardTransferInfo| {
169+
let ShardTransferInfo {
170+
shard_id,
171+
to_shard_id,
172+
from,
173+
to,
174+
sync: _,
175+
method: _,
176+
comment: _,
177+
} = base_transfer;
178+
179+
get_transfers(*from)?.iter().find(|t| {
180+
t.from == *from
181+
&& t.to == *to
182+
&& t.shard_id == *shard_id
183+
&& t.to_shard_id == *to_shard_id
184+
})
174185
};
175186

176187
let Some(base_transfers) = get_transfers(base_peer_id) else {
@@ -182,7 +193,7 @@ fn aggregate_shard_transfers(
182193
base_transfers
183194
.iter()
184195
.map(|base_transfer| {
185-
get_transfer_from_source(base_transfer.from, base_transfer.to, base_transfer.shard_id)
196+
get_transfer_from_source(base_transfer)
186197
.cloned()
187198
.unwrap_or_else(|| base_transfer.clone())
188199
})

0 commit comments

Comments
 (0)