11use std:: collections:: HashMap ;
22use std:: sync:: atomic:: Ordering ;
3+ use std:: time:: Duration ;
34
45use common:: types:: { DetailsLevel , TelemetryDetail } ;
56use segment:: common:: BYTES_IN_KB ;
67use segment:: common:: operation_time_statistics:: OperationDurationStatistics ;
78use segment:: types:: { SizeStats , VectorNameBuf } ;
89use segment:: vector_storage:: common:: get_async_scorer;
10+ use shard:: common:: stopping_guard:: StoppingGuard ;
911use shard:: segment_holder:: SegmentHolder ;
1012use tokio_util:: task:: AbortOnDropHandle ;
1113
1214use crate :: config:: CollectionConfigInternal ;
13- use crate :: operations:: types:: OptimizersStatus ;
15+ use crate :: operations:: types:: { CollectionError , CollectionResult , OptimizersStatus } ;
1416use crate :: optimizers_builder:: DEFAULT_INDEXING_THRESHOLD_KB ;
1517use crate :: shards:: local_shard:: LocalShard ;
1618use crate :: shards:: telemetry:: { LocalShardTelemetry , OptimizerTelemetry } ;
1719
1820impl LocalShard {
19- pub async fn get_telemetry_data ( & self , detail : TelemetryDetail ) -> LocalShardTelemetry {
21+ pub async fn get_telemetry_data (
22+ & self ,
23+ detail : TelemetryDetail ,
24+ timeout : Duration ,
25+ is_stopped_guard : & StoppingGuard ,
26+ ) -> CollectionResult < LocalShardTelemetry > {
2027 let segments = self . segments . clone ( ) ;
2128
2229 let segments_data = if detail. level < DetailsLevel :: Level4 {
2330 Ok ( ( vec ! [ ] , HashMap :: default ( ) ) )
2431 } else {
2532 let locked_collection_config = self . collection_config . clone ( ) ;
26-
33+ let is_stopped_guard = is_stopped_guard . clone ( ) ;
2734 let handle = tokio:: task:: spawn_blocking ( move || {
2835 // blocking sync lock
29- let segments_guard = segments. read ( ) ;
36+ let Some ( segments_guard) = segments. try_read_for ( timeout) else {
37+ return Err ( CollectionError :: timeout (
38+ timeout. as_secs ( ) as usize ,
39+ "shard telemetry" ,
40+ ) ) ;
41+ } ;
42+
43+ let mut segments_telemetry = Vec :: with_capacity ( segments_guard. len ( ) ) ;
44+ for ( _id, segment) in segments_guard. iter ( ) {
45+ if is_stopped_guard. is_stopped ( ) {
46+ return Ok ( ( vec ! [ ] , HashMap :: default ( ) ) ) ;
47+ }
3048
31- let segments_telemetry = segments_guard
32- . iter ( )
33- . map ( |( _id, segment) | segment. get ( ) . read ( ) . get_telemetry_data ( detail) )
34- . collect ( ) ;
49+ // blocking sync lock
50+ let Some ( segment_guard) = segment. get ( ) . try_read_for ( timeout) else {
51+ return Err ( CollectionError :: timeout (
52+ timeout. as_secs ( ) as usize ,
53+ "shard telemetry" ,
54+ ) ) ;
55+ } ;
56+
57+ segments_telemetry. push ( segment_guard. get_telemetry_data ( detail) )
58+ }
3559
3660 let collection_config = locked_collection_config. blocking_read ( ) ;
3761 let indexed_only_excluded_vectors =
3862 get_index_only_excluded_vectors ( & segments_guard, & collection_config) ;
3963
40- ( segments_telemetry, indexed_only_excluded_vectors)
64+ Ok ( ( segments_telemetry, indexed_only_excluded_vectors) )
4165 } ) ;
42- AbortOnDropHandle :: new ( handle) . await
66+ AbortOnDropHandle :: new ( handle) . await ?
4367 } ;
4468
45- if let Err ( err) = & segments_data {
46- log:: error!( "Failed to get telemetry: {err}" ) ;
47- }
48-
49- let ( segments, index_only_excluded_vectors) = segments_data. unwrap_or_default ( ) ;
69+ let ( segments, index_only_excluded_vectors) = segments_data?;
5070
5171 let total_optimized_points = self . total_optimized_points . load ( Ordering :: Relaxed ) ;
5272
@@ -71,7 +91,7 @@ impl LocalShard {
7191 num_points,
7292 } = self . get_size_stats ( ) . await ;
7393
74- LocalShardTelemetry {
94+ Ok ( LocalShardTelemetry {
7595 variant_name : None ,
7696 status : None ,
7797 total_optimized_points,
@@ -94,7 +114,7 @@ impl LocalShard {
94114 async_scorer : Some ( get_async_scorer ( ) ) ,
95115 indexed_only_excluded_vectors : ( !index_only_excluded_vectors. is_empty ( ) )
96116 . then_some ( index_only_excluded_vectors) ,
97- }
117+ } )
98118 }
99119
100120 pub async fn get_optimization_status ( & self ) -> OptimizersStatus {
0 commit comments