@@ -6,6 +6,7 @@ use crate::collection_state::{ShardInfo, State};
66use crate :: config:: CollectionConfigInternal ;
77use crate :: operations:: types:: { CollectionError , CollectionResult } ;
88use crate :: shards:: replica_set:: ShardReplicaSet ;
9+ use crate :: shards:: resharding:: ReshardState ;
910use crate :: shards:: shard:: { PeerId , ShardId } ;
1011use crate :: shards:: shard_holder:: ShardTransferChange ;
1112use crate :: shards:: shard_holder:: shard_mapping:: ShardKeyMapping ;
@@ -29,12 +30,22 @@ impl Collection {
2930 this_peer_id : PeerId ,
3031 abort_transfer : impl FnMut ( ShardTransfer ) ,
3132 ) -> CollectionResult < ( ) > {
32- self . apply_config ( state. config ) . await ?;
33- self . apply_shard_transfers ( state. transfers , this_peer_id, abort_transfer)
33+ let State {
34+ config,
35+ shards,
36+ resharding,
37+ transfers,
38+ shards_key_mapping,
39+ payload_index_schema,
40+ } = state;
41+
42+ self . apply_config ( config) . await ?;
43+ self . apply_shard_transfers ( transfers, this_peer_id, abort_transfer)
3444 . await ?;
35- self . apply_shard_info ( state. shards , state. shards_key_mapping . to_map ( ) )
45+ self . apply_reshard_state ( resharding) . await ?;
46+ self . apply_shard_info ( shards, shards_key_mapping. to_map ( ) )
3647 . await ?;
37- self . apply_payload_index_schema ( state . payload_index_schema )
48+ self . apply_payload_index_schema ( payload_index_schema)
3849 . await ?;
3950 Ok ( ( ) )
4051 }
@@ -75,6 +86,19 @@ impl Collection {
7586 Ok ( ( ) )
7687 }
7788
89+ async fn apply_reshard_state ( & self , resharding : Option < ReshardState > ) -> CollectionResult < ( ) > {
90+ // We don't have to explicitly abort resharding or bump shard replica states, because:
91+ // - peers are not driving resharding themselves
92+ // - ongoing (resharding) shard transfers are explicitly updated
93+ // - shard replica set states are explicitly updated
94+ self . shards_holder
95+ . write ( )
96+ . await
97+ . resharding_state
98+ . write ( |state| * state = resharding) ?;
99+ Ok ( ( ) )
100+ }
101+
78102 async fn apply_config ( & self , new_config : CollectionConfigInternal ) -> CollectionResult < ( ) > {
79103 let recreate_optimizers;
80104
0 commit comments