Skip to content

Commit e063af1

Browse files
committed
fix misuses of StoppinGuard
1 parent e28f830 commit e063af1

5 files changed

Lines changed: 14 additions & 18 deletions

File tree

lib/collection/src/shards/local_shard/telemetry.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ impl LocalShard {
3030
} else {
3131
let locked_collection_config = self.collection_config.clone();
3232
let is_stopped_guard = StoppingGuard::new();
33+
let is_stopped = is_stopped_guard.get_is_stopped();
3334
let handle = tokio::task::spawn_blocking(move || {
3435
// blocking sync lock
3536
let Some(segments_guard) = segments.try_read_for(timeout) else {
@@ -40,7 +41,7 @@ impl LocalShard {
4041
};
4142
let mut segments_telemetry = Vec::with_capacity(segments_guard.len());
4243
for (_id, segment) in segments_guard.iter() {
43-
if is_stopped_guard.is_stopped() {
44+
if is_stopped.load(Ordering::Relaxed) {
4445
return Ok((vec![], HashMap::default()));
4546
}
4647

lib/shard/src/common/stopping_guard.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::sync::Arc;
22
use std::sync::atomic::AtomicBool;
33

44
/// Structure that ensures that `is_stopped` flag is set to `true` when dropped.
5-
#[derive(Clone)]
65
pub struct StoppingGuard {
76
is_stopped: Arc<AtomicBool>,
87
}
@@ -18,10 +17,6 @@ impl StoppingGuard {
1817
pub fn get_is_stopped(&self) -> Arc<AtomicBool> {
1918
self.is_stopped.clone()
2019
}
21-
22-
pub fn is_stopped(&self) -> bool {
23-
self.is_stopped.load(std::sync::atomic::Ordering::Relaxed)
24-
}
2520
}
2621

2722
impl Default for StoppingGuard {

lib/storage/src/content_manager/toc/telemetry.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::sync::Arc;
2-
use std::sync::atomic::{AtomicUsize, Ordering};
2+
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
33
use std::time::Duration;
44

55
use collection::operations::types::CollectionResult;
@@ -9,7 +9,6 @@ use collection::telemetry::{
99
use common::scope_tracker::{ScopeTracker, ScopeTrackerGuard};
1010
use common::types::TelemetryDetail;
1111
use dashmap::DashMap;
12-
use shard::common::stopping_guard::StoppingGuard;
1312

1413
use crate::content_manager::toc::TableOfContent;
1514
use crate::rbac::Access;
@@ -45,12 +44,12 @@ impl TableOfContent {
4544
detail: TelemetryDetail,
4645
access: &Access,
4746
timeout: Duration,
48-
is_stopped_guard: &StoppingGuard,
47+
is_stopped: &AtomicBool,
4948
) -> CollectionResult<TocTelemetryData> {
5049
let all_collections = self.all_collections_access(access).await;
5150
let mut collection_telemetry = Vec::new();
5251
for collection_pass in &all_collections {
53-
if is_stopped_guard.is_stopped() {
52+
if is_stopped.load(Ordering::Relaxed) {
5453
break;
5554
}
5655
if let Ok(collection) = self.get_collection(collection_pass).await {
@@ -87,12 +86,12 @@ impl TableOfContent {
8786
&self,
8887
access: &Access,
8988
timeout: Duration,
90-
is_stopped_guard: &StoppingGuard,
89+
is_stopped: &AtomicBool,
9190
) -> CollectionResult<Vec<CollectionsAggregatedTelemetry>> {
9291
let mut result = Vec::new();
9392
let all_collections = self.all_collections_access(access).await;
9493
for collection_pass in &all_collections {
95-
if is_stopped_guard.is_stopped() {
94+
if is_stopped.load(Ordering::Relaxed) {
9695
break;
9796
}
9897
if let Ok(collection) = self.get_collection(collection_pass).await {

src/common/telemetry.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,15 @@ impl TelemetryCollector {
8585
) -> StorageResult<TelemetryData> {
8686
let timeout = timeout.unwrap_or(DEFAULT_TELEMETRY_TIMEOUT);
8787
// Use blocking pool because the collection telemetry acquires several sync. locks.
88+
let is_stopped_guard = StoppingGuard::new();
89+
let is_stopped = is_stopped_guard.get_is_stopped();
8890
let collections_telemetry_handle = {
8991
let toc = self
9092
.dispatcher
9193
.toc(access, &new_unchecked_verification_pass())
9294
.clone();
9395
let runtime_handle = toc.general_runtime_handle().clone();
9496
let access_collection = access.clone();
95-
let is_stopped_guard = StoppingGuard::new();
9697

9798
let handle = runtime_handle.spawn_blocking(move || {
9899
// Re-enter the async runtime in this blocking thread
@@ -102,7 +103,7 @@ impl TelemetryCollector {
102103
&access_collection,
103104
&toc,
104105
timeout,
105-
&is_stopped_guard,
106+
&is_stopped,
106107
)
107108
.await
108109
})

src/common/telemetry_ops/collections_telemetry.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::sync::atomic::AtomicBool;
12
use std::time::Duration;
23

34
use collection::operations::types::CollectionResult;
@@ -8,7 +9,6 @@ use common::types::{DetailsLevel, TelemetryDetail};
89
use schemars::JsonSchema;
910
use segment::common::anonymize::Anonymize;
1011
use serde::Serialize;
11-
use shard::common::stopping_guard::StoppingGuard;
1212
use storage::content_manager::toc::TableOfContent;
1313
use storage::rbac::Access;
1414

@@ -38,13 +38,13 @@ impl CollectionsTelemetry {
3838
access: &Access,
3939
toc: &TableOfContent,
4040
timeout: Duration,
41-
is_stopped_guard: &StoppingGuard,
41+
is_stopped: &AtomicBool,
4242
) -> CollectionResult<Self> {
4343
let number_of_collections = toc.all_collections(access).await.len();
4444
let (collections, snapshots) = if detail.level >= DetailsLevel::Level1 {
4545
let telemetry_data = if detail.level >= DetailsLevel::Level2 {
4646
let toc_telemetry = toc
47-
.get_telemetry_data(detail, access, timeout, is_stopped_guard)
47+
.get_telemetry_data(detail, access, timeout, is_stopped)
4848
.await?;
4949

5050
let collections: Vec<_> = toc_telemetry
@@ -56,7 +56,7 @@ impl CollectionsTelemetry {
5656
(collections, toc_telemetry.snapshot_telemetry)
5757
} else {
5858
let collections = toc
59-
.get_aggregated_telemetry_data(access, timeout, is_stopped_guard)
59+
.get_aggregated_telemetry_data(access, timeout, is_stopped)
6060
.await?
6161
.into_iter()
6262
.map(CollectionTelemetryEnum::Aggregated)

0 commit comments

Comments
 (0)