Skip to content

Commit c283b00

Browse files
authored
Fix consensus snapshot not applying resharding state (#6210)
* Destruct struct when applying collection state * In consensus snapshot, apply resharding state
1 parent c36cc07 commit c283b00

1 file changed

Lines changed: 28 additions & 4 deletions

File tree

lib/collection/src/collection/state_management.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::collection_state::{ShardInfo, State};
66
use crate::config::CollectionConfigInternal;
77
use crate::operations::types::{CollectionError, CollectionResult};
88
use crate::shards::replica_set::ShardReplicaSet;
9+
use crate::shards::resharding::ReshardState;
910
use crate::shards::shard::{PeerId, ShardId};
1011
use crate::shards::shard_holder::ShardTransferChange;
1112
use crate::shards::shard_holder::shard_mapping::ShardKeyMapping;
@@ -29,12 +30,22 @@ impl Collection {
2930
this_peer_id: PeerId,
3031
abort_transfer: impl FnMut(ShardTransfer),
3132
) -> CollectionResult<()> {
32-
self.apply_config(state.config).await?;
33-
self.apply_shard_transfers(state.transfers, this_peer_id, abort_transfer)
33+
let State {
34+
config,
35+
shards,
36+
resharding,
37+
transfers,
38+
shards_key_mapping,
39+
payload_index_schema,
40+
} = state;
41+
42+
self.apply_config(config).await?;
43+
self.apply_shard_transfers(transfers, this_peer_id, abort_transfer)
3444
.await?;
35-
self.apply_shard_info(state.shards, state.shards_key_mapping.to_map())
45+
self.apply_reshard_state(resharding).await?;
46+
self.apply_shard_info(shards, shards_key_mapping.to_map())
3647
.await?;
37-
self.apply_payload_index_schema(state.payload_index_schema)
48+
self.apply_payload_index_schema(payload_index_schema)
3849
.await?;
3950
Ok(())
4051
}
@@ -75,6 +86,19 @@ impl Collection {
7586
Ok(())
7687
}
7788

89+
async fn apply_reshard_state(&self, resharding: Option<ReshardState>) -> CollectionResult<()> {
90+
// We don't have to explicitly abort resharding or bump shard replica states, because:
91+
// - peers are not driving resharding themselves
92+
// - ongoing (resharding) shard transfers are explicitly updated
93+
// - shard replica set states are explicitly updated
94+
self.shards_holder
95+
.write()
96+
.await
97+
.resharding_state
98+
.write(|state| *state = resharding)?;
99+
Ok(())
100+
}
101+
78102
async fn apply_config(&self, new_config: CollectionConfigInternal) -> CollectionResult<()> {
79103
let recreate_optimizers;
80104

0 commit comments

Comments
 (0)