1+ // Copyright The OpenTelemetry Authors
2+ // SPDX-License-Identifier: Apache-2.0
3+
4+ #ifndef ENABLE_METRICS_PREVIEW
5+ # include " opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h"
6+ # include " opentelemetry/sdk/common/global_log_handler.h"
7+ # include " opentelemetry/sdk/metrics/metric_exporter.h"
8+
9+ # include < chrono>
10+ # include < future>
11+
12+ OPENTELEMETRY_BEGIN_NAMESPACE
13+ namespace sdk
14+ {
15+ namespace metrics
16+ {
17+
18+ PeriodicExportingMetricReader::PeriodicExportingMetricReader (
19+ std::unique_ptr<MetricExporter> exporter,
20+ const PeriodicExportingMetricReaderOptions &option,
21+ AggregationTemporality aggregation_temporality)
22+ : MetricReader(aggregation_temporality),
23+ exporter_{std::move (exporter)},
24+ export_interval_millis_{option.export_interval_millis },
25+ export_timeout_millis_{option.export_timeout_millis }
26+ {
27+ if (export_interval_millis_ <= export_timeout_millis_)
28+ {
29+ OTEL_INTERNAL_LOG_WARN (
30+ " [Periodic Exporting Metric Reader] Invalid configuration: "
31+ " export_interval_millis_ should be less than export_timeout_millis_, using default values" );
32+ export_interval_millis_ = kExportIntervalMillis ;
33+ export_timeout_millis_ = kExportTimeOutMillis ;
34+ }
35+ }
36+
37+ void PeriodicExportingMetricReader::OnInitialized () noexcept
38+ {
39+ worker_thread_ = std::thread (&PeriodicExportingMetricReader::DoBackgroundWork, this );
40+ }
41+
42+ void PeriodicExportingMetricReader::DoBackgroundWork ()
43+ {
44+ std::unique_lock<std::mutex> lk (cv_m_);
45+ do
46+ {
47+ if (IsShutdown ())
48+ {
49+ break ;
50+ }
51+ std::atomic<bool > cancel_export_for_timeout{false };
52+ auto start = std::chrono::steady_clock::now ();
53+ auto future_receive = std::async (std::launch::async, [this , &cancel_export_for_timeout] {
54+ Collect ([this , &cancel_export_for_timeout](MetricData data) {
55+ if (cancel_export_for_timeout)
56+ {
57+ OTEL_INTERNAL_LOG_ERROR (
58+ " [Periodic Exporting Metric Reader] Collect took longer configured time: "
59+ << export_timeout_millis_.count () << " ms, and timed out" );
60+ return false ;
61+ }
62+ this ->exporter_ ->Export (data);
63+ return true ;
64+ });
65+ });
66+ std::future_status status;
67+ do
68+ {
69+ status = future_receive.wait_for (std::chrono::milliseconds (export_timeout_millis_));
70+ if (status == std::future_status::timeout)
71+ {
72+ cancel_export_for_timeout = true ;
73+ break ;
74+ }
75+ } while (status != std::future_status::ready);
76+ auto end = std::chrono::steady_clock::now ();
77+ auto export_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
78+ auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms;
79+ cv_.wait_for (lk, remaining_wait_interval_ms);
80+ } while (true );
81+ }
82+
83+ bool PeriodicExportingMetricReader::OnForceFlush (std::chrono::microseconds timeout) noexcept
84+ {
85+ return exporter_->ForceFlush (timeout);
86+ }
87+
88+ bool PeriodicExportingMetricReader::OnShutDown (std::chrono::microseconds timeout) noexcept
89+ {
90+ if (worker_thread_.joinable ())
91+ {
92+ cv_.notify_one ();
93+ worker_thread_.join ();
94+ }
95+ return exporter_->Shutdown (timeout);
96+ }
97+
98+ } // namespace metrics
99+ } // namespace sdk
100+ OPENTELEMETRY_END_NAMESPACE
101+ #endif
0 commit comments