Skip to content

Commit bf564a4

Browse files
authored
Improve timeout logic for Telemetry (#7607)
* Improve timeout logic for Telemetry * assert! * fix misuses of StoppinGuard * decrease timeout for each shard
1 parent 3daaba2 commit bf564a4

18 files changed

Lines changed: 170 additions & 106 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/collection/src/collection/mod.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -783,17 +783,24 @@ impl Collection {
783783
Ok(())
784784
}
785785

786-
pub async fn get_aggregated_telemetry_data(&self) -> CollectionsAggregatedTelemetry {
786+
pub async fn get_aggregated_telemetry_data(
787+
&self,
788+
timeout: Duration,
789+
) -> CollectionResult<CollectionsAggregatedTelemetry> {
790+
let start = std::time::Instant::now();
787791
let shards_holder = self.shards_holder.read().await;
788792

789793
let mut shard_optimization_statuses = Vec::new();
790794
let mut vectors = 0;
791795

792796
for shard in shards_holder.all_shards() {
793-
let shard_optimization_status = shard
794-
.get_optimization_status()
797+
let shard_optimization_status = match shard
798+
.get_optimization_status(timeout.saturating_sub(start.elapsed()))
795799
.await
796-
.unwrap_or(OptimizersStatus::Ok);
800+
{
801+
None => OptimizersStatus::Ok,
802+
Some(status) => status?,
803+
};
797804

798805
shard_optimization_statuses.push(shard_optimization_status);
799806

@@ -805,11 +812,11 @@ impl Collection {
805812
.max()
806813
.unwrap_or(OptimizersStatus::Ok);
807814

808-
CollectionsAggregatedTelemetry {
815+
Ok(CollectionsAggregatedTelemetry {
809816
vectors,
810817
optimizers_status,
811818
params: self.collection_config.read().await.params.clone(),
812-
}
819+
})
813820
}
814821

815822
pub async fn effective_optimizers_config(&self) -> CollectionResult<OptimizersConfig> {

lib/collection/src/collection/telemetry.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::time::Duration;
22

33
use common::types::{DetailsLevel, TelemetryDetail};
4-
use shard::common::stopping_guard::StoppingGuard;
54

65
use crate::collection::Collection;
76
use crate::operations::types::CollectionResult;
@@ -12,18 +11,13 @@ impl Collection {
1211
&self,
1312
detail: TelemetryDetail,
1413
timeout: Duration,
15-
is_stopped_guard: &StoppingGuard,
1614
) -> CollectionResult<CollectionTelemetry> {
1715
let (shards_telemetry, transfers, resharding) = {
1816
if detail.level >= DetailsLevel::Level3 {
1917
let shards_holder = self.shards_holder.read().await;
2018
let mut shards_telemetry = Vec::new();
2119
for shard in shards_holder.all_shards() {
22-
shards_telemetry.push(
23-
shard
24-
.get_telemetry_data(detail, timeout, is_stopped_guard)
25-
.await?,
26-
)
20+
shards_telemetry.push(shard.get_telemetry_data(detail, timeout).await?)
2721
}
2822
(
2923
Some(shards_telemetry),

lib/collection/src/shards/forward_proxy_shard.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use segment::types::{
1414
ExtendedPointId, Filter, PointIdType, ScoredPoint, SizeStats, SnapshotFormat, WithPayload,
1515
WithPayloadInterface, WithVector,
1616
};
17-
use shard::common::stopping_guard::StoppingGuard;
1817
use shard::retrieve::record_internal::RecordInternal;
1918
use shard::search::CoreSearchRequestBatch;
2019
use tokio::runtime::Handle;
@@ -361,15 +360,15 @@ impl ForwardProxyShard {
361360
&self,
362361
detail: TelemetryDetail,
363362
timeout: Duration,
364-
is_stopped_guard: &StoppingGuard,
365363
) -> CollectionResult<LocalShardTelemetry> {
366-
self.wrapped_shard
367-
.get_telemetry_data(detail, timeout, is_stopped_guard)
368-
.await
364+
self.wrapped_shard.get_telemetry_data(detail, timeout).await
369365
}
370366

371-
pub async fn get_optimization_status(&self) -> OptimizersStatus {
372-
self.wrapped_shard.get_optimization_status().await
367+
pub async fn get_optimization_status(
368+
&self,
369+
timeout: Duration,
370+
) -> CollectionResult<OptimizersStatus> {
371+
self.wrapped_shard.get_optimization_status(timeout).await
373372
}
374373

375374
pub async fn get_size_stats(&self) -> SizeStats {

lib/collection/src/shards/local_shard/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ impl LocalShard {
269269
}
270270
}
271271

272-
pub(super) fn segments(&self) -> &RwLock<SegmentHolder> {
272+
pub fn segments(&self) -> &RwLock<SegmentHolder> {
273273
self.segments.deref()
274274
}
275275

lib/collection/src/shards/local_shard/telemetry.rs

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ impl LocalShard {
2222
&self,
2323
detail: TelemetryDetail,
2424
timeout: Duration,
25-
is_stopped_guard: &StoppingGuard,
2625
) -> CollectionResult<LocalShardTelemetry> {
26+
let start = std::time::Instant::now();
2727
let segments = self.segments.clone();
28-
2928
let segments_data = if detail.level < DetailsLevel::Level4 {
3029
Ok((vec![], HashMap::default()))
3130
} else {
3231
let locked_collection_config = self.collection_config.clone();
33-
let is_stopped_guard = is_stopped_guard.clone();
32+
let is_stopped_guard = StoppingGuard::new();
33+
let is_stopped = is_stopped_guard.get_is_stopped();
3434
let handle = tokio::task::spawn_blocking(move || {
3535
// blocking sync lock
3636
let Some(segments_guard) = segments.try_read_for(timeout) else {
@@ -39,10 +39,9 @@ impl LocalShard {
3939
"shard telemetry",
4040
));
4141
};
42-
4342
let mut segments_telemetry = Vec::with_capacity(segments_guard.len());
4443
for (_id, segment) in segments_guard.iter() {
45-
if is_stopped_guard.is_stopped() {
44+
if is_stopped.load(Ordering::Relaxed) {
4645
return Ok((vec![], HashMap::default()));
4746
}
4847

@@ -67,7 +66,6 @@ impl LocalShard {
6766
};
6867

6968
let (segments, index_only_excluded_vectors) = segments_data?;
70-
7169
let total_optimized_points = self.total_optimized_points.load(Ordering::Relaxed);
7270

7371
let optimizations: OperationDurationStatistics = self
@@ -81,7 +79,9 @@ impl LocalShard {
8179
})
8280
.fold(Default::default(), |total, stats| total + stats);
8381

84-
let status = self.get_optimization_status().await;
82+
// update timeout
83+
let timeout = timeout.saturating_sub(start.elapsed());
84+
let status = self.get_optimization_status(timeout).await?;
8585

8686
let SizeStats {
8787
num_vectors,
@@ -117,23 +117,27 @@ impl LocalShard {
117117
})
118118
}
119119

120-
pub async fn get_optimization_status(&self) -> OptimizersStatus {
120+
pub async fn get_optimization_status(
121+
&self,
122+
timeout: Duration,
123+
) -> CollectionResult<OptimizersStatus> {
121124
let segments = self.segments.clone();
122125

123126
let status = tokio::task::spawn_blocking(move || {
124-
let segments = segments.read();
127+
// blocking sync lock
128+
let Some(segments) = segments.try_read_for(timeout) else {
129+
return Err(CollectionError::timeout(
130+
timeout.as_secs() as usize,
131+
"optimization status",
132+
));
133+
};
125134

126135
match &segments.optimizer_errors {
127-
None => OptimizersStatus::Ok,
128-
Some(err) => OptimizersStatus::Error(err.clone()),
136+
None => Ok(OptimizersStatus::Ok),
137+
Some(err) => Ok(OptimizersStatus::Error(err.clone())),
129138
}
130139
});
131-
let status = AbortOnDropHandle::new(status).await;
132-
133-
match status {
134-
Ok(status) => status,
135-
Err(err) => OptimizersStatus::Error(format!("failed to get optimizers status: {err}")),
136-
}
140+
AbortOnDropHandle::new(status).await?
137141
}
138142

139143
pub async fn get_size_stats(&self) -> SizeStats {

lib/collection/src/shards/proxy_shard.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use segment::types::{
1515
ExtendedPointId, Filter, PointIdType, ScoredPoint, SizeStats, SnapshotFormat, WithPayload,
1616
WithPayloadInterface, WithVector,
1717
};
18-
use shard::common::stopping_guard::StoppingGuard;
1918
use shard::retrieve::record_internal::RecordInternal;
2019
use shard::search::CoreSearchRequestBatch;
2120
use tokio::runtime::Handle;
@@ -149,15 +148,15 @@ impl ProxyShard {
149148
&self,
150149
detail: TelemetryDetail,
151150
timeout: Duration,
152-
is_stopped_guard: &StoppingGuard,
153151
) -> CollectionResult<LocalShardTelemetry> {
154-
self.wrapped_shard
155-
.get_telemetry_data(detail, timeout, is_stopped_guard)
156-
.await
152+
self.wrapped_shard.get_telemetry_data(detail, timeout).await
157153
}
158154

159-
pub async fn get_optimization_status(&self) -> OptimizersStatus {
160-
self.wrapped_shard.get_optimization_status().await
155+
pub async fn get_optimization_status(
156+
&self,
157+
timeout: Duration,
158+
) -> CollectionResult<OptimizersStatus> {
159+
self.wrapped_shard.get_optimization_status(timeout).await
161160
}
162161

163162
pub async fn get_size_stats(&self) -> SizeStats {

lib/collection/src/shards/queue_proxy_shard.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use segment::types::{
1616
WithPayloadInterface, WithVector,
1717
};
1818
use semver::Version;
19-
use shard::common::stopping_guard::StoppingGuard;
2019
use shard::retrieve::record_internal::RecordInternal;
2120
use shard::search::CoreSearchRequestBatch;
2221
use tokio::runtime::Handle;
@@ -193,18 +192,20 @@ impl QueueProxyShard {
193192
&self,
194193
detail: TelemetryDetail,
195194
timeout: Duration,
196-
is_stopped_guard: &StoppingGuard,
197195
) -> CollectionResult<LocalShardTelemetry> {
198196
self.inner_unchecked()
199197
.wrapped_shard
200-
.get_telemetry_data(detail, timeout, is_stopped_guard)
198+
.get_telemetry_data(detail, timeout)
201199
.await
202200
}
203201

204-
pub async fn get_optimization_status(&self) -> OptimizersStatus {
202+
pub async fn get_optimization_status(
203+
&self,
204+
timeout: Duration,
205+
) -> CollectionResult<OptimizersStatus> {
205206
self.inner_unchecked()
206207
.wrapped_shard
207-
.get_optimization_status()
208+
.get_optimization_status(timeout)
208209
.await
209210
}
210211

lib/collection/src/shards/replica_set/telemetry.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::time::Duration;
33

44
use common::types::TelemetryDetail;
55
use segment::types::SizeStats;
6-
use shard::common::stopping_guard::StoppingGuard;
76

87
use crate::operations::types::{CollectionResult, OptimizersStatus};
98
use crate::shards::replica_set::ShardReplicaSet;
@@ -14,17 +13,12 @@ impl ShardReplicaSet {
1413
&self,
1514
detail: TelemetryDetail,
1615
timeout: Duration,
17-
is_stopped_guard: &StoppingGuard,
1816
) -> CollectionResult<ReplicaSetTelemetry> {
1917
let local_shard = self.local.read().await;
2018
let local = local_shard.as_ref();
2119

2220
let local_telemetry = match local {
23-
Some(local_shard) => Some(
24-
local_shard
25-
.get_telemetry_data(detail, timeout, is_stopped_guard)
26-
.await?,
27-
),
21+
Some(local_shard) => Some(local_shard.get_telemetry_data(detail, timeout).await?),
2822
None => None,
2923
};
3024

@@ -50,14 +44,17 @@ impl ShardReplicaSet {
5044
})
5145
}
5246

53-
pub(crate) async fn get_optimization_status(&self) -> Option<OptimizersStatus> {
47+
pub(crate) async fn get_optimization_status(
48+
&self,
49+
timeout: Duration,
50+
) -> Option<CollectionResult<OptimizersStatus>> {
5451
let local_shard = self.local.read().await;
5552

5653
let Some(local) = local_shard.deref() else {
5754
return None;
5855
};
5956

60-
Some(local.get_optimization_status().await)
57+
Some(local.get_optimization_status(timeout).await)
6158
}
6259

6360
pub(crate) async fn get_size_stats(&self) -> SizeStats {

lib/collection/src/shards/shard.rs

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use common::types::TelemetryDetail;
99
use segment::data_types::manifest::SnapshotManifest;
1010
use segment::index::field_index::CardinalityEstimation;
1111
use segment::types::{Filter, SizeStats, SnapshotFormat};
12-
use shard::common::stopping_guard::StoppingGuard;
1312

1413
use super::local_shard::clock_map::RecoveryPoint;
1514
use super::update_tracker::UpdateTracker;
@@ -76,49 +75,41 @@ impl Shard {
7675
&self,
7776
detail: TelemetryDetail,
7877
timeout: Duration,
79-
is_stopped_guard: &StoppingGuard,
8078
) -> CollectionResult<LocalShardTelemetry> {
8179
let mut telemetry = match self {
8280
Shard::Local(local_shard) => {
83-
let mut shard_telemetry = local_shard
84-
.get_telemetry_data(detail, timeout, is_stopped_guard)
85-
.await?;
81+
let mut shard_telemetry = local_shard.get_telemetry_data(detail, timeout).await?;
8682

8783
// can't take sync locks in async fn so local_shard_status() has to be
8884
// called outside get_telemetry_data()
8985
shard_telemetry.status = Some(local_shard.local_shard_status().await.0);
9086
shard_telemetry
9187
}
92-
Shard::Proxy(proxy_shard) => {
93-
proxy_shard
94-
.get_telemetry_data(detail, timeout, is_stopped_guard)
95-
.await?
96-
}
88+
Shard::Proxy(proxy_shard) => proxy_shard.get_telemetry_data(detail, timeout).await?,
9789
Shard::ForwardProxy(proxy_shard) => {
98-
proxy_shard
99-
.get_telemetry_data(detail, timeout, is_stopped_guard)
100-
.await?
90+
proxy_shard.get_telemetry_data(detail, timeout).await?
10191
}
10292
Shard::QueueProxy(proxy_shard) => {
103-
proxy_shard
104-
.get_telemetry_data(detail, timeout, is_stopped_guard)
105-
.await?
93+
proxy_shard.get_telemetry_data(detail, timeout).await?
10694
}
10795
Shard::Dummy(dummy_shard) => dummy_shard.get_telemetry_data(),
10896
};
10997
telemetry.variant_name = Some(self.variant_name().to_string());
11098
Ok(telemetry)
11199
}
112100

113-
pub async fn get_optimization_status(&self) -> OptimizersStatus {
101+
pub async fn get_optimization_status(
102+
&self,
103+
timeout: Duration,
104+
) -> CollectionResult<OptimizersStatus> {
114105
match self {
115-
Shard::Local(local_shard) => local_shard.get_optimization_status().await,
116-
Shard::Proxy(proxy_shard) => proxy_shard.get_optimization_status().await,
117-
Shard::ForwardProxy(proxy_shard) => proxy_shard.get_optimization_status().await,
106+
Shard::Local(local_shard) => local_shard.get_optimization_status(timeout).await,
107+
Shard::Proxy(proxy_shard) => proxy_shard.get_optimization_status(timeout).await,
108+
Shard::ForwardProxy(proxy_shard) => proxy_shard.get_optimization_status(timeout).await,
118109
Shard::QueueProxy(queue_proxy_shard) => {
119-
queue_proxy_shard.get_optimization_status().await
110+
queue_proxy_shard.get_optimization_status(timeout).await
120111
}
121-
Shard::Dummy(dummy_shard) => dummy_shard.get_optimization_status(),
112+
Shard::Dummy(dummy_shard) => Ok(dummy_shard.get_optimization_status()),
122113
}
123114
}
124115

0 commit comments

Comments
 (0)