@@ -22,15 +22,15 @@ impl LocalShard {
2222 & self ,
2323 detail : TelemetryDetail ,
2424 timeout : Duration ,
25- is_stopped_guard : & StoppingGuard ,
2625 ) -> CollectionResult < LocalShardTelemetry > {
26+ let start = std:: time:: Instant :: now ( ) ;
2727 let segments = self . segments . clone ( ) ;
28-
2928 let segments_data = if detail. level < DetailsLevel :: Level4 {
3029 Ok ( ( vec ! [ ] , HashMap :: default ( ) ) )
3130 } else {
3231 let locked_collection_config = self . collection_config . clone ( ) ;
33- let is_stopped_guard = is_stopped_guard. clone ( ) ;
32+ let is_stopped_guard = StoppingGuard :: new ( ) ;
33+ let is_stopped = is_stopped_guard. get_is_stopped ( ) ;
3434 let handle = tokio:: task:: spawn_blocking ( move || {
3535 // blocking sync lock
3636 let Some ( segments_guard) = segments. try_read_for ( timeout) else {
@@ -39,10 +39,9 @@ impl LocalShard {
3939 "shard telemetry" ,
4040 ) ) ;
4141 } ;
42-
4342 let mut segments_telemetry = Vec :: with_capacity ( segments_guard. len ( ) ) ;
4443 for ( _id, segment) in segments_guard. iter ( ) {
45- if is_stopped_guard . is_stopped ( ) {
44+ if is_stopped. load ( Ordering :: Relaxed ) {
4645 return Ok ( ( vec ! [ ] , HashMap :: default ( ) ) ) ;
4746 }
4847
@@ -67,7 +66,6 @@ impl LocalShard {
6766 } ;
6867
6968 let ( segments, index_only_excluded_vectors) = segments_data?;
70-
7169 let total_optimized_points = self . total_optimized_points . load ( Ordering :: Relaxed ) ;
7270
7371 let optimizations: OperationDurationStatistics = self
@@ -81,7 +79,9 @@ impl LocalShard {
8179 } )
8280 . fold ( Default :: default ( ) , |total, stats| total + stats) ;
8381
84- let status = self . get_optimization_status ( ) . await ;
82+ // update timeout
83+ let timeout = timeout. saturating_sub ( start. elapsed ( ) ) ;
84+ let status = self . get_optimization_status ( timeout) . await ?;
8585
8686 let SizeStats {
8787 num_vectors,
@@ -117,23 +117,27 @@ impl LocalShard {
117117 } )
118118 }
119119
120- pub async fn get_optimization_status ( & self ) -> OptimizersStatus {
120+ pub async fn get_optimization_status (
121+ & self ,
122+ timeout : Duration ,
123+ ) -> CollectionResult < OptimizersStatus > {
121124 let segments = self . segments . clone ( ) ;
122125
123126 let status = tokio:: task:: spawn_blocking ( move || {
124- let segments = segments. read ( ) ;
127+ // blocking sync lock
128+ let Some ( segments) = segments. try_read_for ( timeout) else {
129+ return Err ( CollectionError :: timeout (
130+ timeout. as_secs ( ) as usize ,
131+ "optimization status" ,
132+ ) ) ;
133+ } ;
125134
126135 match & segments. optimizer_errors {
127- None => OptimizersStatus :: Ok ,
128- Some ( err) => OptimizersStatus :: Error ( err. clone ( ) ) ,
136+ None => Ok ( OptimizersStatus :: Ok ) ,
137+ Some ( err) => Ok ( OptimizersStatus :: Error ( err. clone ( ) ) ) ,
129138 }
130139 } ) ;
131- let status = AbortOnDropHandle :: new ( status) . await ;
132-
133- match status {
134- Ok ( status) => status,
135- Err ( err) => OptimizersStatus :: Error ( format ! ( "failed to get optimizers status: {err}" ) ) ,
136- }
140+ AbortOnDropHandle :: new ( status) . await ?
137141 }
138142
139143 pub async fn get_size_stats ( & self ) -> SizeStats {
0 commit comments