Skip to content

Commit 19e5758

Browse files
Make reads sequential in BatchedVectorReader (#6508)
* Add sequential reads and improve BatchedVectorReader * implement get_many_sequential --------- Co-authored-by: generall <[email protected]>
1 parent 43053b0 commit 19e5758

13 files changed

Lines changed: 164 additions & 39 deletions

lib/segment/src/fixtures/index_fixtures.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ impl<TMetric: Metric<VectorElementType>> VectorStorage for TestRawScorerProducer
6262
self.get_vector_opt(key).expect("vector not found")
6363
}
6464

65+
fn get_vector_sequential(&self, key: PointOffsetType) -> CowVector {
66+
self.get_vector(key)
67+
}
68+
6569
fn get_vector_opt(&self, key: PointOffsetType) -> Option<CowVector> {
6670
self.vectors.get_opt(key as _).map(|v| v.into())
6771
}

lib/segment/src/segment_constructor/batched_reader.rs

Lines changed: 30 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@ pub struct BatchedVectorReader<'a> {
3030
points_to_insert: &'a [PointData],
3131
source_vector_storages: &'a [AtomicRef<'a, VectorStorageEnum>],
3232
buffer: Vec<(CowVector<'a>, bool)>,
33-
/// Offset in the buffer.
34-
/// From 0 to `BATCH_SIZE`.
35-
buffer_offset: usize,
33+
seg_to_points_buffer: AHashMap<U24, Vec<(&'a PointData, usize)>>,
3634
/// Global position of the iterator.
3735
/// From 0 to `points_to_insert.len()`.
3836
position: usize,
@@ -43,26 +41,18 @@ impl<'a> BatchedVectorReader<'a> {
4341
points_to_insert: &'a [PointData],
4442
source_vector_storages: &'a [AtomicRef<'a, VectorStorageEnum>],
4543
) -> BatchedVectorReader<'a> {
46-
let buffer = (0..BATCH_SIZE)
47-
.map(|_| {
48-
// We need to allocate the buffer with the size of the batch,
49-
// but we don't know the size of the vectors.
50-
// So we use a placeholder vector with size 0.
51-
(CowVector::default(), false)
52-
})
53-
.collect();
54-
55-
let mut res = BatchedVectorReader {
44+
// We need to allocate the buffer with the size of the batch,
45+
// but we don't know the size of the vectors.
46+
// So we use a placeholder vector with size 0.
47+
let buffer = vec![(CowVector::default(), false); BATCH_SIZE];
48+
49+
BatchedVectorReader {
5650
points_to_insert,
5751
source_vector_storages,
5852
buffer,
59-
buffer_offset: 0,
53+
seg_to_points_buffer: AHashMap::default(),
6054
position: 0,
61-
};
62-
63-
res.refill_buffer();
64-
65-
res
55+
}
6656
}
6757

6858
/// Fills the buffer with the next batch of points.
@@ -76,31 +66,36 @@ impl<'a> BatchedVectorReader<'a> {
7666
/// (vec, vector_deleted)
7767
/// ```
7868
fn refill_buffer(&mut self) {
79-
let from = self.position;
80-
let to = min(self.position + BATCH_SIZE, self.points_to_insert.len());
81-
// Read by segments, as we want to localize reads as much as possible.
82-
let mut segment_to_points: AHashMap<U24, Vec<(&PointData, usize)>> = Default::default();
83-
84-
for i in from..to {
85-
let point_data = &self.points_to_insert[i];
86-
let offset_in_batch = i - from;
69+
let start_pos = self.position;
70+
let end_pos = min(self.position + BATCH_SIZE, self.points_to_insert.len());
8771

88-
let segment_index = point_data.segment_index;
89-
let points = segment_to_points.entry(segment_index).or_default();
90-
points.push((point_data, offset_in_batch));
72+
// Read by segments, as we want to localize reads as much as possible.
73+
for pos in start_pos..end_pos {
74+
let point_data = &self.points_to_insert[pos];
75+
let offset_in_batch = pos - start_pos;
76+
77+
self.seg_to_points_buffer
78+
.entry(point_data.segment_index)
79+
.or_default()
80+
.push((point_data, offset_in_batch))
9181
}
9282

93-
for (segment_index, points) in segment_to_points {
83+
for (segment_index, points) in self.seg_to_points_buffer.drain() {
9484
let source_vector_storage = &self.source_vector_storages[segment_index.get() as usize];
95-
// ToDo: Introduce batch operation for reading vectors
9685
for (point_data, offset_in_batch) in points {
97-
let vec = source_vector_storage.get_vector(point_data.internal_id);
86+
let vec = source_vector_storage.get_vector_sequential(point_data.internal_id);
9887
let vector_deleted =
9988
source_vector_storage.is_deleted_vector(point_data.internal_id);
10089
self.buffer[offset_in_batch] = (vec, vector_deleted);
10190
}
10291
}
10392
}
93+
94+
fn refill_buffer_if_needed(&mut self) {
95+
if self.position % BATCH_SIZE == 0 {
96+
self.refill_buffer();
97+
}
98+
}
10499
}
105100

106101
impl<'a> Iterator for BatchedVectorReader<'a> {
@@ -111,13 +106,9 @@ impl<'a> Iterator for BatchedVectorReader<'a> {
111106
return None;
112107
}
113108

114-
if self.buffer_offset == BATCH_SIZE {
115-
self.refill_buffer();
116-
self.buffer_offset = 0;
117-
}
109+
self.refill_buffer_if_needed();
118110

119-
let item = self.buffer[self.buffer_offset].clone();
120-
self.buffer_offset += 1;
111+
let item = self.buffer[self.position % BATCH_SIZE].clone();
121112
self.position += 1;
122113

123114
Some(item)

lib/segment/src/vector_storage/chunked_mmap_vectors.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,10 @@ impl<T: Sized + Copy + 'static> ChunkedVectorStorage<T> for ChunkedMmapVectors<T
357357
ChunkedMmapVectors::get(self, key, false)
358358
}
359359

360+
fn get_sequential(&self, key: VectorOffsetType) -> Option<&[T]> {
361+
ChunkedMmapVectors::get(self, key, true)
362+
}
363+
360364
#[inline]
361365
fn files(&self) -> Vec<PathBuf> {
362366
ChunkedMmapVectors::files(self)
@@ -402,6 +406,11 @@ impl<T: Sized + Copy + 'static> ChunkedVectorStorage<T> for ChunkedMmapVectors<T
402406
ChunkedMmapVectors::get_many(self, key, count, false)
403407
}
404408

409+
#[inline]
410+
fn get_many_sequential(&self, key: VectorOffsetType, count: usize) -> Option<&[T]> {
411+
ChunkedMmapVectors::get_many(self, key, count, true)
412+
}
413+
405414
#[inline]
406415
fn get_batch<'a>(
407416
&'a self,

lib/segment/src/vector_storage/chunked_vector_storage.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ pub trait ChunkedVectorStorage<T> {
1818

1919
fn get(&self, key: VectorOffsetType) -> Option<&[T]>;
2020

21+
fn get_sequential(&self, key: VectorOffsetType) -> Option<&[T]>;
22+
2123
fn files(&self) -> Vec<PathBuf>;
2224

2325
fn flusher(&self) -> Flusher;
@@ -46,6 +48,9 @@ pub trait ChunkedVectorStorage<T> {
4648
/// Returns `count` flattened vectors starting from key. if chunk boundary is crossed, returns None
4749
fn get_many(&self, key: VectorOffsetType, count: usize) -> Option<&[T]>;
4850

51+
/// Returns `count` flattened vectors starting from key. if chunk boundary is crossed, returns None
52+
fn get_many_sequential(&self, key: VectorOffsetType, count: usize) -> Option<&[T]>;
53+
4954
/// Returns batch of vectors by keys.
5055
/// Underlying storage might apply some optimizations to prefetch vectors.
5156
fn get_batch<'a>(

lib/segment/src/vector_storage/dense/appendable_dense_vector_storage.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ impl<T: PrimitiveVectorElement, S: ChunkedVectorStorage<T>> VectorStorage
123123
self.get_vector_opt(key).expect("vector not found")
124124
}
125125

126+
fn get_vector_sequential(&self, key: PointOffsetType) -> CowVector {
127+
self.vectors
128+
.get_sequential(key as VectorOffsetType)
129+
.map(|slice| CowVector::from(T::slice_to_float_cow(slice.into())))
130+
.expect("Vector not found")
131+
}
132+
126133
fn get_vector_opt(&self, key: PointOffsetType) -> Option<CowVector> {
127134
self.vectors
128135
.get(key as VectorOffsetType)

lib/segment/src/vector_storage/dense/memmap_dense_vector_storage.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,15 @@ impl<T: PrimitiveVectorElement> VectorStorage for MemmapDenseVectorStorage<T> {
183183
self.get_vector_opt(key).expect("vector not found")
184184
}
185185

186+
fn get_vector_sequential(&self, key: PointOffsetType) -> CowVector {
187+
self.mmap_store
188+
.as_ref()
189+
.unwrap()
190+
.get_vector_opt_sequential(key)
191+
.map(|vector| T::slice_to_float_cow(vector.into()).into())
192+
.expect("Vector not found")
193+
}
194+
186195
fn get_vector_opt(&self, key: PointOffsetType) -> Option<CowVector> {
187196
self.mmap_store
188197
.as_ref()

lib/segment/src/vector_storage/dense/simple_dense_vector_storage.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,11 @@ impl<T: PrimitiveVectorElement> VectorStorage for SimpleDenseVectorStorage<T> {
220220
self.get_vector_opt(key).expect("vector not found")
221221
}
222222

223+
fn get_vector_sequential(&self, key: PointOffsetType) -> CowVector {
224+
// In memory so no optimization to be done here.
225+
self.get_vector(key)
226+
}
227+
223228
/// Get vector by key, if it exists.
224229
fn get_vector_opt(&self, key: PointOffsetType) -> Option<CowVector> {
225230
self.vectors

lib/segment/src/vector_storage/in_ram_persisted_vectors.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ impl<T: Sized + Copy + Clone + Default + 'static> ChunkedVectorStorage<T>
4545
self.mmap_storage.get(key)
4646
}
4747

48+
fn get_sequential(&self, key: VectorOffsetType) -> Option<&[T]> {
49+
self.mmap_storage.get_sequential(key)
50+
}
51+
4852
#[inline]
4953
fn files(&self) -> Vec<PathBuf> {
5054
self.mmap_storage.files()
@@ -91,6 +95,12 @@ impl<T: Sized + Copy + Clone + Default + 'static> ChunkedVectorStorage<T>
9195
self.mmap_storage.get_many(key, count)
9296
}
9397

98+
#[inline]
99+
fn get_many_sequential(&self, key: VectorOffsetType, count: usize) -> Option<&[T]> {
100+
// No optimization for sequential access
101+
self.mmap_storage.get_many(key, count)
102+
}
103+
94104
#[inline]
95105
fn get_batch<'a>(
96106
&'a self,

lib/segment/src/vector_storage/multi_dense/appendable_mmap_multi_dense_vector_storage.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,26 @@ impl<
119119
})
120120
}
121121

122+
/// Returns None if key is not found
123+
fn get_multi_opt_sequential(
124+
&self,
125+
key: PointOffsetType,
126+
) -> Option<TypedMultiDenseVectorRef<T>> {
127+
self.offsets
128+
.get_sequential(key as VectorOffsetType)
129+
.and_then(|mmap_offset| {
130+
let mmap_offset = mmap_offset.first().expect("mmap_offset must not be empty");
131+
self.vectors.get_many_sequential(
132+
mmap_offset.offset as VectorOffsetType,
133+
mmap_offset.count as usize,
134+
)
135+
})
136+
.map(|flattened_vectors| TypedMultiDenseVectorRef {
137+
flattened_vectors,
138+
dim: self.vectors.dim(),
139+
})
140+
}
141+
122142
fn iterate_inner_vectors(&self) -> impl Iterator<Item = &[T]> + Clone + Send {
123143
(0..self.total_vector_count()).flat_map(|key| {
124144
let mmap_offset = self
@@ -176,6 +196,16 @@ impl<
176196
self.get_vector_opt(key).expect("vector not found")
177197
}
178198

199+
fn get_vector_sequential(&self, key: PointOffsetType) -> CowVector {
200+
self.get_multi_opt_sequential(key)
201+
.map(|multi_dense_vector| {
202+
CowVector::MultiDense(T::into_float_multivector(CowMultiVector::Borrowed(
203+
multi_dense_vector,
204+
)))
205+
})
206+
.expect("vector not found")
207+
}
208+
179209
fn get_vector_opt(&self, key: PointOffsetType) -> Option<CowVector> {
180210
self.get_multi_opt(key).map(|multi_dense_vector| {
181211
CowVector::MultiDense(T::into_float_multivector(CowMultiVector::Borrowed(

lib/segment/src/vector_storage/multi_dense/simple_multi_dense_vector_storage.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,14 @@ impl<T: PrimitiveVectorElement> MultiVectorStorage<T> for SimpleMultiDenseVector
310310
})
311311
}
312312

313+
fn get_multi_opt_sequential(
314+
&self,
315+
key: PointOffsetType,
316+
) -> Option<TypedMultiDenseVectorRef<T>> {
317+
// No sequential optimizations available for in memory storage.
318+
self.get_multi_opt(key)
319+
}
320+
313321
fn iterate_inner_vectors(&self) -> impl Iterator<Item = &[T]> + Clone + Send {
314322
(0..self.total_vector_count()).flat_map(|key| {
315323
let metadata = &self.vectors_metadata[key];
@@ -353,6 +361,10 @@ impl<T: PrimitiveVectorElement> VectorStorage for SimpleMultiDenseVectorStorage<
353361
self.get_vector_opt(key).expect("vector not found")
354362
}
355363

364+
fn get_vector_sequential(&self, key: PointOffsetType) -> CowVector {
365+
self.get_vector(key)
366+
}
367+
356368
fn get_vector_opt(&self, key: PointOffsetType) -> Option<CowVector> {
357369
self.get_multi_opt(key).map(|multi_dense_vector| {
358370
CowVector::MultiDense(T::into_float_multivector(CowMultiVector::Borrowed(

0 commit comments

Comments
 (0)