@@ -13,11 +13,6 @@ use crate::shards::shard::ShardId;
1313#[ serde( from = "SerdeHelper" , into = "SerdeHelper" ) ]
1414pub struct ShardKeyMapping {
1515 shard_key_to_shard_ids : HashMap < ShardKey , HashSet < ShardId > > ,
16-
17- /// `true` if the ShardKeyMapping was specified in the old format.
18- /// TODO(shardkey): Remove once all keys are migrated.
19- #[ serde( skip) ]
20- pub ( crate ) was_old_format : bool ,
2116}
2217
2318impl ops:: Deref for ShardKeyMapping {
@@ -79,41 +74,17 @@ impl ShardKeyMapping {
7974
8075impl From < SerdeHelper > for ShardKeyMapping {
8176 fn from ( helper : SerdeHelper ) -> Self {
82- let mut was_old_format = false ;
83-
84- let shard_key_to_shard_ids = match helper {
85- SerdeHelper :: New ( key_ids_pairs) => key_ids_pairs
86- . into_iter ( )
87- . map ( KeyIdsPair :: into_parts)
88- . collect ( ) ,
89-
90- SerdeHelper :: Old ( key_ids_map) => {
91- was_old_format = true ;
92- key_ids_map
93- }
94- } ;
95-
9677 Self {
97- shard_key_to_shard_ids,
98- was_old_format,
78+ shard_key_to_shard_ids : helper. 0 . into_iter ( ) . map ( KeyIdsPair :: into_parts) . collect ( ) ,
9979 }
10080 }
10181}
10282
103- /// Helper structure for persisting shard key mapping
104- ///
105- /// The original format of persisting shard key mappings as hash map is broken. It forgets type
106- /// information for the shard key, which resulted in shard key numbers to be converted into
107- /// strings.
83+ /// Helper structure for persisting shard key mapping in safe format
10884///
10985/// Bug: <https://github.com/qdrant/qdrant/pull/5838>
11086#[ derive( Deserialize , Serialize ) ]
111- #[ serde( untagged) ]
112- enum SerdeHelper {
113- New ( Vec < KeyIdsPair > ) ,
114- // TODO(1.15): remove this old format, deployment should exclusively be using new format
115- Old ( HashMap < ShardKey , HashSet < ShardId > > ) ,
116- }
87+ struct SerdeHelper ( Vec < KeyIdsPair > ) ;
11788
11889impl From < ShardKeyMapping > for SerdeHelper {
11990 fn from ( mapping : ShardKeyMapping ) -> Self {
@@ -122,7 +93,7 @@ impl From<ShardKeyMapping> for SerdeHelper {
12293 . into_iter ( )
12394 . map ( KeyIdsPair :: from)
12495 . collect ( ) ;
125- Self :: New ( key_ids_pairs)
96+ Self ( key_ids_pairs)
12697 }
12798}
12899
@@ -149,162 +120,3 @@ impl From<(ShardKey, HashSet<ShardId>)> for KeyIdsPair {
149120 Self { key, shard_ids }
150121 }
151122}
152-
153- #[ cfg( test) ]
154- mod test {
155-
156- use std:: sync:: Arc ;
157-
158- use common:: budget:: ResourceBudget ;
159- use common:: counter:: hardware_accumulator:: HwMeasurementAcc ;
160- use fs_err:: File ;
161- use segment:: types:: { PayloadFieldSchema , PayloadSchemaType } ;
162- use tempfile:: { Builder , TempDir } ;
163-
164- use super :: * ;
165- use crate :: collection:: { Collection , RequestShardTransfer } ;
166- use crate :: config:: { CollectionConfigInternal , CollectionParams , ShardingMethod , WalConfig } ;
167- use crate :: operations:: shared_storage_config:: SharedStorageConfig ;
168- use crate :: optimizers_builder:: OptimizersConfig ;
169- use crate :: shards:: channel_service:: ChannelService ;
170- use crate :: shards:: collection_shard_distribution:: CollectionShardDistribution ;
171- use crate :: shards:: replica_set:: { AbortShardTransfer , ChangePeerFromState , ReplicaState } ;
172- use crate :: shards:: shard_holder:: SHARD_KEY_MAPPING_FILE ;
173-
174- const COLLECTION_TEST_NAME : & str = "shard_key_test" ;
175-
176- async fn make_collection ( collection_name : & str , collection_dir : & TempDir ) -> Collection {
177- let wal_config = WalConfig :: default ( ) ;
178- let mut collection_params = CollectionParams :: empty ( ) ;
179- collection_params. sharding_method = Some ( ShardingMethod :: Custom ) ;
180-
181- let config = CollectionConfigInternal {
182- params : collection_params,
183- optimizer_config : OptimizersConfig :: fixture ( ) ,
184- wal_config,
185- hnsw_config : Default :: default ( ) ,
186- quantization_config : Default :: default ( ) ,
187- strict_mode_config : None ,
188- uuid : None ,
189- metadata : None ,
190- } ;
191-
192- let snapshots_path = Builder :: new ( ) . prefix ( "test_snapshots" ) . tempdir ( ) . unwrap ( ) ;
193-
194- let collection = Collection :: new (
195- collection_name. to_string ( ) ,
196- 0 ,
197- collection_dir. path ( ) ,
198- snapshots_path. path ( ) ,
199- & config,
200- Arc :: new ( SharedStorageConfig :: default ( ) ) ,
201- CollectionShardDistribution :: all_local ( None , 0 ) ,
202- None ,
203- ChannelService :: default ( ) ,
204- dummy_on_replica_failure ( ) ,
205- dummy_request_shard_transfer ( ) ,
206- dummy_abort_shard_transfer ( ) ,
207- None ,
208- None ,
209- ResourceBudget :: default ( ) ,
210- None ,
211- )
212- . await
213- . expect ( "Failed to create new fixture collection" ) ;
214-
215- collection
216- . create_payload_index (
217- "field" . parse ( ) . unwrap ( ) ,
218- PayloadFieldSchema :: FieldType ( PayloadSchemaType :: Integer ) ,
219- HwMeasurementAcc :: new ( ) ,
220- )
221- . await
222- . expect ( "failed to create payload index" ) ;
223-
224- collection
225- }
226-
227- pub fn dummy_on_replica_failure ( ) -> ChangePeerFromState {
228- Arc :: new ( move |_peer_id, _shard_id, _from_state| { } )
229- }
230-
231- pub fn dummy_request_shard_transfer ( ) -> RequestShardTransfer {
232- Arc :: new ( move |_transfer| { } )
233- }
234-
235- pub fn dummy_abort_shard_transfer ( ) -> AbortShardTransfer {
236- Arc :: new ( |_transfer, _reason| { } )
237- }
238-
239- #[ tokio:: test( flavor = "multi_thread" ) ]
240- async fn test_shard_key_migration ( ) {
241- let collection_dir = Builder :: new ( ) . prefix ( "test_collection" ) . tempdir ( ) . unwrap ( ) ;
242-
243- {
244- let collection = make_collection ( COLLECTION_TEST_NAME , & collection_dir) . await ;
245- collection
246- . create_shard_key (
247- ShardKey :: Keyword ( "helloworld" . into ( ) ) ,
248- vec ! [ vec![ ] ] ,
249- ReplicaState :: Active ,
250- )
251- . await
252- . unwrap ( ) ;
253- }
254-
255- let shard_mapping_file = collection_dir. path ( ) . join ( SHARD_KEY_MAPPING_FILE ) ;
256-
257- let shard_key_data: SerdeHelper = {
258- let file = File :: open ( & shard_mapping_file) . unwrap ( ) ;
259- serde_json:: from_reader ( file) . unwrap ( )
260- } ;
261-
262- let shard_key_data = ShardKeyMapping :: from ( shard_key_data) ;
263-
264- // Ensure we have at least one shard key.
265- assert ! ( !shard_key_data. is_empty( ) ) ;
266-
267- // Convert to old shard key and overwrite file on disk.
268- {
269- let old_shard_key_data = SerdeHelper :: Old ( shard_key_data. shard_key_to_shard_ids ) ;
270- let mut writer = File :: create ( & shard_mapping_file) . unwrap ( ) ;
271- serde_json:: to_writer ( & mut writer, & old_shard_key_data) . unwrap ( ) ;
272- }
273-
274- // Ensure on disk is now the old version.
275- {
276- let shard_key_data: SerdeHelper =
277- serde_json:: from_reader ( File :: open ( & shard_mapping_file) . unwrap ( ) ) . unwrap ( ) ;
278-
279- assert ! ( matches!( shard_key_data, SerdeHelper :: Old ( ..) ) ) ;
280- }
281-
282- let snapshots_path = Builder :: new ( ) . prefix ( "test_snapshots" ) . tempdir ( ) . unwrap ( ) ;
283-
284- // Load collection once to trigger mirgation to the new shard-key format.
285- {
286- Collection :: load (
287- COLLECTION_TEST_NAME . to_string ( ) ,
288- 0 ,
289- collection_dir. path ( ) ,
290- snapshots_path. path ( ) ,
291- Default :: default ( ) ,
292- ChannelService :: default ( ) ,
293- dummy_on_replica_failure ( ) ,
294- dummy_request_shard_transfer ( ) ,
295- dummy_abort_shard_transfer ( ) ,
296- None ,
297- None ,
298- ResourceBudget :: default ( ) ,
299- None ,
300- )
301- . await ;
302- }
303-
304- let shard_key_data: SerdeHelper =
305- { serde_json:: from_reader ( File :: open ( & shard_mapping_file) . unwrap ( ) ) . unwrap ( ) } ;
306-
307- // Now we have the new key on disk!
308- assert ! ( matches!( shard_key_data, SerdeHelper :: New ( ..) ) ) ;
309- }
310- }
0 commit comments