Skip to content

Commit 64c806d

Browse files
authored
[cluster telemetry] Internal service conversions (#7631)
* implement conversions
* clippy
* rename `ReshardStage` -> `ReshardingStage`
* impl conversions to grpc
* upd response in proto
* convert TelemetryData
* add peer info
* upd openapi
* add conversion from TelemetryData to PeerTelemetry
* review nit
* re-style
* make `method` optional
* change comments to reference `ReshardingStreamRecords`
* missing nits
1 parent f988eb4 commit 64c806d

16 files changed

Lines changed: 717 additions & 103 deletions

File tree

docs/redoc/master/openapi.json

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11670,7 +11670,7 @@
1167011670
"minimum": 0
1167111671
},
1167211672
"to_shard_id": {
11673-
"description": "Target shard ID if different than source shard ID\n\nUsed exclusively with `ReshardStreamRecords` transfer method.",
11673+
"description": "Target shard ID if different than source shard ID\n\nUsed exclusively with `ReshardingStreamRecords` transfer method.",
1167411674
"type": "integer",
1167511675
"format": "uint32",
1167611676
"minimum": 0,
@@ -11763,7 +11763,6 @@
1176311763
"TelemetryData": {
1176411764
"type": "object",
1176511765
"required": [
11766-
"app",
1176711766
"collections",
1176811767
"id"
1176911768
],
@@ -11772,7 +11771,14 @@
1177211771
"type": "string"
1177311772
},
1177411773
"app": {
11775-
"$ref": "#/components/schemas/AppBuildTelemetry"
11774+
"anyOf": [
11775+
{
11776+
"$ref": "#/components/schemas/AppBuildTelemetry"
11777+
},
11778+
{
11779+
"nullable": true
11780+
}
11781+
]
1177611782
},
1177711783
"collections": {
1177811784
"$ref": "#/components/schemas/CollectionsTelemetry"
@@ -12105,9 +12111,7 @@
1210512111
"CollectionTelemetry": {
1210612112
"type": "object",
1210712113
"required": [
12108-
"config",
12109-
"id",
12110-
"init_time_ms"
12114+
"id"
1211112115
],
1211212116
"properties": {
1211312117
"id": {
@@ -12116,10 +12120,18 @@
1211612120
"init_time_ms": {
1211712121
"type": "integer",
1211812122
"format": "uint64",
12119-
"minimum": 0
12123+
"minimum": 0,
12124+
"nullable": true
1212012125
},
1212112126
"config": {
12122-
"$ref": "#/components/schemas/CollectionConfigTelemetry"
12127+
"anyOf": [
12128+
{
12129+
"$ref": "#/components/schemas/CollectionConfigTelemetry"
12130+
},
12131+
{
12132+
"nullable": true
12133+
}
12134+
]
1212312135
},
1212412136
"shards": {
1212512137
"type": "array",

lib/api/src/grpc/proto/telemetry_internal.proto

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,20 @@ message GetPeerTelemetryRequest {
1313
}
1414

1515
message GetPeerTelemetryResponse {
16-
// id
17-
reserved 1;
18-
// app
19-
reserved 2;
16+
PeerTelemetry result = 1;
17+
double time = 2;
18+
}
19+
20+
message PeerTelemetry {
21+
reserved 1; // id
22+
reserved 2; // app
2023
// Mapping from collection name to its telemetry
2124
map<string, CollectionTelemetry> collections = 3;
2225
// Telemetry about the cluster and peers
2326
ClusterTelemetry cluster = 4;
24-
// requests
25-
reserved 5;
26-
// memory
27-
reserved 6;
28-
// hardware
29-
reserved 7;
27+
reserved 5; // requests
28+
reserved 6; // memory
29+
reserved 7; // hardware
3030
}
3131

3232
message CollectionTelemetry {
@@ -50,7 +50,7 @@ message ShardTransferTelemetry {
5050
// Local shard id
5151
uint32 shard_id = 1;
5252
// Target shard ID if different than source shard ID.
53-
// Used exclusively with `ReshardStreamRecords` transfer method.
53+
// Used exclusively with `ReshardingStreamRecords` transfer method.
5454
optional uint32 to_shard_id = 2;
5555
// From peer id
5656
uint64 from = 3;
@@ -60,7 +60,7 @@ message ShardTransferTelemetry {
6060
// If `false` transfer is a moving of a shard from one peer to another.
6161
bool sync = 5;
6262
// Method of transferring points
63-
ShardTransferMethod method = 6;
63+
optional ShardTransferMethod method = 6;
6464
// Freeform string. Typically reports progress
6565
string comment = 7;
6666
}
@@ -112,24 +112,20 @@ message ShardCleanStatusTelemetry {
112112

113113
message ClusterTelemetry {
114114
ClusterStatusTelemetry status = 1;
115-
// config
116-
reserved 2;
117-
// peers
118-
reserved 3;
119-
// peer_metadata
120-
reserved 4;
121-
// metadata
122-
reserved 5;
115+
reserved 2; // config
116+
map<uint64, PeerInfo> peers = 3;
117+
reserved 4; // peer_metadata
118+
reserved 5; // metadata
123119
}
124120

125121
message ClusterStatusTelemetry {
126122
uint32 num_peers = 1;
127123
uint64 term = 2;
128124
uint64 commit = 3;
129125
uint64 pending_operations = 4;
130-
StateRole role = 5;
126+
optional StateRole role = 5;
131127
bool is_voter = 6;
132-
uint64 peer_id = 7;
128+
optional uint64 peer_id = 7;
133129
ConsensusThreadStatus consensus_thread_status = 8;
134130
}
135131

@@ -153,6 +149,10 @@ message ConsensusThreadStatus {
153149
}
154150
}
155151

152+
message PeerInfo {
153+
string uri = 1;
154+
}
155+
156156
enum StateRole {
157157
FOLLOWER = 0;
158158
CANDIDATE = 1;

lib/api/src/grpc/qdrant.rs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12461,6 +12461,15 @@ pub struct GetPeerTelemetryRequest {
1246112461
#[allow(clippy::derive_partial_eq_without_eq)]
1246212462
#[derive(Clone, PartialEq, ::prost::Message)]
1246312463
pub struct GetPeerTelemetryResponse {
12464+
#[prost(message, optional, tag = "1")]
12465+
pub result: ::core::option::Option<PeerTelemetry>,
12466+
#[prost(double, tag = "2")]
12467+
pub time: f64,
12468+
}
12469+
#[derive(serde::Serialize)]
12470+
#[allow(clippy::derive_partial_eq_without_eq)]
12471+
#[derive(Clone, PartialEq, ::prost::Message)]
12472+
pub struct PeerTelemetry {
1246412473
/// Mapping from collection name to its telemetry
1246512474
#[prost(map = "string, message", tag = "3")]
1246612475
pub collections: ::std::collections::HashMap<
@@ -12496,7 +12505,7 @@ pub struct ShardTransferTelemetry {
1249612505
#[prost(uint32, tag = "1")]
1249712506
pub shard_id: u32,
1249812507
/// Target shard ID if different than source shard ID.
12499-
/// Used exclusively with `ReshardStreamRecords` transfer method.
12508+
/// Used exclusively with `ReshardingStreamRecords` transfer method.
1250012509
#[prost(uint32, optional, tag = "2")]
1250112510
pub to_shard_id: ::core::option::Option<u32>,
1250212511
/// From peer id
@@ -12510,8 +12519,8 @@ pub struct ShardTransferTelemetry {
1251012519
#[prost(bool, tag = "5")]
1251112520
pub sync: bool,
1251212521
/// Method of transferring points
12513-
#[prost(enumeration = "ShardTransferMethod", tag = "6")]
12514-
pub method: i32,
12522+
#[prost(enumeration = "ShardTransferMethod", optional, tag = "6")]
12523+
pub method: ::core::option::Option<i32>,
1251512524
/// Freeform string. Typically reports progress
1251612525
#[prost(string, tag = "7")]
1251712526
pub comment: ::prost::alloc::string::String,
@@ -12593,6 +12602,8 @@ pub mod shard_clean_status_telemetry {
1259312602
pub struct ClusterTelemetry {
1259412603
#[prost(message, optional, tag = "1")]
1259512604
pub status: ::core::option::Option<ClusterStatusTelemetry>,
12605+
#[prost(map = "uint64, message", tag = "3")]
12606+
pub peers: ::std::collections::HashMap<u64, PeerInfo>,
1259612607
}
1259712608
#[derive(serde::Serialize)]
1259812609
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -12606,12 +12617,12 @@ pub struct ClusterStatusTelemetry {
1260612617
pub commit: u64,
1260712618
#[prost(uint64, tag = "4")]
1260812619
pub pending_operations: u64,
12609-
#[prost(enumeration = "StateRole", tag = "5")]
12610-
pub role: i32,
12620+
#[prost(enumeration = "StateRole", optional, tag = "5")]
12621+
pub role: ::core::option::Option<i32>,
1261112622
#[prost(bool, tag = "6")]
1261212623
pub is_voter: bool,
12613-
#[prost(uint64, tag = "7")]
12614-
pub peer_id: u64,
12624+
#[prost(uint64, optional, tag = "7")]
12625+
pub peer_id: ::core::option::Option<u64>,
1261512626
#[prost(message, optional, tag = "8")]
1261612627
pub consensus_thread_status: ::core::option::Option<ConsensusThreadStatus>,
1261712628
}
@@ -12657,6 +12668,13 @@ pub mod consensus_thread_status {
1265712668
}
1265812669
}
1265912670
#[derive(serde::Serialize)]
12671+
#[allow(clippy::derive_partial_eq_without_eq)]
12672+
#[derive(Clone, PartialEq, ::prost::Message)]
12673+
pub struct PeerInfo {
12674+
#[prost(string, tag = "1")]
12675+
pub uri: ::prost::alloc::string::String,
12676+
}
12677+
#[derive(serde::Serialize)]
1266012678
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
1266112679
#[repr(i32)]
1266212680
pub enum ReshardingStage {

lib/collection/src/collection/telemetry.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@ impl Collection {
3737

3838
Ok(CollectionTelemetry {
3939
id: self.name().to_string(),
40-
init_time_ms: self.init_time.as_millis() as u64,
41-
config: CollectionConfigTelemetry::from(self.collection_config.read().await.clone()),
40+
init_time_ms: Some(self.init_time.as_millis() as u64),
41+
config: Some(CollectionConfigTelemetry::from(
42+
self.collection_config.read().await.clone(),
43+
)),
4244
shards: shards_telemetry,
4345
transfers,
4446
resharding,

lib/collection/src/operations/conversions.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1416,6 +1416,7 @@ impl From<ReshardingInfo> for api::grpc::qdrant::ReshardingInfo {
14161416
shard_id,
14171417
peer_id,
14181418
shard_key,
1419+
stage: _, // only communicated for ReshardingTelemetry (internal service)
14191420
} = value;
14201421
Self {
14211422
shard_id,
@@ -1435,6 +1436,15 @@ impl From<ReshardingDirection> for api::grpc::qdrant::ReshardingDirection {
14351436
}
14361437
}
14371438

1439+
impl From<api::grpc::qdrant::ReshardingDirection> for ReshardingDirection {
1440+
fn from(value: api::grpc::qdrant::ReshardingDirection) -> Self {
1441+
match value {
1442+
api::grpc::qdrant::ReshardingDirection::Up => ReshardingDirection::Up,
1443+
api::grpc::qdrant::ReshardingDirection::Down => ReshardingDirection::Down,
1444+
}
1445+
}
1446+
}
1447+
14381448
impl From<ShardTransferInfo> for api::grpc::qdrant::ShardTransferInfo {
14391449
fn from(value: ShardTransferInfo) -> Self {
14401450
let ShardTransferInfo {
@@ -1577,6 +1587,21 @@ impl From<api::grpc::qdrant::ShardTransferMethod> for ShardTransferMethod {
15771587
}
15781588
}
15791589

1590+
impl From<ShardTransferMethod> for api::grpc::qdrant::ShardTransferMethod {
1591+
fn from(value: ShardTransferMethod) -> Self {
1592+
match value {
1593+
ShardTransferMethod::StreamRecords => {
1594+
api::grpc::qdrant::ShardTransferMethod::StreamRecords
1595+
}
1596+
ShardTransferMethod::Snapshot => api::grpc::qdrant::ShardTransferMethod::Snapshot,
1597+
ShardTransferMethod::WalDelta => api::grpc::qdrant::ShardTransferMethod::WalDelta,
1598+
ShardTransferMethod::ReshardingStreamRecords => {
1599+
api::grpc::qdrant::ShardTransferMethod::ReshardingStreamRecords
1600+
}
1601+
}
1602+
}
1603+
}
1604+
15801605
impl TryFrom<api::grpc::qdrant::CreateShardKey> for CreateShardingKey {
15811606
type Error = Status;
15821607

lib/collection/src/operations/types.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use crate::operations::cluster_ops::ReshardingDirection;
5353
use crate::operations::config_diff::{HnswConfigDiff, QuantizationConfigDiff};
5454
use crate::optimizers_builder::OptimizersConfig;
5555
use crate::shards::replica_set::replica_set_state::ReplicaState;
56+
use crate::shards::resharding::ReshardingStage;
5657
use crate::shards::shard::{PeerId, ShardId};
5758
use crate::shards::transfer::ShardTransferMethod;
5859

@@ -302,7 +303,7 @@ pub struct ShardTransferInfo {
302303

303304
/// Target shard ID if different than source shard ID
304305
///
305-
/// Used exclusively with `ReshardStreamRecords` transfer method.
306+
/// Used exclusively with `ReshardingStreamRecords` transfer method.
306307
#[serde(default, skip_serializing_if = "Option::is_none")]
307308
#[anonymize(false)]
308309
pub to_shard_id: Option<ShardId>,
@@ -345,6 +346,11 @@ pub struct ReshardingInfo {
345346
pub peer_id: PeerId,
346347

347348
pub shard_key: Option<ShardKey>,
349+
350+
/// Only included in peer telemetry
351+
#[serde(skip)]
352+
#[anonymize(false)]
353+
pub stage: ReshardingStage,
348354
}
349355

350356
#[derive(Debug, Serialize, JsonSchema)]

lib/collection/src/shards/resharding.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub struct ReshardState {
1515
pub shard_id: ShardId,
1616
pub shard_key: Option<ShardKey>,
1717
pub direction: ReshardingDirection,
18-
pub stage: ReshardStage,
18+
pub stage: ReshardingStage,
1919
}
2020

2121
impl ReshardState {
@@ -32,7 +32,7 @@ impl ReshardState {
3232
peer_id,
3333
shard_id,
3434
shard_key,
35-
stage: ReshardStage::MigratingPoints,
35+
stage: ReshardingStage::MigratingPoints,
3636
}
3737
}
3838

@@ -55,14 +55,14 @@ impl ReshardState {
5555
}
5656
}
5757

58-
/// Reshard stages
58+
/// Resharding stages
5959
///
6060
/// # Warning
6161
///
6262
/// This enum is ordered!
6363
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
6464
#[serde(rename_all = "snake_case")]
65-
pub enum ReshardStage {
65+
pub enum ReshardingStage {
6666
#[default]
6767
MigratingPoints,
6868
ReadHashRingCommitted,

lib/collection/src/shards/shard_holder/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use tokio_util::io::SyncIoBridge;
3030

3131
use super::replica_set::snapshots::RecoveryType;
3232
use super::replica_set::{AbortShardTransfer, ChangePeerFromState};
33-
use super::resharding::{ReshardStage, ReshardState};
33+
use super::resharding::{ReshardState, ReshardingStage};
3434
use super::transfer::transfer_tasks_pool::TransferTasksPool;
3535
use crate::collection::payload_index_schema::PayloadIndexSchema;
3636
use crate::common::collection_size_stats::CollectionSizeStats;
@@ -314,7 +314,7 @@ impl ShardHolder {
314314

315315
ring.start_resharding(state.shard_id, state.direction);
316316

317-
if state.stage >= ReshardStage::WriteHashRingCommitted {
317+
if state.stage >= ReshardingStage::WriteHashRingCommitted {
318318
ring.commit_resharding();
319319
}
320320
}
@@ -553,6 +553,7 @@ impl ShardHolder {
553553
peer_id: resharding_state.peer_id,
554554
direction: resharding_state.direction,
555555
shard_key: resharding_state.shard_key.clone(),
556+
stage: resharding_state.stage,
556557
});
557558

558559
resharding_operations.sort_by_key(|k| k.shard_id);
@@ -596,7 +597,7 @@ impl ShardHolder {
596597
self.resharding_state.read().clone().is_some_and(|state| {
597598
state.direction == ReshardingDirection::Up
598599
&& state.shard_id == shard_id
599-
&& state.stage < ReshardStage::ReadHashRingCommitted
600+
&& state.stage < ReshardingStage::ReadHashRingCommitted
600601
});
601602
if resharding_migrating_up {
602603
continue;

0 commit comments

Comments
 (0)