Skip to content

Commit 783c128

Browse files
committed
Remove old shard key mapping format (#7047)
* Remove old shard key mapping format variant
* Change single variant enum into tuple struct
* Remove migration logic for old to new shard key format
1 parent 2e239d5 commit 783c128

2 files changed

Lines changed: 4 additions & 212 deletions

File tree

lib/collection/src/shards/shard_holder/mod.rs

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,6 @@ impl ShardHolder {
8686
let key_mapping: SaveOnDisk<ShardKeyMapping> =
8787
SaveOnDisk::load_or_init_default(collection_path.join(SHARD_KEY_MAPPING_FILE))?;
8888

89-
// TODO(shardkey): Remove once the old shardkey format has been removed entirely.
90-
Self::migrate_shard_key_if_needed(&key_mapping)?;
91-
9289
let mut shard_id_to_key_mapping = AHashMap::new();
9390

9491
for (shard_key, shard_ids) in key_mapping.read().iter() {
@@ -1409,23 +1406,6 @@ impl ShardHolder {
14091406
.any(|i| async { i.1.has_remote_shard().await })
14101407
.await
14111408
}
1412-
1413-
/// Migrates the old shard-key format to the new one if necessary.
1414-
/// TODO(shardkey): Remove once the old shardkey format has been removed entirely.
1415-
fn migrate_shard_key_if_needed(
1416-
key_mapping: &SaveOnDisk<ShardKeyMapping>,
1417-
) -> CollectionResult<()> {
1418-
if key_mapping.read().was_old_format {
1419-
// We automatically migrate to the new format when writing once, which we do here.
1420-
log::debug!("Migrating persisted shard key mapping to new format");
1421-
key_mapping.write(|i| {
1422-
// Also set this to false for consistency. However, it should never be read.
1423-
i.was_old_format = false;
1424-
})?;
1425-
}
1426-
1427-
Ok(())
1428-
}
14291409
}
14301410

14311411
#[derive(Debug, Clone, PartialEq, Eq)]

lib/collection/src/shards/shard_holder/shard_mapping.rs

Lines changed: 4 additions & 192 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,6 @@ use crate::shards::shard::ShardId;
1313
#[serde(from = "SerdeHelper", into = "SerdeHelper")]
1414
pub struct ShardKeyMapping {
1515
shard_key_to_shard_ids: HashMap<ShardKey, HashSet<ShardId>>,
16-
17-
/// `true` if the ShardKeyMapping was specified in the old format.
18-
/// TODO(shardkey): Remove once all keys are migrated.
19-
#[serde(skip)]
20-
pub(crate) was_old_format: bool,
2116
}
2217

2318
impl ops::Deref for ShardKeyMapping {
@@ -79,41 +74,17 @@ impl ShardKeyMapping {
7974

8075
impl From<SerdeHelper> for ShardKeyMapping {
8176
fn from(helper: SerdeHelper) -> Self {
82-
let mut was_old_format = false;
83-
84-
let shard_key_to_shard_ids = match helper {
85-
SerdeHelper::New(key_ids_pairs) => key_ids_pairs
86-
.into_iter()
87-
.map(KeyIdsPair::into_parts)
88-
.collect(),
89-
90-
SerdeHelper::Old(key_ids_map) => {
91-
was_old_format = true;
92-
key_ids_map
93-
}
94-
};
95-
9677
Self {
97-
shard_key_to_shard_ids,
98-
was_old_format,
78+
shard_key_to_shard_ids: helper.0.into_iter().map(KeyIdsPair::into_parts).collect(),
9979
}
10080
}
10181
}
10282

103-
/// Helper structure for persisting shard key mapping
104-
///
105-
/// The original format of persisting shard key mappings as hash map is broken. It forgets type
106-
/// information for the shard key, which resulted in shard key numbers being converted into
107-
/// strings.
83+
/// Helper structure for persisting shard key mapping in a safe format
10884
///
10985
/// Bug: <https://github.com/qdrant/qdrant/pull/5838>
11086
#[derive(Deserialize, Serialize)]
111-
#[serde(untagged)]
112-
enum SerdeHelper {
113-
New(Vec<KeyIdsPair>),
114-
// TODO(1.15): remove this old format, deployment should exclusively be using new format
115-
Old(HashMap<ShardKey, HashSet<ShardId>>),
116-
}
87+
struct SerdeHelper(Vec<KeyIdsPair>);
11788

11889
impl From<ShardKeyMapping> for SerdeHelper {
11990
fn from(mapping: ShardKeyMapping) -> Self {
@@ -122,7 +93,7 @@ impl From<ShardKeyMapping> for SerdeHelper {
12293
.into_iter()
12394
.map(KeyIdsPair::from)
12495
.collect();
125-
Self::New(key_ids_pairs)
96+
Self(key_ids_pairs)
12697
}
12798
}
12899

@@ -149,162 +120,3 @@ impl From<(ShardKey, HashSet<ShardId>)> for KeyIdsPair {
149120
Self { key, shard_ids }
150121
}
151122
}
152-
153-
#[cfg(test)]
154-
mod test {
155-
156-
use std::sync::Arc;
157-
158-
use common::budget::ResourceBudget;
159-
use common::counter::hardware_accumulator::HwMeasurementAcc;
160-
use fs_err::File;
161-
use segment::types::{PayloadFieldSchema, PayloadSchemaType};
162-
use tempfile::{Builder, TempDir};
163-
164-
use super::*;
165-
use crate::collection::{Collection, RequestShardTransfer};
166-
use crate::config::{CollectionConfigInternal, CollectionParams, ShardingMethod, WalConfig};
167-
use crate::operations::shared_storage_config::SharedStorageConfig;
168-
use crate::optimizers_builder::OptimizersConfig;
169-
use crate::shards::channel_service::ChannelService;
170-
use crate::shards::collection_shard_distribution::CollectionShardDistribution;
171-
use crate::shards::replica_set::{AbortShardTransfer, ChangePeerFromState, ReplicaState};
172-
use crate::shards::shard_holder::SHARD_KEY_MAPPING_FILE;
173-
174-
const COLLECTION_TEST_NAME: &str = "shard_key_test";
175-
176-
async fn make_collection(collection_name: &str, collection_dir: &TempDir) -> Collection {
177-
let wal_config = WalConfig::default();
178-
let mut collection_params = CollectionParams::empty();
179-
collection_params.sharding_method = Some(ShardingMethod::Custom);
180-
181-
let config = CollectionConfigInternal {
182-
params: collection_params,
183-
optimizer_config: OptimizersConfig::fixture(),
184-
wal_config,
185-
hnsw_config: Default::default(),
186-
quantization_config: Default::default(),
187-
strict_mode_config: None,
188-
uuid: None,
189-
metadata: None,
190-
};
191-
192-
let snapshots_path = Builder::new().prefix("test_snapshots").tempdir().unwrap();
193-
194-
let collection = Collection::new(
195-
collection_name.to_string(),
196-
0,
197-
collection_dir.path(),
198-
snapshots_path.path(),
199-
&config,
200-
Arc::new(SharedStorageConfig::default()),
201-
CollectionShardDistribution::all_local(None, 0),
202-
None,
203-
ChannelService::default(),
204-
dummy_on_replica_failure(),
205-
dummy_request_shard_transfer(),
206-
dummy_abort_shard_transfer(),
207-
None,
208-
None,
209-
ResourceBudget::default(),
210-
None,
211-
)
212-
.await
213-
.expect("Failed to create new fixture collection");
214-
215-
collection
216-
.create_payload_index(
217-
"field".parse().unwrap(),
218-
PayloadFieldSchema::FieldType(PayloadSchemaType::Integer),
219-
HwMeasurementAcc::new(),
220-
)
221-
.await
222-
.expect("failed to create payload index");
223-
224-
collection
225-
}
226-
227-
pub fn dummy_on_replica_failure() -> ChangePeerFromState {
228-
Arc::new(move |_peer_id, _shard_id, _from_state| {})
229-
}
230-
231-
pub fn dummy_request_shard_transfer() -> RequestShardTransfer {
232-
Arc::new(move |_transfer| {})
233-
}
234-
235-
pub fn dummy_abort_shard_transfer() -> AbortShardTransfer {
236-
Arc::new(|_transfer, _reason| {})
237-
}
238-
239-
#[tokio::test(flavor = "multi_thread")]
240-
async fn test_shard_key_migration() {
241-
let collection_dir = Builder::new().prefix("test_collection").tempdir().unwrap();
242-
243-
{
244-
let collection = make_collection(COLLECTION_TEST_NAME, &collection_dir).await;
245-
collection
246-
.create_shard_key(
247-
ShardKey::Keyword("helloworld".into()),
248-
vec![vec![]],
249-
ReplicaState::Active,
250-
)
251-
.await
252-
.unwrap();
253-
}
254-
255-
let shard_mapping_file = collection_dir.path().join(SHARD_KEY_MAPPING_FILE);
256-
257-
let shard_key_data: SerdeHelper = {
258-
let file = File::open(&shard_mapping_file).unwrap();
259-
serde_json::from_reader(file).unwrap()
260-
};
261-
262-
let shard_key_data = ShardKeyMapping::from(shard_key_data);
263-
264-
// Ensure we have at least one shard key.
265-
assert!(!shard_key_data.is_empty());
266-
267-
// Convert to old shard key and overwrite file on disk.
268-
{
269-
let old_shard_key_data = SerdeHelper::Old(shard_key_data.shard_key_to_shard_ids);
270-
let mut writer = File::create(&shard_mapping_file).unwrap();
271-
serde_json::to_writer(&mut writer, &old_shard_key_data).unwrap();
272-
}
273-
274-
// Ensure on disk is now the old version.
275-
{
276-
let shard_key_data: SerdeHelper =
277-
serde_json::from_reader(File::open(&shard_mapping_file).unwrap()).unwrap();
278-
279-
assert!(matches!(shard_key_data, SerdeHelper::Old(..)));
280-
}
281-
282-
let snapshots_path = Builder::new().prefix("test_snapshots").tempdir().unwrap();
283-
284-
// Load collection once to trigger migration to the new shard-key format.
285-
{
286-
Collection::load(
287-
COLLECTION_TEST_NAME.to_string(),
288-
0,
289-
collection_dir.path(),
290-
snapshots_path.path(),
291-
Default::default(),
292-
ChannelService::default(),
293-
dummy_on_replica_failure(),
294-
dummy_request_shard_transfer(),
295-
dummy_abort_shard_transfer(),
296-
None,
297-
None,
298-
ResourceBudget::default(),
299-
None,
300-
)
301-
.await;
302-
}
303-
304-
let shard_key_data: SerdeHelper =
305-
{ serde_json::from_reader(File::open(&shard_mapping_file).unwrap()).unwrap() };
306-
307-
// Now we have the new key on disk!
308-
assert!(matches!(shard_key_data, SerdeHelper::New(..)));
309-
}
310-
}

0 commit comments

Comments
 (0)