Skip to content

Commit 8ab86b6

Browse files
authored
[cluster telemetry] Internal telemetry endpoint (#7672)
* move QdrantInternalService to its own file * implement internal telemetry endpoint cleaner diff on src/main.rs upd response in proto * use try_from conversion * rename request/response in proto
1 parent 64c806d commit 8ab86b6

9 files changed

Lines changed: 168 additions & 118 deletions

File tree

lib/api/src/grpc/proto/qdrant_internal_service.proto

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@ service QdrantInternal {
1414
rpc WaitOnConsensusCommit(WaitOnConsensusCommitRequest)
1515
returns (WaitOnConsensusCommitResponse) {}
1616

17-
// Get telemetry from a peer
18-
rpc GetPeerTelemetry(GetPeerTelemetryRequest)
19-
returns (GetPeerTelemetryResponse) {}
17+
// Get telemetry
18+
rpc GetTelemetry(GetTelemetryRequest) returns (GetTelemetryResponse) {}
2019
}
2120

2221
message GetConsensusCommitRequest {}

lib/api/src/grpc/proto/telemetry_internal.proto

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ import "collections.proto";
55
package qdrant;
66
option csharp_namespace = "Qdrant.Client.Grpc";
77

8-
message GetPeerTelemetryRequest {
9-
// The peer id to ask for telemetry
10-
uint64 peer_id = 1;
8+
message GetTelemetryRequest {
119
// The level of detail needed
12-
uint32 details_level = 2;
10+
uint32 details_level = 1;
11+
// Timeout in secs for the request
12+
uint64 timeout = 2;
1313
}
1414

15-
message GetPeerTelemetryResponse {
15+
message GetTelemetryResponse {
1616
PeerTelemetry result = 1;
1717
double time = 2;
1818
}

lib/api/src/grpc/qdrant.rs

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12449,18 +12449,18 @@ pub mod points_internal_server {
1244912449
#[derive(serde::Serialize)]
1245012450
#[allow(clippy::derive_partial_eq_without_eq)]
1245112451
#[derive(Clone, PartialEq, ::prost::Message)]
12452-
pub struct GetPeerTelemetryRequest {
12453-
/// The peer id to ask for telemetry
12454-
#[prost(uint64, tag = "1")]
12455-
pub peer_id: u64,
12452+
pub struct GetTelemetryRequest {
1245612453
/// The level of detail needed
12457-
#[prost(uint32, tag = "2")]
12454+
#[prost(uint32, tag = "1")]
1245812455
pub details_level: u32,
12456+
/// Timeout in secs for the request
12457+
#[prost(uint64, tag = "2")]
12458+
pub timeout: u64,
1245912459
}
1246012460
#[derive(serde::Serialize)]
1246112461
#[allow(clippy::derive_partial_eq_without_eq)]
1246212462
#[derive(Clone, PartialEq, ::prost::Message)]
12463-
pub struct GetPeerTelemetryResponse {
12463+
pub struct GetTelemetryResponse {
1246412464
#[prost(message, optional, tag = "1")]
1246512465
pub result: ::core::option::Option<PeerTelemetry>,
1246612466
#[prost(double, tag = "2")]
@@ -12913,12 +12913,12 @@ pub mod qdrant_internal_client {
1291312913
);
1291412914
self.inner.unary(req, path, codec).await
1291512915
}
12916-
/// Get telemetry from a peer
12917-
pub async fn get_peer_telemetry(
12916+
/// Get telemetry
12917+
pub async fn get_telemetry(
1291812918
&mut self,
12919-
request: impl tonic::IntoRequest<super::GetPeerTelemetryRequest>,
12919+
request: impl tonic::IntoRequest<super::GetTelemetryRequest>,
1292012920
) -> std::result::Result<
12921-
tonic::Response<super::GetPeerTelemetryResponse>,
12921+
tonic::Response<super::GetTelemetryResponse>,
1292212922
tonic::Status,
1292312923
> {
1292412924
self.inner
@@ -12932,11 +12932,11 @@ pub mod qdrant_internal_client {
1293212932
})?;
1293312933
let codec = tonic::codec::ProstCodec::default();
1293412934
let path = http::uri::PathAndQuery::from_static(
12935-
"/qdrant.QdrantInternal/GetPeerTelemetry",
12935+
"/qdrant.QdrantInternal/GetTelemetry",
1293612936
);
1293712937
let mut req = request.into_request();
1293812938
req.extensions_mut()
12939-
.insert(GrpcMethod::new("qdrant.QdrantInternal", "GetPeerTelemetry"));
12939+
.insert(GrpcMethod::new("qdrant.QdrantInternal", "GetTelemetry"));
1294012940
self.inner.unary(req, path, codec).await
1294112941
}
1294212942
}
@@ -12964,12 +12964,12 @@ pub mod qdrant_internal_server {
1296412964
tonic::Response<super::WaitOnConsensusCommitResponse>,
1296512965
tonic::Status,
1296612966
>;
12967-
/// Get telemetry from a peer
12968-
async fn get_peer_telemetry(
12967+
/// Get telemetry
12968+
async fn get_telemetry(
1296912969
&self,
12970-
request: tonic::Request<super::GetPeerTelemetryRequest>,
12970+
request: tonic::Request<super::GetTelemetryRequest>,
1297112971
) -> std::result::Result<
12972-
tonic::Response<super::GetPeerTelemetryResponse>,
12972+
tonic::Response<super::GetTelemetryResponse>,
1297312973
tonic::Status,
1297412974
>;
1297512975
}
@@ -13149,26 +13149,25 @@ pub mod qdrant_internal_server {
1314913149
};
1315013150
Box::pin(fut)
1315113151
}
13152-
"/qdrant.QdrantInternal/GetPeerTelemetry" => {
13152+
"/qdrant.QdrantInternal/GetTelemetry" => {
1315313153
#[allow(non_camel_case_types)]
13154-
struct GetPeerTelemetrySvc<T: QdrantInternal>(pub Arc<T>);
13154+
struct GetTelemetrySvc<T: QdrantInternal>(pub Arc<T>);
1315513155
impl<
1315613156
T: QdrantInternal,
13157-
> tonic::server::UnaryService<super::GetPeerTelemetryRequest>
13158-
for GetPeerTelemetrySvc<T> {
13159-
type Response = super::GetPeerTelemetryResponse;
13157+
> tonic::server::UnaryService<super::GetTelemetryRequest>
13158+
for GetTelemetrySvc<T> {
13159+
type Response = super::GetTelemetryResponse;
1316013160
type Future = BoxFuture<
1316113161
tonic::Response<Self::Response>,
1316213162
tonic::Status,
1316313163
>;
1316413164
fn call(
1316513165
&mut self,
13166-
request: tonic::Request<super::GetPeerTelemetryRequest>,
13166+
request: tonic::Request<super::GetTelemetryRequest>,
1316713167
) -> Self::Future {
1316813168
let inner = Arc::clone(&self.0);
1316913169
let fut = async move {
13170-
<T as QdrantInternal>::get_peer_telemetry(&inner, request)
13171-
.await
13170+
<T as QdrantInternal>::get_telemetry(&inner, request).await
1317213171
};
1317313172
Box::pin(fut)
1317413173
}
@@ -13180,7 +13179,7 @@ pub mod qdrant_internal_server {
1318013179
let inner = self.inner.clone();
1318113180
let fut = async move {
1318213181
let inner = inner.0;
13183-
let method = GetPeerTelemetrySvc(inner);
13182+
let method = GetTelemetrySvc(inner);
1318413183
let codec = tonic::codec::ProstCodec::default();
1318513184
let mut grpc = tonic::server::Grpc::new(codec)
1318613185
.apply_compression_config(

src/common/telemetry.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,6 @@ pub struct TelemetryData {
6262
}
6363

6464
impl TelemetryCollector {
65-
pub fn reporting_id(&self) -> String {
66-
self.process_id.to_string()
67-
}
68-
6965
pub fn generate_id() -> Uuid {
7066
Uuid::new_v4()
7167
}

src/consensus.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use tokio::time::sleep;
2727
use tonic::transport::{ClientTlsConfig, Uri};
2828

2929
use crate::common::helpers;
30+
use crate::common::telemetry::TelemetryCollector;
3031
use crate::common::telemetry_ops::requests_telemetry::TonicTelemetryCollector;
3132
use crate::settings::{ConsensusConfig, Settings};
3233
use crate::tonic::init_internal;
@@ -68,7 +69,8 @@ impl Consensus {
6869
settings: Settings,
6970
channel_service: ChannelService,
7071
propose_receiver: mpsc::Receiver<ConsensusOperations>,
71-
telemetry_collector: Arc<parking_lot::Mutex<TonicTelemetryCollector>>,
72+
telemetry_collector: Arc<tokio::sync::Mutex<TelemetryCollector>>,
73+
tonic_telemetry_collector: Arc<parking_lot::Mutex<TonicTelemetryCollector>>,
7274
toc: Arc<TableOfContent>,
7375
runtime: Handle,
7476
reinit: bool,
@@ -156,6 +158,7 @@ impl Consensus {
156158
toc,
157159
state_ref,
158160
telemetry_collector,
161+
tonic_telemetry_collector,
159162
settings,
160163
p2p_host,
161164
p2p_port,

src/main.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,8 @@ fn main() -> anyhow::Result<()> {
401401
// It decides if query should go directly to the ToC or through the consensus.
402402
let mut dispatcher = Dispatcher::new(toc_arc.clone());
403403

404-
let (telemetry_collector, dispatcher_arc, health_checker) = if is_distributed_deployment {
404+
let (telemetry_collector, tonic_telemetry_collector, dispatcher_arc, health_checker);
405+
if is_distributed_deployment {
405406
let consensus_state: ConsensusStateRef = ConsensusManager::new(
406407
persistent_consensus_state,
407408
toc_arc.clone(),
@@ -418,26 +419,28 @@ fn main() -> anyhow::Result<()> {
418419
let toc_dispatcher = TocDispatcher::new(Arc::downgrade(&toc_arc), consensus_state.clone());
419420
toc_arc.with_toc_dispatcher(toc_dispatcher);
420421

421-
let dispatcher_arc = Arc::new(dispatcher);
422+
dispatcher_arc = Arc::new(dispatcher);
422423

423424
// Monitoring and telemetry.
424-
let telemetry_collector =
425+
let telemetry =
425426
TelemetryCollector::new(settings.clone(), dispatcher_arc.clone(), reporting_id);
426-
let tonic_telemetry_collector = telemetry_collector.tonic_telemetry_collector.clone();
427+
tonic_telemetry_collector = telemetry.tonic_telemetry_collector.clone();
428+
429+
telemetry_collector = Arc::new(tokio::sync::Mutex::new(telemetry));
427430

428431
// `raft` crate uses `slog` crate so it is needed to use `slog_stdlog::StdLog` to forward
429432
// logs from it to `log` crate
430433
let slog_logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), slog::o!());
431434

432435
// Runs raft consensus in a separate thread.
433436
// Create a pipe `message_sender` to communicate with the consensus
434-
let health_checker = Arc::new(common::health::HealthChecker::spawn(
437+
health_checker = Some(Arc::new(common::health::HealthChecker::spawn(
435438
toc_arc.clone(),
436439
consensus_state.clone(),
437440
&runtime_handle,
438441
// NOTE: `wait_for_bootstrap` should be calculated *before* starting `Consensus` thread
439442
consensus_state.is_new_deployment() && bootstrap.is_some(),
440-
));
443+
)));
441444

442445
let handle = Consensus::run(
443446
&slog_logger,
@@ -447,7 +450,8 @@ fn main() -> anyhow::Result<()> {
447450
settings.clone(),
448451
channel_service,
449452
propose_receiver,
450-
tonic_telemetry_collector,
453+
telemetry_collector.clone(),
454+
tonic_telemetry_collector.clone(),
451455
toc_arc.clone(),
452456
runtime_handle.clone(),
453457
args.reinit,
@@ -499,27 +503,23 @@ fn main() -> anyhow::Result<()> {
499503
collections_to_recover_in_consensus,
500504
));
501505
}
502-
503-
(telemetry_collector, dispatcher_arc, Some(health_checker))
504506
} else {
505507
log::info!("Distributed mode disabled");
506-
let dispatcher_arc = Arc::new(dispatcher);
508+
dispatcher_arc = Arc::new(dispatcher);
507509

508510
// Monitoring and telemetry.
509-
let telemetry_collector =
511+
let telemetry =
510512
TelemetryCollector::new(settings.clone(), dispatcher_arc.clone(), reporting_id);
511-
(telemetry_collector, dispatcher_arc, None)
512-
};
513513

514-
let tonic_telemetry_collector = telemetry_collector.tonic_telemetry_collector.clone();
514+
tonic_telemetry_collector = telemetry.tonic_telemetry_collector.clone();
515+
telemetry_collector = Arc::new(tokio::sync::Mutex::new(telemetry));
516+
health_checker = None;
517+
};
515518

516519
//
517520
// Telemetry reporting
518521
//
519522

520-
let reporting_id = telemetry_collector.reporting_id();
521-
let telemetry_collector = Arc::new(tokio::sync::Mutex::new(telemetry_collector));
522-
523523
if reporting_enabled {
524524
log::info!("Telemetry reporting enabled, id: {reporting_id}");
525525

src/tonic/api/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub mod collections_api;
22
pub mod collections_internal_api;
33
pub mod points_api;
44
pub mod points_internal_api;
5+
pub mod qdrant_internal_api;
56
pub mod raft_api;
67
pub mod snapshots_api;
78

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
use std::sync::Arc;
2+
use std::time::{Duration, Instant};
3+
4+
use api::grpc::qdrant_internal_server::QdrantInternal;
5+
use api::grpc::{
6+
GetConsensusCommitRequest, GetConsensusCommitResponse, GetTelemetryRequest,
7+
GetTelemetryResponse, PeerTelemetry, WaitOnConsensusCommitRequest,
8+
WaitOnConsensusCommitResponse,
9+
};
10+
use common::types::{DetailsLevel, TelemetryDetail};
11+
use storage::content_manager::consensus_manager::ConsensusStateRef;
12+
use storage::rbac::Access;
13+
use tokio::sync::Mutex;
14+
use tonic::{Request, Response, Status};
15+
16+
use crate::common::telemetry::TelemetryCollector;
17+
use crate::settings::Settings;
18+
19+
pub struct QdrantInternalService {
20+
/// Telemetry collector
21+
telemetry_collector: Arc<Mutex<TelemetryCollector>>,
22+
/// Qdrant settings
23+
settings: Settings,
24+
/// Consensus state
25+
consensus_state: ConsensusStateRef,
26+
}
27+
28+
impl QdrantInternalService {
29+
pub fn new(
30+
telemetry_collector: Arc<Mutex<TelemetryCollector>>,
31+
settings: Settings,
32+
consensus_state: ConsensusStateRef,
33+
) -> Self {
34+
Self {
35+
telemetry_collector,
36+
settings,
37+
consensus_state,
38+
}
39+
}
40+
}
41+
42+
#[tonic::async_trait]
43+
impl QdrantInternal for QdrantInternalService {
44+
async fn get_consensus_commit(
45+
&self,
46+
_: tonic::Request<GetConsensusCommitRequest>,
47+
) -> Result<Response<GetConsensusCommitResponse>, Status> {
48+
let persistent = self.consensus_state.persistent.read();
49+
let commit = persistent.state.hard_state.commit as _;
50+
let term = persistent.state.hard_state.term as _;
51+
Ok(Response::new(GetConsensusCommitResponse { commit, term }))
52+
}
53+
54+
async fn wait_on_consensus_commit(
55+
&self,
56+
request: Request<WaitOnConsensusCommitRequest>,
57+
) -> Result<Response<WaitOnConsensusCommitResponse>, Status> {
58+
let request = request.into_inner();
59+
let commit = request.commit as u64;
60+
let term = request.term as u64;
61+
let timeout = Duration::from_secs(request.timeout as u64);
62+
let consensus_tick = Duration::from_millis(self.settings.cluster.consensus.tick_period_ms);
63+
let ok = self
64+
.consensus_state
65+
.wait_for_consensus_commit(commit, term, consensus_tick, timeout)
66+
.await
67+
.is_ok();
68+
Ok(Response::new(WaitOnConsensusCommitResponse { ok }))
69+
}
70+
71+
async fn get_telemetry(
72+
&self,
73+
request: Request<GetTelemetryRequest>,
74+
) -> Result<Response<GetTelemetryResponse>, Status> {
75+
let request = request.into_inner();
76+
77+
if request.details_level < 2 {
78+
return Err(Status::invalid_argument(
79+
"details_level for internal service must be >= 2",
80+
));
81+
}
82+
83+
let details_level = DetailsLevel::from(request.details_level.max(2) as usize);
84+
85+
let detail = TelemetryDetail {
86+
level: details_level,
87+
histograms: false,
88+
};
89+
90+
let timing = Instant::now();
91+
let timeout = Duration::from_secs(request.timeout);
92+
93+
let access = Access::full("internal service");
94+
95+
let telemetry_collector = self.telemetry_collector.lock().await;
96+
let telemetry_data = telemetry_collector
97+
.prepare_data(&access, detail, Some(timeout))
98+
.await?;
99+
100+
let response = GetTelemetryResponse {
101+
result: Some(PeerTelemetry::try_from(telemetry_data)?),
102+
time: timing.elapsed().as_secs_f64(),
103+
};
104+
105+
Ok(Response::new(response))
106+
}
107+
}

0 commit comments

Comments
 (0)