Skip to content

Commit 28f796b

Browse files
authored
fix(dogstatsd): accept pre-built client (#83)
* dogstatsd accepts a pre-built client client building is not a concern of a dogstatsd server, therefore, we should provide a simple reqwest client which can be modified upstream * update license * remove fips on dogstatsd
1 parent 690efd1 commit 28f796b

File tree

8 files changed

+64
-132
lines changed

8 files changed

+64
-132
lines changed

Cargo.lock

Lines changed: 2 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

LICENSE-3rdparty.csv

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,6 @@ rustc-hash,https://github.com/rust-lang/rustc-hash,Apache-2.0 OR MIT,The Rust Pr
168168
rustix,https://github.com/bytecodealliance/rustix,Apache-2.0 WITH LLVM-exception OR Apache-2.0 OR MIT,"Dan Gohman <[email protected]>, Jakub Konka <[email protected]>"
169169
rustls,https://github.com/rustls/rustls,Apache-2.0 OR ISC OR MIT,The rustls Authors
170170
rustls-native-certs,https://github.com/rustls/rustls-native-certs,Apache-2.0 OR ISC OR MIT,The rustls-native-certs Authors
171-
rustls-pemfile,https://github.com/rustls/pemfile,Apache-2.0 OR ISC OR MIT,The rustls-pemfile Authors
172171
rustls-pki-types,https://github.com/rustls/pki-types,MIT OR Apache-2.0,The rustls-pki-types Authors
173172
rustls-webpki,https://github.com/rustls/webpki,ISC,The rustls-webpki Authors
174173
rusty-fork,https://github.com/altsysrq/rusty-fork,MIT OR Apache-2.0,Jason Lingle

crates/datadog-serverless-compat/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ windows-pipes = ["datadog-trace-agent/windows-pipes", "dogstatsd/windows-pipes"]
1212
[dependencies]
1313
datadog-trace-agent = { path = "../datadog-trace-agent" }
1414
libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95" }
15+
datadog-fips = { path = "../datadog-fips", default-features = false }
1516
dogstatsd = { path = "../dogstatsd", default-features = true }
17+
reqwest = { version = "0.12.4", default-features = false }
1618
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
1719
tokio-util = { version = "0.7", default-features = false }
1820
tracing = { version = "0.1", default-features = false }

crates/datadog-serverless-compat/src/main.rs

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use datadog_trace_agent::{
2525

2626
use libdd_trace_utils::{config_utils::read_cloud_env, trace_utils::EnvironmentType};
2727

28+
use datadog_fips::reqwest_adapter::create_reqwest_client_builder;
2829
use dogstatsd::{
2930
aggregator::{AggregatorHandle, AggregatorService},
3031
api_key::ApiKeyFactory,
@@ -262,21 +263,33 @@ async fn start_dogstatsd(
262263

263264
let metrics_flusher = match dd_api_key {
264265
Some(dd_api_key) => {
265-
#[allow(clippy::expect_used)]
266+
let client = match build_metrics_client(https_proxy, DOGSTATSD_TIMEOUT_DURATION) {
267+
Ok(client) => client,
268+
Err(e) => {
269+
error!("Failed to build HTTP client: {e}, won't flush metrics");
270+
return (dogstatsd_cancel_token, None, handle);
271+
}
272+
};
273+
274+
let metrics_intake_url_prefix = match Site::new(dd_site)
275+
.map_err(|e| e.to_string())
276+
.and_then(|site| {
277+
MetricsIntakeUrlPrefix::new(Some(site), None).map_err(|e| e.to_string())
278+
}) {
279+
Ok(prefix) => prefix,
280+
Err(e) => {
281+
error!("Failed to create metrics intake URL: {e}, won't flush metrics");
282+
return (dogstatsd_cancel_token, None, handle);
283+
}
284+
};
285+
266286
let metrics_flusher = Flusher::new(FlusherConfig {
267287
api_key_factory: Arc::new(ApiKeyFactory::new(&dd_api_key)),
268288
aggregator_handle: handle.clone(),
269-
metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(
270-
Some(Site::new(dd_site).expect("Failed to parse site")),
271-
None,
272-
)
273-
.expect("Failed to create intake URL prefix"),
274-
https_proxy,
275-
timeout: DOGSTATSD_TIMEOUT_DURATION,
289+
metrics_intake_url_prefix,
290+
client,
276291
retry_strategy: RetryStrategy::LinearBackoff(3, 1),
277292
compression_level: CompressionLevel::try_from(6).unwrap_or_default(),
278-
// Not supported yet
279-
ca_cert_path: None,
280293
});
281294
Some(metrics_flusher)
282295
}
@@ -288,3 +301,14 @@ async fn start_dogstatsd(
288301

289302
(dogstatsd_cancel_token, metrics_flusher, handle)
290303
}
304+
305+
fn build_metrics_client(
306+
https_proxy: Option<String>,
307+
timeout: Duration,
308+
) -> Result<reqwest::Client, Box<dyn std::error::Error>> {
309+
let mut builder = create_reqwest_client_builder()?.timeout(timeout);
310+
if let Some(proxy) = https_proxy {
311+
builder = builder.proxy(reqwest::Proxy::https(proxy)?);
312+
}
313+
Ok(builder.build()?)
314+
}

crates/dogstatsd/Cargo.toml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ tokio-util = { version = "0.7.11", default-features = false }
2525
tracing = { version = "0.1.40", default-features = false }
2626
regex = { version = "1.10.6", default-features = false }
2727
zstd = { version = "0.13.3", default-features = false }
28-
datadog-fips = { path = "../datadog-fips", default-features = false }
29-
rustls-pemfile = { version = "2.0", default-features = false, features = ["std"] }
3028

3129
[dev-dependencies]
3230
http = "1"
@@ -35,6 +33,5 @@ proptest = "1.4.0"
3533
tracing-test = { version = "0.2.5", default-features = false }
3634

3735
[features]
38-
default = [ "reqwest/rustls-tls" ]
39-
fips = [ "reqwest/rustls-tls-no-provider", "datadog-fips/fips" ]
36+
default = []
4037
windows-pipes = ["tokio/io-util"]

crates/dogstatsd/src/datadog/shipping.rs

Lines changed: 5 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,13 @@
33

44
use super::{MetricsIntakeUrlPrefix, Series};
55
use crate::flusher::ShippingError;
6-
use datadog_fips::reqwest_adapter::create_reqwest_client_builder;
76
use datadog_protos::metrics::SketchPayload;
87
use protobuf::Message;
98
use reqwest::{Client, Response};
109
use serde_json;
11-
use std::error::Error;
12-
use std::fs::File;
13-
use std::io::{BufReader, Write};
10+
use std::io::Write;
1411
use std::time::Duration;
15-
use tracing::{debug, error, trace};
12+
use tracing::{debug, trace};
1613
use zstd::stream::write::Encoder;
1714
use zstd::zstd_safe::CompressionLevel;
1815

@@ -21,7 +18,7 @@ use zstd::zstd_safe::CompressionLevel;
2118
pub struct DdApi {
2219
api_key: String,
2320
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
24-
client: Option<Client>,
21+
client: Client,
2522
retry_strategy: RetryStrategy,
2623
compression_level: CompressionLevel,
2724
}
@@ -31,17 +28,10 @@ impl DdApi {
3128
pub fn new(
3229
api_key: String,
3330
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
34-
https_proxy: Option<String>,
35-
ca_cert_path: Option<String>,
36-
timeout: Duration,
31+
client: Client,
3732
retry_strategy: RetryStrategy,
3833
compression_level: CompressionLevel,
3934
) -> Self {
40-
let client = build_client(https_proxy, ca_cert_path, timeout)
41-
.inspect_err(|e| {
42-
error!("Unable to create client {:?}", e);
43-
})
44-
.ok();
4535
DdApi {
4636
api_key,
4737
metrics_intake_url_prefix,
@@ -88,10 +78,7 @@ impl DdApi {
8878
body: Vec<u8>,
8979
content_type: &str,
9080
) -> Result<Response, ShippingError> {
91-
let client = &self
92-
.client
93-
.as_ref()
94-
.ok_or_else(|| ShippingError::Destination(None, "No client".to_string()))?;
81+
let client = &self.client;
9582
let start = std::time::Instant::now();
9683

9784
let result = (|| -> std::io::Result<Vec<u8>> {
@@ -174,56 +161,3 @@ pub enum RetryStrategy {
174161
Immediate(u64), // attempts
175162
LinearBackoff(u64, u64), // attempts, delay
176163
}
177-
178-
fn build_client(
179-
https_proxy: Option<String>,
180-
ca_cert_path: Option<String>,
181-
timeout: Duration,
182-
) -> Result<Client, Box<dyn Error>> {
183-
let mut builder = create_reqwest_client_builder()?.timeout(timeout);
184-
185-
// Load custom TLS certificate if configured
186-
if let Some(cert_path) = &ca_cert_path {
187-
match load_custom_cert(cert_path) {
188-
Ok(certs) => {
189-
let cert_count = certs.len();
190-
for cert in certs {
191-
builder = builder.add_root_certificate(cert);
192-
}
193-
debug!(
194-
"DOGSTATSD | Added {} root certificate(s) from {}",
195-
cert_count, cert_path
196-
);
197-
}
198-
Err(e) => {
199-
error!(
200-
"DOGSTATSD | Failed to load TLS certificate from {}: {}, continuing without custom cert",
201-
cert_path, e
202-
);
203-
}
204-
}
205-
}
206-
207-
if let Some(proxy) = https_proxy {
208-
builder = builder.proxy(reqwest::Proxy::https(proxy)?);
209-
}
210-
Ok(builder.build()?)
211-
}
212-
213-
fn load_custom_cert(cert_path: &str) -> Result<Vec<reqwest::Certificate>, Box<dyn Error>> {
214-
let file = File::open(cert_path)?;
215-
let mut reader = BufReader::new(file);
216-
217-
// Parse PEM certificates
218-
let certs = rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?;
219-
220-
if certs.is_empty() {
221-
return Err("No certificates found in file".into());
222-
}
223-
224-
// Convert all certificates found in the file
225-
certs
226-
.into_iter()
227-
.map(|cert| reqwest::Certificate::from_der(cert.as_ref()).map_err(Into::into))
228-
.collect()
229-
}

crates/dogstatsd/src/flusher.rs

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@
44
use crate::aggregator::AggregatorHandle;
55
use crate::api_key::ApiKeyFactory;
66
use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy};
7-
use reqwest::{Response, StatusCode};
7+
use reqwest::{Client, Response, StatusCode};
88
use std::sync::Arc;
9-
use std::time::Duration;
109
use tokio::sync::OnceCell;
1110
use tracing::{debug, error};
1211
use zstd::zstd_safe::CompressionLevel;
@@ -16,9 +15,7 @@ pub struct Flusher {
1615
// Allow accepting a future so the API key resolution is deferred until the flush happens
1716
api_key_factory: Arc<ApiKeyFactory>,
1817
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
19-
https_proxy: Option<String>,
20-
ca_cert_path: Option<String>,
21-
timeout: Duration,
18+
client: Client,
2219
retry_strategy: RetryStrategy,
2320
aggregator_handle: AggregatorHandle,
2421
dd_api: OnceCell<Option<DdApi>>,
@@ -29,9 +26,7 @@ pub struct FlusherConfig {
2926
pub api_key_factory: Arc<ApiKeyFactory>,
3027
pub aggregator_handle: AggregatorHandle,
3128
pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
32-
pub https_proxy: Option<String>,
33-
pub ca_cert_path: Option<String>,
34-
pub timeout: Duration,
29+
pub client: Client,
3530
pub retry_strategy: RetryStrategy,
3631
pub compression_level: CompressionLevel,
3732
}
@@ -41,9 +36,7 @@ impl Flusher {
4136
Flusher {
4237
api_key_factory: Arc::clone(&config.api_key_factory),
4338
metrics_intake_url_prefix: config.metrics_intake_url_prefix,
44-
https_proxy: config.https_proxy,
45-
ca_cert_path: config.ca_cert_path,
46-
timeout: config.timeout,
39+
client: config.client,
4740
retry_strategy: config.retry_strategy,
4841
aggregator_handle: config.aggregator_handle,
4942
compression_level: config.compression_level,
@@ -59,9 +52,7 @@ impl Flusher {
5952
Some(api_key) => Some(DdApi::new(
6053
api_key.to_string(),
6154
self.metrics_intake_url_prefix.clone(),
62-
self.https_proxy.clone(),
63-
self.ca_cert_path.clone(),
64-
self.timeout,
55+
self.client.clone(),
6556
self.retry_strategy.clone(),
6657
self.compression_level,
6758
)),
@@ -286,9 +277,7 @@ mod tests {
286277
),
287278
)
288279
.expect("failed to create URL"),
289-
https_proxy: None,
290-
ca_cert_path: None,
291-
timeout: Duration::from_secs(5),
280+
client: Client::builder().build().expect("failed to build client"),
292281
retry_strategy: RetryStrategy::Immediate(1),
293282
compression_level: CompressionLevel::try_from(6)
294283
.expect("failed to create compression level"),
@@ -333,9 +322,7 @@ mod tests {
333322
),
334323
)
335324
.expect("failed to create URL"),
336-
https_proxy: None,
337-
ca_cert_path: None,
338-
timeout: Duration::from_secs(5),
325+
client: Client::builder().build().expect("failed to build client"),
339326
retry_strategy: RetryStrategy::Immediate(1),
340327
compression_level: CompressionLevel::try_from(6)
341328
.expect("failed to create compression level"),
@@ -383,9 +370,7 @@ mod tests {
383370
),
384371
)
385372
.expect("failed to create URL"),
386-
https_proxy: None,
387-
ca_cert_path: None,
388-
timeout: Duration::from_secs(5),
373+
client: Client::builder().build().expect("failed to build client"),
389374
retry_strategy: RetryStrategy::Immediate(1),
390375
compression_level: CompressionLevel::try_from(6)
391376
.expect("failed to create compression level"),

0 commit comments

Comments
 (0)