Skip to content

Commit 84b2fb8

Browse files
authored
Fix panic on startup with old storage, reenable old shard key format (#7564)
* Revert "Remove old shard key mapping format (#7047)" This reverts commit 783c128. This reverts pull request <#7047>. * Bump TODOs
1 parent af86a3b commit 84b2fb8

2 files changed

Lines changed: 212 additions & 4 deletions

File tree

lib/collection/src/shards/shard_holder/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ impl ShardHolder {
9191
let key_mapping: SaveOnDisk<ShardKeyMapping> =
9292
SaveOnDisk::load_or_init_default(collection_path.join(SHARD_KEY_MAPPING_FILE))?;
9393

94+
// TODO(1.17.0): Remove once the old shardkey format has been removed entirely.
95+
Self::migrate_shard_key_if_needed(&key_mapping)?;
96+
9497
let mut shard_id_to_key_mapping = AHashMap::new();
9598

9699
for (shard_key, shard_ids) in key_mapping.read().iter() {
@@ -1443,6 +1446,23 @@ impl ShardHolder {
14431446
.any(|i| async { i.1.has_remote_shard().await })
14441447
.await
14451448
}
1449+
1450+
/// Migrates the old shard-key format to the new one if necessary.
1451+
// TODO(1.17.0): Remove once the old shardkey format has been removed entirely.
1452+
fn migrate_shard_key_if_needed(
1453+
key_mapping: &SaveOnDisk<ShardKeyMapping>,
1454+
) -> CollectionResult<()> {
1455+
if key_mapping.read().was_old_format {
1456+
// We automatically migrate to the new format when writing once, which we do here.
1457+
log::debug!("Migrating persisted shard key mapping to new format");
1458+
key_mapping.write(|i| {
1459+
// Also set this to true for consistency. However it should never be read.
1460+
i.was_old_format = false;
1461+
})?;
1462+
}
1463+
1464+
Ok(())
1465+
}
14461466
}
14471467

14481468
#[derive(Debug, Clone, PartialEq, Eq)]

lib/collection/src/shards/shard_holder/shard_mapping.rs

Lines changed: 192 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ use crate::shards::shard::ShardId;
1313
#[serde(from = "SerdeHelper", into = "SerdeHelper")]
1414
pub struct ShardKeyMapping {
1515
shard_key_to_shard_ids: HashMap<ShardKey, HashSet<ShardId>>,
16+
17+
/// `true` if the ShardKeyMapping was specified in the old format.
18+
// TODO(1.17.0): Remove once all keys are migrated.
19+
#[serde(skip)]
20+
pub(crate) was_old_format: bool,
1621
}
1722

1823
impl ops::Deref for ShardKeyMapping {
@@ -74,17 +79,41 @@ impl ShardKeyMapping {
7479

7580
impl From<SerdeHelper> for ShardKeyMapping {
7681
fn from(helper: SerdeHelper) -> Self {
82+
let mut was_old_format = false;
83+
84+
let shard_key_to_shard_ids = match helper {
85+
SerdeHelper::New(key_ids_pairs) => key_ids_pairs
86+
.into_iter()
87+
.map(KeyIdsPair::into_parts)
88+
.collect(),
89+
90+
SerdeHelper::Old(key_ids_map) => {
91+
was_old_format = true;
92+
key_ids_map
93+
}
94+
};
95+
7796
Self {
78-
shard_key_to_shard_ids: helper.0.into_iter().map(KeyIdsPair::into_parts).collect(),
97+
shard_key_to_shard_ids,
98+
was_old_format,
7999
}
80100
}
81101
}
82102

83-
/// Helper structure for persisting shard key mapping in safe format
103+
/// Helper structure for persisting shard key mapping
104+
///
105+
/// The original format of persisting shard key mappings as hash map is broken. It forgets type
106+
/// information for the shard key, which resulted in shard key numbers to be converted into
107+
/// strings.
84108
///
85109
/// Bug: <https://github.com/qdrant/qdrant/pull/5838>
86110
#[derive(Deserialize, Serialize)]
87-
struct SerdeHelper(Vec<KeyIdsPair>);
111+
#[serde(untagged)]
112+
enum SerdeHelper {
113+
New(Vec<KeyIdsPair>),
114+
// TODO(1.15): remove this old format, deployment should exclusively be using new format
115+
Old(HashMap<ShardKey, HashSet<ShardId>>),
116+
}
88117

89118
impl From<ShardKeyMapping> for SerdeHelper {
90119
fn from(mapping: ShardKeyMapping) -> Self {
@@ -93,7 +122,7 @@ impl From<ShardKeyMapping> for SerdeHelper {
93122
.into_iter()
94123
.map(KeyIdsPair::from)
95124
.collect();
96-
Self(key_ids_pairs)
125+
Self::New(key_ids_pairs)
97126
}
98127
}
99128

@@ -120,3 +149,162 @@ impl From<(ShardKey, HashSet<ShardId>)> for KeyIdsPair {
120149
Self { key, shard_ids }
121150
}
122151
}
152+
153+
#[cfg(test)]
154+
mod test {
155+
156+
use std::sync::Arc;
157+
158+
use common::budget::ResourceBudget;
159+
use common::counter::hardware_accumulator::HwMeasurementAcc;
160+
use fs_err::File;
161+
use segment::types::{PayloadFieldSchema, PayloadSchemaType};
162+
use tempfile::{Builder, TempDir};
163+
164+
use super::*;
165+
use crate::collection::{Collection, RequestShardTransfer};
166+
use crate::config::{CollectionConfigInternal, CollectionParams, ShardingMethod, WalConfig};
167+
use crate::operations::shared_storage_config::SharedStorageConfig;
168+
use crate::optimizers_builder::OptimizersConfig;
169+
use crate::shards::channel_service::ChannelService;
170+
use crate::shards::collection_shard_distribution::CollectionShardDistribution;
171+
use crate::shards::replica_set::{AbortShardTransfer, ChangePeerFromState, ReplicaState};
172+
use crate::shards::shard_holder::SHARD_KEY_MAPPING_FILE;
173+
174+
const COLLECTION_TEST_NAME: &str = "shard_key_test";
175+
176+
async fn make_collection(collection_name: &str, collection_dir: &TempDir) -> Collection {
177+
let wal_config = WalConfig::default();
178+
let mut collection_params = CollectionParams::empty();
179+
collection_params.sharding_method = Some(ShardingMethod::Custom);
180+
181+
let config = CollectionConfigInternal {
182+
params: collection_params,
183+
optimizer_config: OptimizersConfig::fixture(),
184+
wal_config,
185+
hnsw_config: Default::default(),
186+
quantization_config: Default::default(),
187+
strict_mode_config: None,
188+
uuid: None,
189+
metadata: None,
190+
};
191+
192+
let snapshots_path = Builder::new().prefix("test_snapshots").tempdir().unwrap();
193+
194+
let collection = Collection::new(
195+
collection_name.to_string(),
196+
0,
197+
collection_dir.path(),
198+
snapshots_path.path(),
199+
&config,
200+
Arc::new(SharedStorageConfig::default()),
201+
CollectionShardDistribution::all_local(None, 0),
202+
None,
203+
ChannelService::default(),
204+
dummy_on_replica_failure(),
205+
dummy_request_shard_transfer(),
206+
dummy_abort_shard_transfer(),
207+
None,
208+
None,
209+
ResourceBudget::default(),
210+
None,
211+
)
212+
.await
213+
.expect("Failed to create new fixture collection");
214+
215+
collection
216+
.create_payload_index(
217+
"field".parse().unwrap(),
218+
PayloadFieldSchema::FieldType(PayloadSchemaType::Integer),
219+
HwMeasurementAcc::new(),
220+
)
221+
.await
222+
.expect("failed to create payload index");
223+
224+
collection
225+
}
226+
227+
pub fn dummy_on_replica_failure() -> ChangePeerFromState {
228+
Arc::new(move |_peer_id, _shard_id, _from_state| {})
229+
}
230+
231+
pub fn dummy_request_shard_transfer() -> RequestShardTransfer {
232+
Arc::new(move |_transfer| {})
233+
}
234+
235+
pub fn dummy_abort_shard_transfer() -> AbortShardTransfer {
236+
Arc::new(|_transfer, _reason| {})
237+
}
238+
239+
#[tokio::test(flavor = "multi_thread")]
240+
async fn test_shard_key_migration() {
241+
let collection_dir = Builder::new().prefix("test_collection").tempdir().unwrap();
242+
243+
{
244+
let collection = make_collection(COLLECTION_TEST_NAME, &collection_dir).await;
245+
collection
246+
.create_shard_key(
247+
ShardKey::Keyword("helloworld".into()),
248+
vec![vec![]],
249+
ReplicaState::Active,
250+
)
251+
.await
252+
.unwrap();
253+
}
254+
255+
let shard_mapping_file = collection_dir.path().join(SHARD_KEY_MAPPING_FILE);
256+
257+
let shard_key_data: SerdeHelper = {
258+
let file = File::open(&shard_mapping_file).unwrap();
259+
serde_json::from_reader(file).unwrap()
260+
};
261+
262+
let shard_key_data = ShardKeyMapping::from(shard_key_data);
263+
264+
// Ensure we have at least one shard key.
265+
assert!(!shard_key_data.is_empty());
266+
267+
// Convert to old shard key and overwrite file on disk.
268+
{
269+
let old_shard_key_data = SerdeHelper::Old(shard_key_data.shard_key_to_shard_ids);
270+
let mut writer = File::create(&shard_mapping_file).unwrap();
271+
serde_json::to_writer(&mut writer, &old_shard_key_data).unwrap();
272+
}
273+
274+
// Ensure on disk is now the old version.
275+
{
276+
let shard_key_data: SerdeHelper =
277+
serde_json::from_reader(File::open(&shard_mapping_file).unwrap()).unwrap();
278+
279+
assert!(matches!(shard_key_data, SerdeHelper::Old(..)));
280+
}
281+
282+
let snapshots_path = Builder::new().prefix("test_snapshots").tempdir().unwrap();
283+
284+
// Load collection once to trigger mirgation to the new shard-key format.
285+
{
286+
Collection::load(
287+
COLLECTION_TEST_NAME.to_string(),
288+
0,
289+
collection_dir.path(),
290+
snapshots_path.path(),
291+
Default::default(),
292+
ChannelService::default(),
293+
dummy_on_replica_failure(),
294+
dummy_request_shard_transfer(),
295+
dummy_abort_shard_transfer(),
296+
None,
297+
None,
298+
ResourceBudget::default(),
299+
None,
300+
)
301+
.await;
302+
}
303+
304+
let shard_key_data: SerdeHelper =
305+
{ serde_json::from_reader(File::open(&shard_mapping_file).unwrap()).unwrap() };
306+
307+
// Now we have the new key on disk!
308+
assert!(matches!(shard_key_data, SerdeHelper::New(..)));
309+
}
310+
}

0 commit comments

Comments
 (0)