Skip to content

Commit f3a8a3e

Browse files
committed
Add timeout to cancel metrics and telemetry calls
1 parent 1a92c69 commit f3a8a3e

18 files changed

Lines changed: 333 additions & 102 deletions

File tree

docs/redoc/master/openapi.json

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,17 @@
244244
"type": "integer",
245245
"minimum": 0
246246
}
247+
},
248+
{
249+
"name": "timeout",
250+
"in": "query",
251+
"description": "Timeout for this request",
252+
"required": false,
253+
"schema": {
254+
"type": "integer",
255+
"minimum": 1,
256+
"maximum": 10
257+
}
247258
}
248259
],
249260
"responses": {
@@ -323,6 +334,17 @@
323334
"schema": {
324335
"type": "boolean"
325336
}
337+
},
338+
{
339+
"name": "timeout",
340+
"in": "query",
341+
"description": "Timeout for this request",
342+
"required": false,
343+
"schema": {
344+
"type": "integer",
345+
"minimum": 1,
346+
"maximum": 10
347+
}
326348
}
327349
],
328350
"responses": {

lib/collection/src/collection/telemetry.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,29 @@
1+
use std::time::Duration;
2+
13
use common::types::{DetailsLevel, TelemetryDetail};
4+
use shard::common::stopping_guard::StoppingGuard;
25

36
use crate::collection::Collection;
7+
use crate::operations::types::CollectionResult;
48
use crate::telemetry::{CollectionConfigTelemetry, CollectionTelemetry};
59

610
impl Collection {
7-
pub async fn get_telemetry_data(&self, detail: TelemetryDetail) -> CollectionTelemetry {
11+
pub async fn get_telemetry_data(
12+
&self,
13+
detail: TelemetryDetail,
14+
timeout: Duration,
15+
is_stopped_guard: &StoppingGuard,
16+
) -> CollectionResult<CollectionTelemetry> {
817
let (shards_telemetry, transfers, resharding) = {
918
if detail.level >= DetailsLevel::Level3 {
1019
let shards_holder = self.shards_holder.read().await;
1120
let mut shards_telemetry = Vec::new();
1221
for shard in shards_holder.all_shards() {
13-
shards_telemetry.push(shard.get_telemetry_data(detail).await)
22+
shards_telemetry.push(
23+
shard
24+
.get_telemetry_data(detail, timeout, is_stopped_guard)
25+
.await?,
26+
)
1427
}
1528
(
1629
Some(shards_telemetry),
@@ -28,14 +41,14 @@ impl Collection {
2841

2942
let shard_clean_tasks = self.clean_local_shards_statuses();
3043

31-
CollectionTelemetry {
44+
Ok(CollectionTelemetry {
3245
id: self.name().to_string(),
3346
init_time_ms: self.init_time.as_millis() as u64,
3447
config: CollectionConfigTelemetry::from(self.collection_config.read().await.clone()),
3548
shards: shards_telemetry,
3649
transfers,
3750
resharding,
3851
shard_clean_tasks: (!shard_clean_tasks.is_empty()).then_some(shard_clean_tasks),
39-
}
52+
})
4053
}
4154
}

lib/collection/src/shards/forward_proxy_shard.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use segment::types::{
1414
ExtendedPointId, Filter, PointIdType, ScoredPoint, SizeStats, SnapshotFormat, WithPayload,
1515
WithPayloadInterface, WithVector,
1616
};
17+
use shard::common::stopping_guard::StoppingGuard;
1718
use shard::retrieve::record_internal::RecordInternal;
1819
use shard::search::CoreSearchRequestBatch;
1920
use tokio::runtime::Handle;
@@ -356,8 +357,15 @@ impl ForwardProxyShard {
356357
self.wrapped_shard.trigger_optimizers();
357358
}
358359

359-
pub async fn get_telemetry_data(&self, detail: TelemetryDetail) -> LocalShardTelemetry {
360-
self.wrapped_shard.get_telemetry_data(detail).await
360+
pub async fn get_telemetry_data(
361+
&self,
362+
detail: TelemetryDetail,
363+
timeout: Duration,
364+
is_stopped_guard: &StoppingGuard,
365+
) -> CollectionResult<LocalShardTelemetry> {
366+
self.wrapped_shard
367+
.get_telemetry_data(detail, timeout, is_stopped_guard)
368+
.await
361369
}
362370

363371
pub async fn get_optimization_status(&self) -> OptimizersStatus {

lib/collection/src/shards/local_shard/telemetry.rs

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,72 @@
11
use std::collections::HashMap;
22
use std::sync::atomic::Ordering;
3+
use std::time::Duration;
34

45
use common::types::{DetailsLevel, TelemetryDetail};
56
use segment::common::BYTES_IN_KB;
67
use segment::common::operation_time_statistics::OperationDurationStatistics;
78
use segment::types::{SizeStats, VectorNameBuf};
89
use segment::vector_storage::common::get_async_scorer;
10+
use shard::common::stopping_guard::StoppingGuard;
911
use shard::segment_holder::SegmentHolder;
1012
use tokio_util::task::AbortOnDropHandle;
1113

1214
use crate::config::CollectionConfigInternal;
13-
use crate::operations::types::OptimizersStatus;
15+
use crate::operations::types::{CollectionError, CollectionResult, OptimizersStatus};
1416
use crate::optimizers_builder::DEFAULT_INDEXING_THRESHOLD_KB;
1517
use crate::shards::local_shard::LocalShard;
1618
use crate::shards::telemetry::{LocalShardTelemetry, OptimizerTelemetry};
1719

1820
impl LocalShard {
19-
pub async fn get_telemetry_data(&self, detail: TelemetryDetail) -> LocalShardTelemetry {
21+
pub async fn get_telemetry_data(
22+
&self,
23+
detail: TelemetryDetail,
24+
timeout: Duration,
25+
is_stopped_guard: &StoppingGuard,
26+
) -> CollectionResult<LocalShardTelemetry> {
2027
let segments = self.segments.clone();
2128

2229
let segments_data = if detail.level < DetailsLevel::Level4 {
2330
Ok((vec![], HashMap::default()))
2431
} else {
2532
let locked_collection_config = self.collection_config.clone();
26-
33+
let is_stopped_guard = is_stopped_guard.clone();
2734
let handle = tokio::task::spawn_blocking(move || {
2835
// blocking sync lock
29-
let segments_guard = segments.read();
36+
let Some(segments_guard) = segments.try_read_for(timeout) else {
37+
return Err(CollectionError::timeout(
38+
timeout.as_secs() as usize,
39+
"shard telemetry",
40+
));
41+
};
42+
43+
let mut segments_telemetry = Vec::with_capacity(segments_guard.len());
44+
for (_id, segment) in segments_guard.iter() {
45+
if is_stopped_guard.is_stopped() {
46+
return Ok((vec![], HashMap::default()));
47+
}
3048

31-
let segments_telemetry = segments_guard
32-
.iter()
33-
.map(|(_id, segment)| segment.get().read().get_telemetry_data(detail))
34-
.collect();
49+
// blocking sync lock
50+
let Some(segment_guard) = segment.get().try_read_for(timeout) else {
51+
return Err(CollectionError::timeout(
52+
timeout.as_secs() as usize,
53+
"shard telemetry",
54+
));
55+
};
56+
57+
segments_telemetry.push(segment_guard.get_telemetry_data(detail))
58+
}
3559

3660
let collection_config = locked_collection_config.blocking_read();
3761
let indexed_only_excluded_vectors =
3862
get_index_only_excluded_vectors(&segments_guard, &collection_config);
3963

40-
(segments_telemetry, indexed_only_excluded_vectors)
64+
Ok((segments_telemetry, indexed_only_excluded_vectors))
4165
});
42-
AbortOnDropHandle::new(handle).await
66+
AbortOnDropHandle::new(handle).await?
4367
};
4468

45-
if let Err(err) = &segments_data {
46-
log::error!("Failed to get telemetry: {err}");
47-
}
48-
49-
let (segments, index_only_excluded_vectors) = segments_data.unwrap_or_default();
69+
let (segments, index_only_excluded_vectors) = segments_data?;
5070

5171
let total_optimized_points = self.total_optimized_points.load(Ordering::Relaxed);
5272

@@ -71,7 +91,7 @@ impl LocalShard {
7191
num_points,
7292
} = self.get_size_stats().await;
7393

74-
LocalShardTelemetry {
94+
Ok(LocalShardTelemetry {
7595
variant_name: None,
7696
status: None,
7797
total_optimized_points,
@@ -94,7 +114,7 @@ impl LocalShard {
94114
async_scorer: Some(get_async_scorer()),
95115
indexed_only_excluded_vectors: (!index_only_excluded_vectors.is_empty())
96116
.then_some(index_only_excluded_vectors),
97-
}
117+
})
98118
}
99119

100120
pub async fn get_optimization_status(&self) -> OptimizersStatus {

lib/collection/src/shards/proxy_shard.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use segment::types::{
1515
ExtendedPointId, Filter, PointIdType, ScoredPoint, SizeStats, SnapshotFormat, WithPayload,
1616
WithPayloadInterface, WithVector,
1717
};
18+
use shard::common::stopping_guard::StoppingGuard;
1819
use shard::retrieve::record_internal::RecordInternal;
1920
use shard::search::CoreSearchRequestBatch;
2021
use tokio::runtime::Handle;
@@ -144,8 +145,15 @@ impl ProxyShard {
144145
Ok(())
145146
}
146147

147-
pub async fn get_telemetry_data(&self, detail: TelemetryDetail) -> LocalShardTelemetry {
148-
self.wrapped_shard.get_telemetry_data(detail).await
148+
pub async fn get_telemetry_data(
149+
&self,
150+
detail: TelemetryDetail,
151+
timeout: Duration,
152+
is_stopped_guard: &StoppingGuard,
153+
) -> CollectionResult<LocalShardTelemetry> {
154+
self.wrapped_shard
155+
.get_telemetry_data(detail, timeout, is_stopped_guard)
156+
.await
149157
}
150158

151159
pub async fn get_optimization_status(&self) -> OptimizersStatus {

lib/collection/src/shards/queue_proxy_shard.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use segment::types::{
1616
WithPayloadInterface, WithVector,
1717
};
1818
use semver::Version;
19+
use shard::common::stopping_guard::StoppingGuard;
1920
use shard::retrieve::record_internal::RecordInternal;
2021
use shard::search::CoreSearchRequestBatch;
2122
use tokio::runtime::Handle;
@@ -186,10 +187,15 @@ impl QueueProxyShard {
186187
self.inner_unchecked().wrapped_shard.trigger_optimizers();
187188
}
188189

189-
pub async fn get_telemetry_data(&self, detail: TelemetryDetail) -> LocalShardTelemetry {
190+
pub async fn get_telemetry_data(
191+
&self,
192+
detail: TelemetryDetail,
193+
timeout: Duration,
194+
is_stopped_guard: &StoppingGuard,
195+
) -> CollectionResult<LocalShardTelemetry> {
190196
self.inner_unchecked()
191197
.wrapped_shard
192-
.get_telemetry_data(detail)
198+
.get_telemetry_data(detail, timeout, is_stopped_guard)
193199
.await
194200
}
195201

lib/collection/src/shards/replica_set/telemetry.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,34 @@
11
use std::ops::Deref as _;
2+
use std::time::Duration;
23

34
use common::types::TelemetryDetail;
45
use segment::types::SizeStats;
6+
use shard::common::stopping_guard::StoppingGuard;
57

6-
use crate::operations::types::OptimizersStatus;
8+
use crate::operations::types::{CollectionResult, OptimizersStatus};
79
use crate::shards::replica_set::ShardReplicaSet;
810
use crate::shards::telemetry::{PartialSnapshotTelemetry, ReplicaSetTelemetry};
911

1012
impl ShardReplicaSet {
11-
pub(crate) async fn get_telemetry_data(&self, detail: TelemetryDetail) -> ReplicaSetTelemetry {
13+
pub(crate) async fn get_telemetry_data(
14+
&self,
15+
detail: TelemetryDetail,
16+
timeout: Duration,
17+
is_stopped_guard: &StoppingGuard,
18+
) -> CollectionResult<ReplicaSetTelemetry> {
1219
let local_shard = self.local.read().await;
1320
let local = local_shard.as_ref();
1421

1522
let local_telemetry = match local {
16-
Some(local_shard) => Some(local_shard.get_telemetry_data(detail).await),
23+
Some(local_shard) => Some(
24+
local_shard
25+
.get_telemetry_data(detail, timeout, is_stopped_guard)
26+
.await?,
27+
),
1728
None => None,
1829
};
1930

20-
ReplicaSetTelemetry {
31+
Ok(ReplicaSetTelemetry {
2132
id: self.shard_id,
2233
key: self.shard_key.clone(),
2334
local: local_telemetry,
@@ -36,7 +47,7 @@ impl ShardReplicaSet {
3647
is_recovering: self.partial_snapshot_meta.is_recovery_lock_taken(),
3748
recovery_timestamp: self.partial_snapshot_meta.recovery_timestamp(),
3849
}),
39-
}
50+
})
4051
}
4152

4253
pub(crate) async fn get_optimization_status(&self) -> Option<OptimizersStatus> {

lib/collection/src/shards/shard.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
use core::marker::{Send, Sync};
22
use std::future::{self, Future};
33
use std::path::Path;
4+
use std::time::Duration;
45

56
use common::counter::hardware_accumulator::HwMeasurementAcc;
67
use common::tar_ext;
78
use common::types::TelemetryDetail;
89
use segment::data_types::manifest::SnapshotManifest;
910
use segment::index::field_index::CardinalityEstimation;
1011
use segment::types::{Filter, SizeStats, SnapshotFormat};
12+
use shard::common::stopping_guard::StoppingGuard;
1113

1214
use super::local_shard::clock_map::RecoveryPoint;
1315
use super::update_tracker::UpdateTracker;
@@ -70,23 +72,42 @@ impl Shard {
7072
}
7173
}
7274

73-
pub async fn get_telemetry_data(&self, detail: TelemetryDetail) -> LocalShardTelemetry {
75+
pub async fn get_telemetry_data(
76+
&self,
77+
detail: TelemetryDetail,
78+
timeout: Duration,
79+
is_stopped_guard: &StoppingGuard,
80+
) -> CollectionResult<LocalShardTelemetry> {
7481
let mut telemetry = match self {
7582
Shard::Local(local_shard) => {
76-
let mut shard_telemetry = local_shard.get_telemetry_data(detail).await;
83+
let mut shard_telemetry = local_shard
84+
.get_telemetry_data(detail, timeout, is_stopped_guard)
85+
.await?;
7786

7887
// can't take sync locks in async fn so local_shard_status() has to be
7988
// called outside get_telemetry_data()
8089
shard_telemetry.status = Some(local_shard.local_shard_status().await.0);
8190
shard_telemetry
8291
}
83-
Shard::Proxy(proxy_shard) => proxy_shard.get_telemetry_data(detail).await,
84-
Shard::ForwardProxy(proxy_shard) => proxy_shard.get_telemetry_data(detail).await,
85-
Shard::QueueProxy(proxy_shard) => proxy_shard.get_telemetry_data(detail).await,
92+
Shard::Proxy(proxy_shard) => {
93+
proxy_shard
94+
.get_telemetry_data(detail, timeout, is_stopped_guard)
95+
.await?
96+
}
97+
Shard::ForwardProxy(proxy_shard) => {
98+
proxy_shard
99+
.get_telemetry_data(detail, timeout, is_stopped_guard)
100+
.await?
101+
}
102+
Shard::QueueProxy(proxy_shard) => {
103+
proxy_shard
104+
.get_telemetry_data(detail, timeout, is_stopped_guard)
105+
.await?
106+
}
86107
Shard::Dummy(dummy_shard) => dummy_shard.get_telemetry_data(),
87108
};
88109
telemetry.variant_name = Some(self.variant_name().to_string());
89-
telemetry
110+
Ok(telemetry)
90111
}
91112

92113
pub async fn get_optimization_status(&self) -> OptimizersStatus {

lib/collection/src/shards/telemetry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub struct RemoteShardTelemetry {
3535
pub updates: OperationDurationStatistics,
3636
}
3737

38-
#[derive(Serialize, Clone, Debug, JsonSchema, Anonymize)]
38+
#[derive(Serialize, Clone, Debug, JsonSchema, Anonymize, Default)]
3939
pub struct LocalShardTelemetry {
4040
#[anonymize(false)]
4141
pub variant_name: Option<String>,

0 commit comments

Comments
 (0)