Skip to content

Commit 67e26d6

Browse files
committed
extend WAL retention if some other replicas are dead
1 parent e12fb6f commit 67e26d6

12 files changed

Lines changed: 140 additions & 2 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ tonic-reflection = "0.11.0"
283283
tracing = { version = "0.1", features = ["async-await"] }
284284
uuid = { version = "1.19", features = ["v4", "serde"] }
285285
validator = { version = "0.20.0", features = ["derive"] }
286-
wal = { git = "https://github.com/qdrant/wal.git", rev = "b03a156b0ffdbe8109b2c3c409ce3be2ec6d229e" }
286+
wal = { git = "https://github.com/qdrant/wal.git", branch = "allow-set-retention" }
287287
zerocopy = { version = "0.8.31", features = ["derive"] }
288288
atomic_refcell = "0.1.13"
289289
byteorder = "1.5.0"

lib/collection/src/shards/forward_proxy_shard.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,14 @@ impl ForwardProxyShard {
395395
.estimate_cardinality(filter, hw_measurement_acc)
396396
.await
397397
}
/// Put the wrapped local shard's WAL into extended-retention mode.
///
/// Pure delegation to the wrapped shard; see
/// `LocalShard::set_extended_wal_retention` for the retention semantics.
pub async fn set_extended_wal_retention(&self) {
    self.wrapped_shard.set_extended_wal_retention().await;
}
/// Restore the wrapped local shard's WAL to normal retention.
///
/// Pure delegation to the wrapped shard; see
/// `LocalShard::set_normal_wal_retention` for the retention semantics.
pub async fn set_normal_wal_retention(&self) {
    self.wrapped_shard.set_normal_wal_retention().await;
}
398406
}
399407

400408
#[async_trait]

lib/collection/src/shards/local_shard/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ mod snapshot_tests;
1717
mod drop;
1818
#[cfg(feature = "testing")]
1919
pub mod testing;
20+
mod wal_ops;
2021

2122
use std::collections::{BTreeSet, HashMap};
2223
use std::ops::Deref;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
use crate::shards::local_shard::LocalShard;
2+
impl LocalShard {
    /// WAL is keeping more data, even if truncated.
    /// Useful if we expect to read old WAL records soon
    /// (e.g. a dead replica may need to catch up from WAL).
    // NOTE(review): the actual retention policy lives in the WAL
    // implementation; this only toggles its mode.
    pub async fn set_extended_wal_retention(&self) {
        self.wal.set_extended_retention().await;
    }

    /// WAL is keeping normal amount of data after truncation.
    /// Counterpart of `set_extended_wal_retention`.
    pub async fn set_normal_wal_retention(&self) {
        self.wal.set_normal_retention().await;
    }
}

lib/collection/src/shards/proxy_shard.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,14 @@ impl ProxyShard {
181181
.estimate_cardinality(filter, hw_measurement_acc)
182182
.await
183183
}
/// Put the wrapped local shard's WAL into extended-retention mode.
///
/// Pure delegation to the wrapped shard; see
/// `LocalShard::set_extended_wal_retention` for the retention semantics.
pub async fn set_extended_wal_retention(&self) {
    self.wrapped_shard.set_extended_wal_retention().await;
}
/// Restore the wrapped local shard's WAL to normal retention.
///
/// Pure delegation to the wrapped shard; see
/// `LocalShard::set_normal_wal_retention` for the retention semantics.
pub async fn set_normal_wal_retention(&self) {
    self.wrapped_shard.set_normal_wal_retention().await;
}
184192
}
185193

186194
#[async_trait]

lib/collection/src/shards/queue_proxy_shard.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,18 @@ impl QueueProxyShard {
269269
.estimate_cardinality(filter, hw_measurement_acc)
270270
.await
271271
}
/// Put the wrapped local shard's WAL into extended-retention mode.
///
/// Silently does nothing when `inner` is `None` — presumably the queue
/// proxy has already been finalized/taken apart; TODO confirm against
/// the finalization path.
pub async fn set_extended_wal_retention(&self) {
    if let Some(inner) = &self.inner {
        inner.wrapped_shard.set_extended_wal_retention().await;
    }
}
/// Restore the wrapped local shard's WAL to normal retention.
///
/// Silently does nothing when `inner` is `None` — presumably the queue
/// proxy has already been finalized/taken apart; TODO confirm against
/// the finalization path.
pub async fn set_normal_wal_retention(&self) {
    if let Some(inner) = &self.inner {
        inner.wrapped_shard.set_normal_wal_retention().await;
    }
}
272284
}
273285

274286
#[async_trait]

lib/collection/src/shards/replica_set/mod.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,8 @@ impl ShardReplicaSet {
694694

695695
if self.this_peer_id() == peer_id {
696696
self.on_local_state_updated(state).await?;
697+
} else {
698+
self.on_remote_state_updated(peer_id, state).await;
697699
}
698700

699701
self.update_locally_disabled(peer_id);
@@ -764,6 +766,8 @@ impl ShardReplicaSet {
764766

765767
if self.this_peer_id() == peer_id {
766768
self.on_local_state_updated(state).await?;
769+
} else {
770+
self.on_remote_state_updated(peer_id, state).await;
767771
}
768772

769773
self.update_locally_disabled(peer_id);
@@ -783,11 +787,42 @@ impl ShardReplicaSet {
783787
} else {
784788
local_shard.take_newest_clocks_snapshot().await?;
785789
}
790+
// Reset WAL retention to normal whenever local shard changes state
791+
local_shard.set_normal_wal_retention().await;
786792
}
787793

788794
Ok(())
789795
}
790796

797+
/// Called when a peer state is changed (except local peer).
798+
///
799+
async fn on_remote_state_updated(&self, _peer_id: PeerId, _new_state: ReplicaState) {
800+
let mut is_any_remote_dead = false;
801+
let mut is_local_active = false;
802+
let this_peer_id = self.this_peer_id();
803+
804+
for (peer_id, peer_state) in self.replica_state.read().peers().iter() {
805+
if *peer_id == this_peer_id {
806+
if peer_state.is_active() {
807+
is_local_active = true;
808+
}
809+
} else if peer_state.requires_recovery() {
810+
is_any_remote_dead = true;
811+
}
812+
}
813+
814+
{
815+
let local_opt = self.local.read().await;
816+
if let Some(local_shard) = local_opt.as_ref() {
817+
if is_local_active && is_any_remote_dead {
818+
local_shard.set_extended_wal_retention().await;
819+
} else {
820+
local_shard.set_normal_wal_retention().await;
821+
}
822+
}
823+
}
824+
}
825+
791826
pub async fn remove_peer(&self, peer_id: PeerId) -> CollectionResult<()> {
792827
if self.this_peer_id() == peer_id {
793828
self.remove_local().await?;

lib/collection/src/shards/replica_set/replica_set_state.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,22 @@ impl ReplicaState {
207207
}
208208
}
209209

210+
/// Check if the replica state requires automatic recovery to be scheduled.
211+
pub fn requires_recovery(self) -> bool {
212+
match self {
213+
ReplicaState::Dead => true,
214+
ReplicaState::Active
215+
| ReplicaState::Partial
216+
| ReplicaState::Initializing
217+
| ReplicaState::Listener
218+
| ReplicaState::PartialSnapshot
219+
| ReplicaState::Recovery
220+
| ReplicaState::Resharding
221+
| ReplicaState::ReshardingScaleDown
222+
| ReplicaState::ActiveRead => false,
223+
}
224+
}
225+
210226
/// Check whether the replica state is partial or partial-like.
211227
///
212228
/// In other words: is the state related to shard transfers?

lib/collection/src/shards/shard.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,27 @@ impl Shard {
368368
}
369369
}
370370

/// WAL is keeping more data, even if truncated.
/// Useful if we expect to read old WAL records soon.
///
/// Dispatches to the concrete shard variant; `Dummy` has no WAL to configure.
pub async fn set_extended_wal_retention(&self) {
    match self {
        Shard::Local(local) => local.set_extended_wal_retention().await,
        Shard::Proxy(proxy) => proxy.set_extended_wal_retention().await,
        Shard::ForwardProxy(forward_proxy) => forward_proxy.set_extended_wal_retention().await,
        Shard::QueueProxy(queue_proxy) => queue_proxy.set_extended_wal_retention().await,
        Shard::Dummy(_) => {} // Do nothing for dummy shard
    }
}
/// WAL is keeping normal amount of data after truncation.
///
/// Counterpart of `set_extended_wal_retention`. Dispatches to the
/// concrete shard variant; `Dummy` has no WAL to configure.
pub async fn set_normal_wal_retention(&self) {
    match self {
        Shard::Local(local) => local.set_normal_wal_retention().await,
        Shard::Proxy(proxy) => proxy.set_normal_wal_retention().await,
        Shard::ForwardProxy(forward_proxy) => forward_proxy.set_normal_wal_retention().await,
        Shard::QueueProxy(queue_proxy) => queue_proxy.set_normal_wal_retention().await,
        Shard::Dummy(_) => {} // Do nothing for dummy shard
    }
}
391+
371392
pub async fn estimate_cardinality(
372393
&self,
373394
filter: Option<&Filter>,

0 commit comments

Comments
 (0)