Skip to content

Commit 7ec3c38

Browse files
committed
review fixes: rename all scroll_lock, use write lock for unproxy instead of read
1 parent f2ee497 commit 7ec3c38

4 files changed

Lines changed: 16 additions & 16 deletions

File tree

lib/collection/src/collection_manager/collection_updater.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ impl CollectionUpdater {
4343
segments: &RwLock<SegmentHolder>,
4444
op_num: SeqNumberType,
4545
operation: CollectionUpdateOperations,
46-
scroll_lock: Arc<tokio::sync::RwLock<()>>,
46+
update_operation_lock: Arc<tokio::sync::RwLock<()>>,
4747
update_tracker: UpdateTracker,
4848
hw_counter: &HardwareCounterCell,
4949
) -> CollectionResult<usize> {
@@ -52,7 +52,7 @@ impl CollectionUpdater {
5252
// Allow only one update at a time, ensure no data races between segments.
5353
// let _update_lock = self.update_lock.lock().unwrap();
5454

55-
let _scroll_lock = scroll_lock.blocking_write();
55+
let _update_operation_lock = update_operation_lock.blocking_write();
5656
let _update_guard = update_tracker.update();
5757

5858
match operation {

lib/collection/src/shards/local_shard/scroll.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ impl LocalShard {
153153
let stopping_guard = StoppingGuard::new();
154154
let segments = self.segments.clone();
155155

156-
let scroll_lock = self.update_operation_lock.read().await;
156+
let update_operation_lock = self.update_operation_lock.read().await;
157157
let (non_appendable, appendable) = segments.read().split_segments();
158158

159159
let read_filtered = |segment: LockedSegment, hw_counter: HardwareCounterCell| {
@@ -210,7 +210,7 @@ impl LocalShard {
210210
.await
211211
.map_err(|_: Elapsed| CollectionError::timeout(timeout.as_secs() as usize, "retrieve"))??;
212212

213-
drop(scroll_lock);
213+
drop(update_operation_lock);
214214

215215
let ordered_records = point_ids
216216
.iter()
@@ -238,7 +238,7 @@ impl LocalShard {
238238
let stopping_guard = StoppingGuard::new();
239239
let segments = self.segments.clone();
240240

241-
let scroll_lock = self.update_operation_lock.read().await;
241+
let update_operation_lock = self.update_operation_lock.read().await;
242242
let (non_appendable, appendable) = segments.read().split_segments();
243243

244244
let read_ordered_filtered = |segment: LockedSegment, hw_counter: &HardwareCounterCell| {
@@ -306,7 +306,7 @@ impl LocalShard {
306306
.await
307307
.map_err(|_| CollectionError::timeout(timeout.as_secs() as usize, "retrieve"))??;
308308

309-
drop(scroll_lock);
309+
drop(update_operation_lock);
310310

311311
let ordered_records = point_ids
312312
.iter()
@@ -337,7 +337,7 @@ impl LocalShard {
337337
let stopping_guard = StoppingGuard::new();
338338
let segments = self.segments.clone();
339339

340-
let scroll_lock = self.update_operation_lock.read().await;
340+
let update_operation_lock = self.update_operation_lock.read().await;
341341
let (non_appendable, appendable) = segments.read().split_segments();
342342

343343
let read_filtered = |segment: LockedSegment, hw_counter: &HardwareCounterCell| {
@@ -446,7 +446,7 @@ impl LocalShard {
446446
.await
447447
.map_err(|_: Elapsed| CollectionError::timeout(timeout.as_secs() as usize, "retrieve"))??;
448448

449-
drop(scroll_lock);
449+
drop(update_operation_lock);
450450

451451
Ok(records_map.into_values().collect())
452452
}

lib/collection/src/shards/local_shard/snapshot.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ where
326326
// by `Self::unproxy_all_segments` afterwards to maintain the read consistency.
327327
let remaining = proxies.len() - unproxied_segment_ids.len();
328328
if remaining > 1 {
329-
let _update_guard = update_lock.blocking_read();
329+
let _update_guard = update_lock.blocking_write();
330330
match SegmentHolder::try_unproxy_segment(
331331
segments_lock,
332332
*segment_id,
@@ -345,7 +345,7 @@ where
345345
// Unproxy all segments
346346
// Always do this to prevent leaving proxy segments behind
347347
log::trace!("Unproxying all shard segments after function is applied");
348-
let _update_guard = update_lock.blocking_read();
348+
let _update_guard = update_lock.blocking_write();
349349
SegmentHolder::unproxy_all_segments(segments_lock, proxies, tmp_segment)?;
350350

351351
result

lib/collection/src/update_handler.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ impl UpdateHandler {
294294
async fn try_recover(
295295
segments: LockedSegmentHolder,
296296
wal: LockedWal,
297-
scroll_lock: Arc<tokio::sync::RwLock<()>>,
297+
update_operation_lock: Arc<tokio::sync::RwLock<()>>,
298298
update_tracker: UpdateTracker,
299299
) -> CollectionResult<usize> {
300300
// Try to re-apply everything starting from the first failed operation
@@ -308,7 +308,7 @@ impl UpdateHandler {
308308
&segments,
309309
op_num,
310310
operation.operation,
311-
scroll_lock.clone(),
311+
update_operation_lock.clone(),
312312
update_tracker.clone(),
313313
&HardwareCounterCell::disposable(), // Internal operation, no measurement needed
314314
)?;
@@ -630,7 +630,7 @@ impl UpdateHandler {
630630
max_handles: Option<usize>,
631631
has_triggered_optimizers: Arc<AtomicBool>,
632632
payload_index_schema: Arc<SaveOnDisk<PayloadIndexSchema>>,
633-
scroll_lock: Arc<tokio::sync::RwLock<()>>,
633+
update_operation_lock: Arc<tokio::sync::RwLock<()>>,
634634
update_tracker: UpdateTracker,
635635
) {
636636
let max_handles = max_handles.unwrap_or(usize::MAX);
@@ -706,7 +706,7 @@ impl UpdateHandler {
706706
if Self::try_recover(
707707
segments.clone(),
708708
wal.clone(),
709-
scroll_lock.clone(),
709+
update_operation_lock.clone(),
710710
update_tracker.clone(),
711711
)
712712
.await
@@ -766,7 +766,7 @@ impl UpdateHandler {
766766
optimize_sender: Sender<OptimizerSignal>,
767767
wal: LockedWal,
768768
segments: LockedSegmentHolder,
769-
scroll_lock: Arc<tokio::sync::RwLock<()>>,
769+
update_operation_lock: Arc<tokio::sync::RwLock<()>>,
770770
update_tracker: UpdateTracker,
771771
) {
772772
while let Some(signal) = receiver.blocking_recv() {
@@ -799,7 +799,7 @@ impl UpdateHandler {
799799
&segments,
800800
op_num,
801801
operation,
802-
scroll_lock.clone(),
802+
update_operation_lock.clone(),
803803
update_tracker.clone(),
804804
&hw_measurements.get_counter_cell(),
805805
);

0 commit comments

Comments
 (0)