@@ -13,6 +13,11 @@ use crate::shards::shard::ShardId;
1313#[ serde( from = "SerdeHelper" , into = "SerdeHelper" ) ]
1414pub struct ShardKeyMapping {
1515 shard_key_to_shard_ids : HashMap < ShardKey , HashSet < ShardId > > ,
16+
17+ /// `true` if the ShardKeyMapping was specified in the old format.
18+ // TODO(1.17.0): Remove once all keys are migrated.
19+ #[ serde( skip) ]
20+ pub ( crate ) was_old_format : bool ,
1621}
1722
1823impl ops:: Deref for ShardKeyMapping {
@@ -74,17 +79,41 @@ impl ShardKeyMapping {
7479
7580impl From < SerdeHelper > for ShardKeyMapping {
7681 fn from ( helper : SerdeHelper ) -> Self {
82+ let mut was_old_format = false ;
83+
84+ let shard_key_to_shard_ids = match helper {
85+ SerdeHelper :: New ( key_ids_pairs) => key_ids_pairs
86+ . into_iter ( )
87+ . map ( KeyIdsPair :: into_parts)
88+ . collect ( ) ,
89+
90+ SerdeHelper :: Old ( key_ids_map) => {
91+ was_old_format = true ;
92+ key_ids_map
93+ }
94+ } ;
95+
7796 Self {
78- shard_key_to_shard_ids : helper. 0 . into_iter ( ) . map ( KeyIdsPair :: into_parts) . collect ( ) ,
97+ shard_key_to_shard_ids,
98+ was_old_format,
7999 }
80100 }
81101}
82102
83- /// Helper structure for persisting shard key mapping in safe format
103+ /// Helper structure for persisting shard key mapping
104+ ///
105+ /// The original format of persisting shard key mappings as hash map is broken. It forgets type
106+ /// information for the shard key, which resulted in shard key numbers to be converted into
107+ /// strings.
84108///
85109/// Bug: <https://github.com/qdrant/qdrant/pull/5838>
86110#[ derive( Deserialize , Serialize ) ]
87- struct SerdeHelper ( Vec < KeyIdsPair > ) ;
111+ #[ serde( untagged) ]
112+ enum SerdeHelper {
113+ New ( Vec < KeyIdsPair > ) ,
114+ // TODO(1.15): remove this old format, deployment should exclusively be using new format
115+ Old ( HashMap < ShardKey , HashSet < ShardId > > ) ,
116+ }
88117
89118impl From < ShardKeyMapping > for SerdeHelper {
90119 fn from ( mapping : ShardKeyMapping ) -> Self {
@@ -93,7 +122,7 @@ impl From<ShardKeyMapping> for SerdeHelper {
93122 . into_iter ( )
94123 . map ( KeyIdsPair :: from)
95124 . collect ( ) ;
96- Self ( key_ids_pairs)
125+ Self :: New ( key_ids_pairs)
97126 }
98127}
99128
@@ -120,3 +149,162 @@ impl From<(ShardKey, HashSet<ShardId>)> for KeyIdsPair {
120149 Self { key, shard_ids }
121150 }
122151}
152+
153+ #[ cfg( test) ]
154+ mod test {
155+
156+ use std:: sync:: Arc ;
157+
158+ use common:: budget:: ResourceBudget ;
159+ use common:: counter:: hardware_accumulator:: HwMeasurementAcc ;
160+ use fs_err:: File ;
161+ use segment:: types:: { PayloadFieldSchema , PayloadSchemaType } ;
162+ use tempfile:: { Builder , TempDir } ;
163+
164+ use super :: * ;
165+ use crate :: collection:: { Collection , RequestShardTransfer } ;
166+ use crate :: config:: { CollectionConfigInternal , CollectionParams , ShardingMethod , WalConfig } ;
167+ use crate :: operations:: shared_storage_config:: SharedStorageConfig ;
168+ use crate :: optimizers_builder:: OptimizersConfig ;
169+ use crate :: shards:: channel_service:: ChannelService ;
170+ use crate :: shards:: collection_shard_distribution:: CollectionShardDistribution ;
171+ use crate :: shards:: replica_set:: { AbortShardTransfer , ChangePeerFromState , ReplicaState } ;
172+ use crate :: shards:: shard_holder:: SHARD_KEY_MAPPING_FILE ;
173+
174+ const COLLECTION_TEST_NAME : & str = "shard_key_test" ;
175+
176+ async fn make_collection ( collection_name : & str , collection_dir : & TempDir ) -> Collection {
177+ let wal_config = WalConfig :: default ( ) ;
178+ let mut collection_params = CollectionParams :: empty ( ) ;
179+ collection_params. sharding_method = Some ( ShardingMethod :: Custom ) ;
180+
181+ let config = CollectionConfigInternal {
182+ params : collection_params,
183+ optimizer_config : OptimizersConfig :: fixture ( ) ,
184+ wal_config,
185+ hnsw_config : Default :: default ( ) ,
186+ quantization_config : Default :: default ( ) ,
187+ strict_mode_config : None ,
188+ uuid : None ,
189+ metadata : None ,
190+ } ;
191+
192+ let snapshots_path = Builder :: new ( ) . prefix ( "test_snapshots" ) . tempdir ( ) . unwrap ( ) ;
193+
194+ let collection = Collection :: new (
195+ collection_name. to_string ( ) ,
196+ 0 ,
197+ collection_dir. path ( ) ,
198+ snapshots_path. path ( ) ,
199+ & config,
200+ Arc :: new ( SharedStorageConfig :: default ( ) ) ,
201+ CollectionShardDistribution :: all_local ( None , 0 ) ,
202+ None ,
203+ ChannelService :: default ( ) ,
204+ dummy_on_replica_failure ( ) ,
205+ dummy_request_shard_transfer ( ) ,
206+ dummy_abort_shard_transfer ( ) ,
207+ None ,
208+ None ,
209+ ResourceBudget :: default ( ) ,
210+ None ,
211+ )
212+ . await
213+ . expect ( "Failed to create new fixture collection" ) ;
214+
215+ collection
216+ . create_payload_index (
217+ "field" . parse ( ) . unwrap ( ) ,
218+ PayloadFieldSchema :: FieldType ( PayloadSchemaType :: Integer ) ,
219+ HwMeasurementAcc :: new ( ) ,
220+ )
221+ . await
222+ . expect ( "failed to create payload index" ) ;
223+
224+ collection
225+ }
226+
227+ pub fn dummy_on_replica_failure ( ) -> ChangePeerFromState {
228+ Arc :: new ( move |_peer_id, _shard_id, _from_state| { } )
229+ }
230+
231+ pub fn dummy_request_shard_transfer ( ) -> RequestShardTransfer {
232+ Arc :: new ( move |_transfer| { } )
233+ }
234+
235+ pub fn dummy_abort_shard_transfer ( ) -> AbortShardTransfer {
236+ Arc :: new ( |_transfer, _reason| { } )
237+ }
238+
239+ #[ tokio:: test( flavor = "multi_thread" ) ]
240+ async fn test_shard_key_migration ( ) {
241+ let collection_dir = Builder :: new ( ) . prefix ( "test_collection" ) . tempdir ( ) . unwrap ( ) ;
242+
243+ {
244+ let collection = make_collection ( COLLECTION_TEST_NAME , & collection_dir) . await ;
245+ collection
246+ . create_shard_key (
247+ ShardKey :: Keyword ( "helloworld" . into ( ) ) ,
248+ vec ! [ vec![ ] ] ,
249+ ReplicaState :: Active ,
250+ )
251+ . await
252+ . unwrap ( ) ;
253+ }
254+
255+ let shard_mapping_file = collection_dir. path ( ) . join ( SHARD_KEY_MAPPING_FILE ) ;
256+
257+ let shard_key_data: SerdeHelper = {
258+ let file = File :: open ( & shard_mapping_file) . unwrap ( ) ;
259+ serde_json:: from_reader ( file) . unwrap ( )
260+ } ;
261+
262+ let shard_key_data = ShardKeyMapping :: from ( shard_key_data) ;
263+
264+ // Ensure we have at least one shard key.
265+ assert ! ( !shard_key_data. is_empty( ) ) ;
266+
267+ // Convert to old shard key and overwrite file on disk.
268+ {
269+ let old_shard_key_data = SerdeHelper :: Old ( shard_key_data. shard_key_to_shard_ids ) ;
270+ let mut writer = File :: create ( & shard_mapping_file) . unwrap ( ) ;
271+ serde_json:: to_writer ( & mut writer, & old_shard_key_data) . unwrap ( ) ;
272+ }
273+
274+ // Ensure on disk is now the old version.
275+ {
276+ let shard_key_data: SerdeHelper =
277+ serde_json:: from_reader ( File :: open ( & shard_mapping_file) . unwrap ( ) ) . unwrap ( ) ;
278+
279+ assert ! ( matches!( shard_key_data, SerdeHelper :: Old ( ..) ) ) ;
280+ }
281+
282+ let snapshots_path = Builder :: new ( ) . prefix ( "test_snapshots" ) . tempdir ( ) . unwrap ( ) ;
283+
284+ // Load collection once to trigger mirgation to the new shard-key format.
285+ {
286+ Collection :: load (
287+ COLLECTION_TEST_NAME . to_string ( ) ,
288+ 0 ,
289+ collection_dir. path ( ) ,
290+ snapshots_path. path ( ) ,
291+ Default :: default ( ) ,
292+ ChannelService :: default ( ) ,
293+ dummy_on_replica_failure ( ) ,
294+ dummy_request_shard_transfer ( ) ,
295+ dummy_abort_shard_transfer ( ) ,
296+ None ,
297+ None ,
298+ ResourceBudget :: default ( ) ,
299+ None ,
300+ )
301+ . await ;
302+ }
303+
304+ let shard_key_data: SerdeHelper =
305+ { serde_json:: from_reader ( File :: open ( & shard_mapping_file) . unwrap ( ) ) . unwrap ( ) } ;
306+
307+ // Now we have the new key on disk!
308+ assert ! ( matches!( shard_key_data, SerdeHelper :: New ( ..) ) ) ;
309+ }
310+ }
0 commit comments