Skip to content

Commit f2ee497

Browse files
committed
use former scroll update lock to prevent snapshot operations overlapping with updates
1 parent 1908d19 commit f2ee497

6 files changed

Lines changed: 34 additions & 20 deletions

File tree

lib/collection/src/shards/local_shard/mod.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,22 @@ pub struct LocalShard {
105105
disk_usage_watcher: DiskUsageWatcher,
106106
read_rate_limiter: Option<ParkingMutex<RateLimiter>>,
107107

108-
/// Scroll read lock
109-
/// The lock, which must prevent updates during scroll + retrieve operations
110-
/// Consistency of scroll operations is especially important for internal processes like
111-
/// re-sharding and shard transfer, so explicit lock for those operations is required.
108+
/// Update operation lock
109+
/// The lock, which must prevent updates during critical sections of other operations, which
110+
/// are not compatible with updates.
112111
///
113-
/// Write lock must be held for updates, while read lock must be held for scroll
114-
pub(super) scroll_read_lock: Arc<tokio::sync::RwLock<()>>,
112+
/// Currently used for:
113+
///
114+
/// * Blocking updates during scroll + retrieve operations
115+
/// Consistency of scroll operations is especially important for internal processes like
116+
/// re-sharding and shard transfer, so explicit lock for those operations is required.
117+
///
118+
/// * Blocking updates during some parts of snapshot creation
119+
/// Snapshotting process wraps and unwraps proxy segments, which might
120+
/// create inconsistencies if updates are applied concurrently.
121+
///
122+
/// Write lock must be held for updates, while read lock must be held for critical sections
123+
pub(super) update_operation_lock: Arc<tokio::sync::RwLock<()>>,
115124
}
116125

117126
/// Shard holds information about segments and WAL.
@@ -249,7 +258,7 @@ impl LocalShard {
249258
total_optimized_points,
250259
disk_usage_watcher,
251260
read_rate_limiter,
252-
scroll_read_lock,
261+
update_operation_lock: scroll_read_lock,
253262
}
254263
}
255264

@@ -660,7 +669,7 @@ impl LocalShard {
660669
segments,
661670
op_num,
662671
update.operation,
663-
self.scroll_read_lock.clone(),
672+
self.update_operation_lock.clone(),
664673
self.update_tracker.clone(),
665674
&HardwareCounterCell::disposable(), // Internal operation, no measurement needed.
666675
) {

lib/collection/src/shards/local_shard/scroll.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ impl LocalShard {
153153
let stopping_guard = StoppingGuard::new();
154154
let segments = self.segments.clone();
155155

156-
let scroll_lock = self.scroll_read_lock.read().await;
156+
let scroll_lock = self.update_operation_lock.read().await;
157157
let (non_appendable, appendable) = segments.read().split_segments();
158158

159159
let read_filtered = |segment: LockedSegment, hw_counter: HardwareCounterCell| {
@@ -238,7 +238,7 @@ impl LocalShard {
238238
let stopping_guard = StoppingGuard::new();
239239
let segments = self.segments.clone();
240240

241-
let scroll_lock = self.scroll_read_lock.read().await;
241+
let scroll_lock = self.update_operation_lock.read().await;
242242
let (non_appendable, appendable) = segments.read().split_segments();
243243

244244
let read_ordered_filtered = |segment: LockedSegment, hw_counter: &HardwareCounterCell| {
@@ -337,7 +337,7 @@ impl LocalShard {
337337
let stopping_guard = StoppingGuard::new();
338338
let segments = self.segments.clone();
339339

340-
let scroll_lock = self.scroll_read_lock.read().await;
340+
let scroll_lock = self.update_operation_lock.read().await;
341341
let (non_appendable, appendable) = segments.read().split_segments();
342342

343343
let read_filtered = |segment: LockedSegment, hw_counter: &HardwareCounterCell| {

lib/collection/src/shards/local_shard/snapshot.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ impl LocalShard {
9090
let temp_path = temp_path.to_owned();
9191

9292
let tar_c = tar.clone();
93+
let update_lock = self.update_operation_lock.clone();
94+
9395
tokio::task::spawn_blocking(move || {
9496
// Do not change segments while snapshotting
9597
snapshot_all_segments(
@@ -101,6 +103,7 @@ impl LocalShard {
101103
&tar_c.descend(Path::new(SEGMENTS_PATH))?,
102104
format,
103105
manifest.as_ref(),
106+
update_lock,
104107
)?;
105108

106109
if save_wal {
@@ -210,6 +213,9 @@ pub fn snapshot_all_segments(
210213
tar: &tar_ext::BuilderExt,
211214
format: SnapshotFormat,
212215
manifest: Option<&SnapshotManifest>,
216+
// Update lock prevents segment operations during update.
217+
// For instance, we can't unproxy segments while update operation is in progress.
218+
update_lock: Arc<tokio::sync::RwLock<()>>,
213219
) -> OperationResult<()> {
214220
// Snapshotting may take long-running read locks on segments blocking incoming writes, do
215221
// this through proxied segments to allow writes to continue.
@@ -232,6 +238,7 @@ pub fn snapshot_all_segments(
232238
)?;
233239
Ok(())
234240
},
241+
update_lock,
235242
)
236243
}
237244

@@ -264,6 +271,7 @@ pub fn proxy_all_segments_and_apply<F>(
264271
segment_config: Option<SegmentConfig>,
265272
payload_index_schema: Arc<SaveOnDisk<PayloadIndexSchema>>,
266273
mut operation: F,
274+
update_lock: Arc<tokio::sync::RwLock<()>>,
267275
) -> OperationResult<()>
268276
where
269277
F: FnMut(&RwLock<dyn SegmentEntry>) -> OperationResult<()>,
@@ -318,6 +326,7 @@ where
318326
// by `Self::unproxy_all_segments` afterwards to maintain the read consistency.
319327
let remaining = proxies.len() - unproxied_segment_ids.len();
320328
if remaining > 1 {
329+
let _update_guard = update_lock.blocking_read();
321330
match SegmentHolder::try_unproxy_segment(
322331
segments_lock,
323332
*segment_id,
@@ -336,6 +345,7 @@ where
336345
// Unproxy all segments
337346
// Always do this to prevent leaving proxy segments behind
338347
log::trace!("Unproxying all shard segments after function is applied");
348+
let _update_guard = update_lock.blocking_read();
339349
SegmentHolder::unproxy_all_segments(segments_lock, proxies, tmp_segment)?;
340350

341351
result

lib/collection/src/shards/local_shard/snapshot_tests.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ fn test_snapshot_all() {
4242
let schema: Arc<SaveOnDisk<PayloadIndexSchema>> =
4343
Arc::new(SaveOnDisk::load_or_init_default(payload_schema_file).unwrap());
4444

45+
let update_lock = Arc::new(tokio::sync::RwLock::new(()));
46+
4547
snapshot_all_segments(
4648
holder.clone(),
4749
segments_dir.path(),
@@ -51,6 +53,7 @@ fn test_snapshot_all() {
5153
&tar,
5254
SnapshotFormat::Regular,
5355
None,
56+
update_lock,
5457
)
5558
.unwrap();
5659

lib/shard/src/segment_holder/mod.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use ahash::{AHashMap, AHashSet};
1515
use common::counter::hardware_counter::HardwareCounterCell;
1616
use common::iterator_ext::IteratorExt;
1717
use common::save_on_disk::SaveOnDisk;
18-
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
18+
use parking_lot::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
1919
use rand::seq::IndexedRandom;
2020
use segment::common::operation_error::{OperationError, OperationResult};
2121
use segment::data_types::named_vectors::NamedVectors;
@@ -70,11 +70,6 @@ pub struct SegmentHolder {
7070
/// An example for this are operations that don't modify any points but could be expensive to recover from during WAL recovery.
7171
/// To acknowledge them in WAL, we overwrite the max_persisted value in `Self::flush_all` with the segment version stored here.
7272
max_persisted_segment_version_overwrite: AtomicU64,
73-
74-
/// Ensuring at most one update at time is happening.
75-
/// Clients updates are serialized through the channel update but some internals are performing
76-
/// out of band updates.
77-
update_lock: Arc<Mutex<()>>,
7873
}
7974

8075
pub type LockedSegmentHolder = Arc<RwLock<SegmentHolder>>;
@@ -540,7 +535,6 @@ impl SegmentHolder {
540535
&T,
541536
) -> OperationResult<bool>,
542537
{
543-
let _update_guard = self.update_lock.lock();
544538
let (to_update, to_delete) = self.find_points_to_update_and_delete(ids);
545539

546540
// Delete old points first, because we want to handle copy-on-write in multiple proxy segments properly

lib/shard/src/segment_holder/snapshot.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ impl SegmentHolder {
146146

147147
// Batch 1: propagate changes to wrapped segment with segment holder read lock
148148
{
149-
let _update_guard = segments_lock.update_lock.lock();
150149
if let Err(err) = proxy_segment.read().propagate_to_wrapped() {
151150
log::error!(
152151
"Propagating proxy segment {segment_id} changes to wrapped segment failed, ignoring: {err}",
@@ -196,7 +195,6 @@ impl SegmentHolder {
196195
LockedSegment::Proxy(proxy_segment) => Some((segment_id, proxy_segment)),
197196
LockedSegment::Original(_) => None,
198197
}).for_each(|(proxy_id, proxy_segment)| {
199-
let _update_guard = segments_lock.update_lock.lock();
200198
if let Err(err) = proxy_segment.read().propagate_to_wrapped() {
201199
log::error!("Propagating proxy segment {proxy_id} changes to wrapped segment failed, ignoring: {err}");
202200
}

0 commit comments

Comments
 (0)