Skip to content

Commit 6fe46d0

Browse files
refactor update workers (#7642)
* refactor: move flush workers into a dedicated file fmt * move optimizer worker into a dedicated file * move update worker into a dedicated file * Update lib/collection/src/update_workers/optimization_worker.rs Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * Update lib/collection/src/update_workers/optimization_worker.rs Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --------- Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent 20aca3f commit 6fe46d0

8 files changed

Lines changed: 851 additions & 780 deletions

File tree

lib/collection/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@ pub mod events;
2121
mod tests;
2222

2323
pub mod profiling;
24+
pub mod update_workers;

lib/collection/src/shards/local_shard/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ const OLDEST_CLOCKS_PATH: &str = "oldest_clocks.json";
9696
/// LocalShard is an entity that can be moved between peers and contains some part of one collection's data.
9797
///
9898
/// Holds all objects required for collection functioning
99+
#[must_use = "Local Shard must be explicitly handled"]
99100
pub struct LocalShard {
100101
collection_name: CollectionId,
101102
pub(super) segments: LockedSegmentHolder,

lib/collection/src/tests/mod.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ use crate::collection_manager::optimizers::segment_optimizer::OptimizerThreshold
3939
use crate::config::CollectionParams;
4040
use crate::operations::types::VectorsConfig;
4141
use crate::operations::vector_params_builder::VectorParamsBuilder;
42-
use crate::update_handler::{Optimizer, UpdateHandler};
42+
use crate::update_handler::Optimizer;
43+
use crate::update_workers::UpdateWorkers;
4344

4445
#[tokio::test]
4546
async fn test_optimization_process() {
@@ -72,7 +73,7 @@ async fn test_optimization_process() {
7273
let optimizers_log = Arc::new(Mutex::new(Default::default()));
7374
let total_optimized_points = Arc::new(AtomicUsize::new(0));
7475
let segments: Arc<RwLock<_>> = Arc::new(RwLock::new(holder));
75-
let handles = UpdateHandler::launch_optimization(
76+
let handles = UpdateWorkers::launch_optimization(
7677
optimizers.clone(),
7778
optimizers_log.clone(),
7879
total_optimized_points.clone(),
@@ -115,7 +116,7 @@ async fn test_optimization_process() {
115116
assert_eq!(res.unwrap(), Some(true));
116117
}
117118

118-
let handles = UpdateHandler::launch_optimization(
119+
let handles = UpdateWorkers::launch_optimization(
119120
optimizers.clone(),
120121
optimizers_log.clone(),
121122
total_optimized_points.clone(),
@@ -172,7 +173,7 @@ async fn test_cancel_optimization() {
172173
let optimizers_log = Arc::new(Mutex::new(Default::default()));
173174
let total_optimized_points = Arc::new(AtomicUsize::new(0));
174175
let segments: Arc<RwLock<_>> = Arc::new(RwLock::new(holder));
175-
let handles = UpdateHandler::launch_optimization(
176+
let handles = UpdateWorkers::launch_optimization(
176177
optimizers.clone(),
177178
optimizers_log.clone(),
178179
total_optimized_points.clone(),
@@ -259,7 +260,7 @@ async fn test_new_segment_when_all_over_capacity() {
259260
assert_eq!(segments.read().len(), 5);
260261

261262
// On optimization we expect one new segment to be created, all are over capacity
262-
UpdateHandler::ensure_appendable_segment_with_capacity(
263+
UpdateWorkers::ensure_appendable_segment_with_capacity(
263264
&segments,
264265
dir.path(),
265266
&collection_params,
@@ -271,7 +272,7 @@ async fn test_new_segment_when_all_over_capacity() {
271272
assert_eq!(segments.read().len(), 6);
272273

273274
// On reoptimization we don't expect another segment, we have one segment with capacity
274-
UpdateHandler::ensure_appendable_segment_with_capacity(
275+
UpdateWorkers::ensure_appendable_segment_with_capacity(
275276
&segments,
276277
dir.path(),
277278
&collection_params,
@@ -317,7 +318,7 @@ async fn test_new_segment_when_all_over_capacity() {
317318
}
318319

319320
// On reoptimization we expect one more segment to be created, all are over capacity
320-
UpdateHandler::ensure_appendable_segment_with_capacity(
321+
UpdateWorkers::ensure_appendable_segment_with_capacity(
321322
&segments,
322323
dir.path(),
323324
&collection_params,

0 commit comments

Comments
 (0)