@@ -30,9 +30,7 @@ pub struct BatchedVectorReader<'a> {
3030 points_to_insert : & ' a [ PointData ] ,
3131 source_vector_storages : & ' a [ AtomicRef < ' a , VectorStorageEnum > ] ,
3232 buffer : Vec < ( CowVector < ' a > , bool ) > ,
33- /// Offset in the buffer.
34- /// From 0 to `BATCH_SIZE`.
35- buffer_offset : usize ,
33+ seg_to_points_buffer : AHashMap < U24 , Vec < ( & ' a PointData , usize ) > > ,
3634 /// Global position of the iterator.
3735 /// From 0 to `points_to_insert.len()`.
3836 position : usize ,
@@ -43,26 +41,18 @@ impl<'a> BatchedVectorReader<'a> {
4341 points_to_insert : & ' a [ PointData ] ,
4442 source_vector_storages : & ' a [ AtomicRef < ' a , VectorStorageEnum > ] ,
4543 ) -> BatchedVectorReader < ' a > {
46- let buffer = ( 0 ..BATCH_SIZE )
47- . map ( |_| {
48- // We need to allocate the buffer with the size of the batch,
49- // but we don't know the size of the vectors.
50- // So we use a placeholder vector with size 0.
51- ( CowVector :: default ( ) , false )
52- } )
53- . collect ( ) ;
54-
55- let mut res = BatchedVectorReader {
44+ // We need to allocate the buffer with the size of the batch,
45+ // but we don't know the size of the vectors.
46+ // So we use a placeholder vector with size 0.
47+ let buffer = vec ! [ ( CowVector :: default ( ) , false ) ; BATCH_SIZE ] ;
48+
49+ BatchedVectorReader {
5650 points_to_insert,
5751 source_vector_storages,
5852 buffer,
59- buffer_offset : 0 ,
53+ seg_to_points_buffer : AHashMap :: default ( ) ,
6054 position : 0 ,
61- } ;
62-
63- res. refill_buffer ( ) ;
64-
65- res
55+ }
6656 }
6757
6858 /// Fills the buffer with the next batch of points.
@@ -76,31 +66,36 @@ impl<'a> BatchedVectorReader<'a> {
7666 /// (vec, vector_deleted)
7767 /// ```
7868 fn refill_buffer ( & mut self ) {
79- let from = self . position ;
80- let to = min ( self . position + BATCH_SIZE , self . points_to_insert . len ( ) ) ;
81- // Read by segments, as we want to localize reads as much as possible.
82- let mut segment_to_points: AHashMap < U24 , Vec < ( & PointData , usize ) > > = Default :: default ( ) ;
83-
84- for i in from..to {
85- let point_data = & self . points_to_insert [ i] ;
86- let offset_in_batch = i - from;
69+ let start_pos = self . position ;
70+ let end_pos = min ( self . position + BATCH_SIZE , self . points_to_insert . len ( ) ) ;
8771
88- let segment_index = point_data. segment_index ;
89- let points = segment_to_points. entry ( segment_index) . or_default ( ) ;
90- points. push ( ( point_data, offset_in_batch) ) ;
72+ // Read by segments, as we want to localize reads as much as possible.
73+ for pos in start_pos..end_pos {
74+ let point_data = & self . points_to_insert [ pos] ;
75+ let offset_in_batch = pos - start_pos;
76+
77+ self . seg_to_points_buffer
78+ . entry ( point_data. segment_index )
79+ . or_default ( )
80+ . push ( ( point_data, offset_in_batch) )
9181 }
9282
93- for ( segment_index, points) in segment_to_points {
83+ for ( segment_index, points) in self . seg_to_points_buffer . drain ( ) {
9484 let source_vector_storage = & self . source_vector_storages [ segment_index. get ( ) as usize ] ;
95- // ToDo: Introduce batch operation for reading vectors
9685 for ( point_data, offset_in_batch) in points {
97- let vec = source_vector_storage. get_vector ( point_data. internal_id ) ;
86+ let vec = source_vector_storage. get_vector_sequential ( point_data. internal_id ) ;
9887 let vector_deleted =
9988 source_vector_storage. is_deleted_vector ( point_data. internal_id ) ;
10089 self . buffer [ offset_in_batch] = ( vec, vector_deleted) ;
10190 }
10291 }
10392 }
93+
94+ fn refill_buffer_if_needed ( & mut self ) {
95+ if self . position % BATCH_SIZE == 0 {
96+ self . refill_buffer ( ) ;
97+ }
98+ }
10499}
105100
106101impl < ' a > Iterator for BatchedVectorReader < ' a > {
@@ -111,13 +106,9 @@ impl<'a> Iterator for BatchedVectorReader<'a> {
111106 return None ;
112107 }
113108
114- if self . buffer_offset == BATCH_SIZE {
115- self . refill_buffer ( ) ;
116- self . buffer_offset = 0 ;
117- }
109+ self . refill_buffer_if_needed ( ) ;
118110
119- let item = self . buffer [ self . buffer_offset ] . clone ( ) ;
120- self . buffer_offset += 1 ;
111+ let item = self . buffer [ self . position % BATCH_SIZE ] . clone ( ) ;
121112 self . position += 1 ;
122113
123114 Some ( item)
0 commit comments