
Make shard holder and count changes atomic in resharding #8709

@timvisee

Description


#8522 made resharding operations idempotent, at least for the most part.

One key step in the start, finish and abort operations is shard management: creating or deleting shards and adjusting the shard count. This step is currently neither idempotent nor atomic, which can cause issues.
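
For creating and deleting shards, one way to make replays harmless is to express them as "ensure" operations that do nothing once the desired state already holds. The sketch below assumes a heavily simplified, hypothetical `ShardHolder` that tracks only shard IDs and an optional shard key mapping. `ensure_shard` and `ensure_no_shard` are illustrative names, not existing methods.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical, heavily simplified shard holder (not the real type).
#[derive(Debug, Default)]
struct ShardHolder {
    /// IDs of the shards that currently exist
    shard_ids: BTreeSet<u32>,
    /// Custom sharding only: shard key -> shard IDs mapped to that key
    key_mapping: BTreeMap<String, BTreeSet<u32>>,
}

impl ShardHolder {
    /// Ensure shard `id` exists (and is mapped to `shard_key`, if given).
    /// Replaying this is a no-op instead of an error or a duplicate.
    /// Returns `true` only if this call actually created the shard.
    fn ensure_shard(&mut self, id: u32, shard_key: Option<&str>) -> bool {
        if let Some(key) = shard_key {
            self.key_mapping.entry(key.to_string()).or_default().insert(id);
        }
        self.shard_ids.insert(id)
    }

    /// Ensure shard `id` is gone: first unmap it from `shard_key`, then drop it,
    /// mirroring the order used in the `finish` step.
    /// Replaying this is a no-op instead of an error.
    /// Returns `true` only if this call actually removed the shard.
    fn ensure_no_shard(&mut self, id: u32, shard_key: Option<&str>) -> bool {
        if let Some(key) = shard_key {
            if let Some(ids) = self.key_mapping.get_mut(key) {
                ids.remove(&id);
            }
        }
        self.shard_ids.remove(&id)
    }
}
```

Returning whether anything changed lets the caller skip follow-up work, such as saving the config, when a replayed step finds everything already in place.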

These changes must be applied exactly once. If a node is killed while the operation is being applied, the change may end up applied multiple times or not at all.
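
To make the replay hazard concrete, here is a minimal simulation of the relative update used in the `start` step. It assumes the persisted count is a plain `u32` (the real config uses `NonZeroU32`) and that a crash causes the whole step to be replayed. `start_relative` and `replay` are hypothetical helpers. In the real code the `debug_assert_eq!` would catch a second application, but only in debug builds; in release builds it is compiled out and the count is silently bumped twice.

```rust
/// Relative update, as in the current `start` step: bump the persisted count by one.
/// Nothing records whether this already happened, so a replay bumps it again.
fn start_relative(shard_number: u32) -> u32 {
    shard_number
        .checked_add(1)
        .expect("cannot have more than u32::MAX shards after resharding")
}

/// Simulate a crash-and-replay: the same operation is applied `times` times.
fn replay(mut shard_number: u32, times: usize, op: impl Fn(u32) -> u32) -> u32 {
    for _ in 0..times {
        shard_number = op(shard_number);
    }
    shard_number
}
```

Applied once to a 3-shard collection this yields 4 shards as intended; replayed once more it yields 5, one shard more than actually exists.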

We should make these steps fully idempotent as well, so that we always reconcile into the exact same end state, even if an operation runs multiple times because of a replay.
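
For the shard count, one possible approach is to write an absolute target derived from `resharding_key.shard_id` instead of incrementing or decrementing the current value. The target values below follow from the invariants the `debug_assert_eq!` calls in the critical sections check: the count equals `shard_id` before `start` scales up, and equals `shard_id + 1` before `finish` scales down or `abort` rolls back a scale-up. `Step`, `target_shard_number` and `reconcile` are hypothetical names, and only `ShardingMethod::Auto` is modeled, since custom sharding does not use the persisted count.

```rust
use std::num::NonZeroU32;

/// Hypothetical list of the steps in which the persisted shard count changes.
#[derive(Clone, Copy, Debug)]
enum Step {
    /// `start` with `ReshardingDirection::Up`
    StartUp,
    /// `finish` with `ReshardingDirection::Down`
    FinishDown,
    /// `abort` with `ReshardingDirection::Up`
    AbortUp,
}

/// The shard count the config should hold *after* `step`, computed only from
/// the resharding shard ID and never from the current count.
fn target_shard_number(step: Step, shard_id: u32) -> NonZeroU32 {
    let target = match step {
        // Scaling up adds shard `shard_id`, so IDs 0..=shard_id exist afterwards
        Step::StartUp => shard_id
            .checked_add(1)
            .expect("cannot have more than u32::MAX shards after resharding"),
        // Dropping the last shard `shard_id` leaves IDs 0..shard_id
        Step::FinishDown | Step::AbortUp => shard_id,
    };
    NonZeroU32::new(target).expect("cannot have zero shards after resharding")
}

/// Returns the count to persist, or `None` if the config is already reconciled,
/// for example because this step was applied before a crash and is now replayed.
fn reconcile(current: NonZeroU32, step: Step, shard_id: u32) -> Option<NonZeroU32> {
    let target = target_shard_number(step, shard_id);
    (current != target).then_some(target)
}
```

Because the target depends only on the resharding key, replaying a step any number of times converges to the same count, and a replay finds nothing left to write.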

Critical sections:

  • start:
    if resharding_key.direction == ReshardingDirection::Up {
        let mut config = self.collection_config.write().await;
        match config.params.sharding_method.unwrap_or_default() {
            // If adding a shard, increase persisted count so we load it on restart
            ShardingMethod::Auto => {
                debug_assert_eq!(config.params.shard_number.get(), resharding_key.shard_id);
                config.params.shard_number = config
                    .params
                    .shard_number
                    .checked_add(1)
                    .expect("cannot have more than u32::MAX shards after resharding");
                if let Err(err) = config.save(&self.path) {
                    log::error!(
                        "Failed to update and save collection config during resharding: {err}",
                    );
                }
            }
            // Custom shards don't use the persisted count, we don't change it
            ShardingMethod::Custom => {}
        }
    }
  • finish:
    if resharding_key.direction == ReshardingDirection::Down {
        // Remove the shard we've now migrated all points out of
        if let Some(shard_key) = &resharding_key.shard_key {
            shard_holder.remove_shard_from_key_mapping(resharding_key.shard_id, shard_key)?;
        }
        shard_holder
            .drop_and_remove_shard(resharding_key.shard_id)
            .await?;
        {
            let mut config = self.collection_config.write().await;
            match config.params.sharding_method.unwrap_or_default() {
                // If removing a shard, decrease persisted count so we don't load it on restart
                ShardingMethod::Auto => {
                    debug_assert_eq!(
                        config.params.shard_number.get() - 1,
                        resharding_key.shard_id,
                    );
                    config.params.shard_number =
                        NonZeroU32::new(config.params.shard_number.get() - 1)
                            .expect("cannot have zero shards after finishing resharding");
                    if let Err(err) = config.save(&self.path) {
                        log::error!(
                            "Failed to update and save collection config during resharding: {err}"
                        );
                    }
                }
                // Custom shards don't use the persisted count, we don't change it
                ShardingMethod::Custom => {}
            }
        }
    }
  • abort:
    // Decrease the persisted shard count, ensures we don't load dropped shard on restart
    if resharding_key.direction == ReshardingDirection::Up {
        let mut config = self.collection_config.write().await;
        match config.params.sharding_method.unwrap_or_default() {
            // If removing a shard, decrease persisted count so we don't load it on restart
            ShardingMethod::Auto => {
                debug_assert_eq!(
                    config.params.shard_number.get() - 1,
                    resharding_key.shard_id,
                );
                config.params.shard_number =
                    NonZeroU32::new(config.params.shard_number.get() - 1)
                        .expect("cannot have zero shards after aborting resharding");
                if let Err(err) = config.save(&self.path) {
                    log::error!(
                        "Failed to update and save collection config during resharding: {err}"
                    );
                }
            }
            // Custom shards don't use the persisted count, we don't change it
            ShardingMethod::Custom => {}
        }
    }
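
In all three critical sections a failed `config.save` is only logged, so a crash or I/O error can leave the persisted count out of step with the shards. One conventional technique for making the config write itself atomic (not necessarily what `config.save` does today) is to write the new contents to a temporary file, `fsync` it, and `rename` it over the old file, so that after a crash either the old or the complete new config is on disk, never a partial one. `save_atomically` and `sync_dir` are hypothetical helpers, and the sketch assumes the temporary file and the target live on the same filesystem, where POSIX `rename` is atomic.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical helper: atomically replace the file at `path` with `contents`.
fn save_atomically(path: &Path, contents: &[u8]) -> io::Result<()> {
    // Temporary file next to the target, so the rename stays on one filesystem
    let tmp_path = path.with_extension("tmp");
    {
        let mut tmp = File::create(&tmp_path)?;
        tmp.write_all(contents)?;
        // Make sure the data is on disk before it becomes visible under the real name
        tmp.sync_all()?;
    }
    // Swap the new file into place; readers see either the old or the new file
    fs::rename(&tmp_path, path)?;
    // Persist the rename itself by syncing the parent directory
    let dir = match path.parent() {
        Some(p) if !p.as_os_str().is_empty() => p,
        _ => Path::new("."),
    };
    sync_dir(dir)
}

/// Sync a directory so that a rename inside it survives a crash (Unix).
#[cfg(unix)]
fn sync_dir(dir: &Path) -> io::Result<()> {
    File::open(dir)?.sync_all()
}

/// Opening a directory as a file is not portable; skip the sync elsewhere.
#[cfg(not(unix))]
fn sync_dir(_dir: &Path) -> io::Result<()> {
    Ok(())
}
```

This makes only the config write atomic. Keeping it consistent with the shard holder across a crash still requires both the shard changes and the count change to be safe to replay.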

Type: Bug