Skip to content

Commit 33d9c62

Browse files
authored
Implement periodic exporting metric reader (#1286)
1 parent 2034c9b commit 33d9c62

File tree

10 files changed

+272
-15
lines changed

10 files changed

+272
-15
lines changed

exporters/ostream/BUILD

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ cc_library(
4343
],
4444
)
4545

46-
#TODO MetricData is still changing, uncomment once it is final
46+
# TODO - Uncomment once MetricData interface is finalised
47+
#cc_library(
48+
# name = "ostream_metric_exporter",
4749
# srcs = [
4850
# "src/metric_exporter.cc",
4951
# ],
@@ -70,7 +72,7 @@ cc_library(
7072
# deps = [
7173
# ":ostream_metric_exporter",
7274
# "@com_google_googletest//:gtest_main",
73-
#],
75+
# ],
7476
#)
7577

7678
cc_test(
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#pragma once
5+
#ifndef ENABLE_METRICS_PREVIEW
6+
7+
# include "opentelemetry/sdk/metrics/metric_reader.h"
8+
# include "opentelemetry/version.h"
9+
10+
# include <atomic>
11+
# include <chrono>
12+
# include <condition_variable>
13+
# include <thread>
14+
15+
OPENTELEMETRY_BEGIN_NAMESPACE
16+
namespace sdk
17+
{
18+
namespace metrics
19+
{
20+
21+
class MetricExporter;
22+
/**
23+
* Struct to hold PeriodicExortingMetricReader options.
24+
*/
25+
26+
constexpr std::chrono::milliseconds kExportIntervalMillis = std::chrono::milliseconds(60000);
27+
constexpr std::chrono::milliseconds kExportTimeOutMillis = std::chrono::milliseconds(30000);
28+
struct PeriodicExportingMetricReaderOptions
29+
{
30+
31+
/* The time interval between two consecutive exports. */
32+
std::chrono::milliseconds export_interval_millis =
33+
std::chrono::milliseconds(kExportIntervalMillis);
34+
35+
/* how long the export can run before it is cancelled. */
36+
std::chrono::milliseconds export_timeout_millis = std::chrono::milliseconds(kExportTimeOutMillis);
37+
};
38+
39+
class PeriodicExportingMetricReader : public MetricReader
40+
{
41+
42+
public:
43+
PeriodicExportingMetricReader(
44+
std::unique_ptr<MetricExporter> exporter,
45+
const PeriodicExportingMetricReaderOptions &option,
46+
AggregationTemporality aggregation_temporality = AggregationTemporality::kCumulative);
47+
48+
private:
49+
bool OnForceFlush(std::chrono::microseconds timeout) noexcept override;
50+
51+
bool OnShutDown(std::chrono::microseconds timeout) noexcept override;
52+
53+
void OnInitialized() noexcept override;
54+
55+
std::unique_ptr<MetricExporter> exporter_;
56+
std::chrono::milliseconds export_interval_millis_;
57+
std::chrono::milliseconds export_timeout_millis_;
58+
59+
void DoBackgroundWork();
60+
61+
/* The background worker thread */
62+
std::thread worker_thread_;
63+
64+
/* Synchronization primitives */
65+
std::condition_variable cv_;
66+
std::mutex cv_m_;
67+
};
68+
69+
} // namespace metrics
70+
} // namespace sdk
71+
OPENTELEMETRY_END_NAMESPACE
72+
#endif

sdk/include/opentelemetry/sdk/metrics/metric_exporter.h

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,9 @@ class MetricExporter
3030
/**
3131
* Exports a batch of metrics recordables. This method must not be called
3232
* concurrently for the same exporter instance.
33-
* @param spans a span of unique pointers to metrics data
33+
* @param data metrics data
3434
*/
35-
virtual opentelemetry::sdk::common::ExportResult Export(
36-
const nostd::span<std::unique_ptr<opentelemetry::sdk::metrics::MetricData>>
37-
&records) noexcept = 0;
35+
virtual opentelemetry::sdk::common::ExportResult Export(const MetricData &data) noexcept = 0;
3836

3937
/**
4038
* Force flush the exporter.
@@ -49,9 +47,6 @@ class MetricExporter
4947
*/
5048
virtual bool Shutdown(
5149
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0;
52-
53-
private:
54-
AggregationTemporality aggregation_temporality_;
5550
};
5651
} // namespace metrics
5752
} // namespace sdk

sdk/include/opentelemetry/sdk/metrics/metric_reader.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class MetricReader
5757

5858
virtual void OnInitialized() noexcept {}
5959

60+
protected:
6061
bool IsShutdown() const noexcept;
6162

6263
private:

sdk/src/metrics/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ add_library(
44
meter.cc
55
meter_context.cc
66
metric_reader.cc
7+
export/periodic_exporting_metric_reader.cc
78
state/metric_collector.cc
89
state/sync_metric_storage.cc
910
aggregation/histogram_aggregation.cc
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#ifndef ENABLE_METRICS_PREVIEW
5+
# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h"
6+
# include "opentelemetry/sdk/common/global_log_handler.h"
7+
# include "opentelemetry/sdk/metrics/metric_exporter.h"
8+
9+
# include <chrono>
10+
# include <future>
11+
12+
OPENTELEMETRY_BEGIN_NAMESPACE
13+
namespace sdk
14+
{
15+
namespace metrics
16+
{
17+
18+
PeriodicExportingMetricReader::PeriodicExportingMetricReader(
19+
std::unique_ptr<MetricExporter> exporter,
20+
const PeriodicExportingMetricReaderOptions &option,
21+
AggregationTemporality aggregation_temporality)
22+
: MetricReader(aggregation_temporality),
23+
exporter_{std::move(exporter)},
24+
export_interval_millis_{option.export_interval_millis},
25+
export_timeout_millis_{option.export_timeout_millis}
26+
{
27+
if (export_interval_millis_ <= export_timeout_millis_)
28+
{
29+
OTEL_INTERNAL_LOG_WARN(
30+
"[Periodic Exporting Metric Reader] Invalid configuration: "
31+
"export_interval_millis_ should be less than export_timeout_millis_, using default values");
32+
export_interval_millis_ = kExportIntervalMillis;
33+
export_timeout_millis_ = kExportTimeOutMillis;
34+
}
35+
}
36+
37+
void PeriodicExportingMetricReader::OnInitialized() noexcept
38+
{
39+
worker_thread_ = std::thread(&PeriodicExportingMetricReader::DoBackgroundWork, this);
40+
}
41+
42+
void PeriodicExportingMetricReader::DoBackgroundWork()
43+
{
44+
std::unique_lock<std::mutex> lk(cv_m_);
45+
do
46+
{
47+
if (IsShutdown())
48+
{
49+
break;
50+
}
51+
std::atomic<bool> cancel_export_for_timeout{false};
52+
auto start = std::chrono::steady_clock::now();
53+
auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] {
54+
Collect([this, &cancel_export_for_timeout](MetricData data) {
55+
if (cancel_export_for_timeout)
56+
{
57+
OTEL_INTERNAL_LOG_ERROR(
58+
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
59+
<< export_timeout_millis_.count() << " ms, and timed out");
60+
return false;
61+
}
62+
this->exporter_->Export(data);
63+
return true;
64+
});
65+
});
66+
std::future_status status;
67+
do
68+
{
69+
status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_));
70+
if (status == std::future_status::timeout)
71+
{
72+
cancel_export_for_timeout = true;
73+
break;
74+
}
75+
} while (status != std::future_status::ready);
76+
auto end = std::chrono::steady_clock::now();
77+
auto export_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
78+
auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms;
79+
cv_.wait_for(lk, remaining_wait_interval_ms);
80+
} while (true);
81+
}
82+
83+
bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
84+
{
85+
return exporter_->ForceFlush(timeout);
86+
}
87+
88+
bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept
89+
{
90+
if (worker_thread_.joinable())
91+
{
92+
cv_.notify_one();
93+
worker_thread_.join();
94+
}
95+
return exporter_->Shutdown(timeout);
96+
}
97+
98+
} // namespace metrics
99+
} // namespace sdk
100+
OPENTELEMETRY_END_NAMESPACE
101+
#endif

sdk/src/metrics/metric_reader.cc

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ MetricReader::MetricReader(AggregationTemporality aggregation_temporality)
2020
void MetricReader::SetMetricProducer(MetricProducer *metric_producer)
2121
{
2222
metric_producer_ = metric_producer;
23+
OnInitialized();
2324
}
2425

2526
AggregationTemporality MetricReader::GetAggregationTemporality() const noexcept
@@ -46,18 +47,21 @@ bool MetricReader::Collect(nostd::function_ref<bool(MetricData)> callback) noexc
4647
bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept
4748
{
4849
bool status = true;
49-
5050
if (IsShutdown())
5151
{
5252
OTEL_INTERNAL_LOG_WARN("MetricReader::Shutdown - Cannot invoke shutdown twice!");
5353
}
54+
55+
{
56+
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
57+
shutdown_ = true;
58+
}
59+
5460
if (!OnShutDown(timeout))
5561
{
5662
status = false;
5763
OTEL_INTERNAL_LOG_WARN("MetricReader::OnShutDown Shutdown failed. Will not be tried again!");
5864
}
59-
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
60-
shutdown_ = true;
6165
return status;
6266
}
6367

sdk/test/metrics/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ foreach(
1111
observer_result_test
1212
sync_instruments_test
1313
async_instruments_test
14-
metric_reader_test)
14+
metric_reader_test
15+
periodic_exporting_metric_reader_test)
1516
add_executable(${testname} "${testname}.cc")
1617
target_link_libraries(
1718
${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}

sdk/test/metrics/meter_provider_sdk_test.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ class MockMetricExporter : public MetricExporter
1818

1919
public:
2020
MockMetricExporter() = default;
21-
opentelemetry::sdk::common::ExportResult Export(
22-
const opentelemetry::nostd::span<std::unique_ptr<MetricData>> &records) noexcept override
21+
opentelemetry::sdk::common::ExportResult Export(const MetricData &records) noexcept override
2322
{
2423
return opentelemetry::sdk::common::ExportResult::kSuccess;
2524
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#ifndef ENABLE_METRICS_PREVIEW
5+
6+
# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h"
7+
# include "opentelemetry/sdk/metrics/export/metric_producer.h"
8+
# include "opentelemetry/sdk/metrics/metric_exporter.h"
9+
10+
# include <gtest/gtest.h>
11+
12+
using namespace opentelemetry;
13+
using namespace opentelemetry::sdk::instrumentationlibrary;
14+
using namespace opentelemetry::sdk::metrics;
15+
16+
class MockPushMetricExporter : public MetricExporter
17+
{
18+
public:
19+
opentelemetry::sdk::common::ExportResult Export(const MetricData &record) noexcept override
20+
{
21+
records_.push_back(record);
22+
return opentelemetry::sdk::common::ExportResult::kSuccess;
23+
}
24+
25+
bool ForceFlush(
26+
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override
27+
{
28+
return false;
29+
}
30+
31+
bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override
32+
{
33+
return true;
34+
}
35+
36+
size_t GetDataCount() { return records_.size(); }
37+
38+
private:
39+
std::vector<MetricData> records_;
40+
};
41+
42+
class MockMetricProducer : public MetricProducer
43+
{
44+
public:
45+
MockMetricProducer(std::chrono::microseconds sleep_ms = std::chrono::microseconds::zero())
46+
: sleep_ms_{sleep_ms}, data_sent_size_(0)
47+
{}
48+
49+
bool Collect(nostd::function_ref<bool(MetricData)> callback) noexcept override
50+
{
51+
std::this_thread::sleep_for(sleep_ms_);
52+
data_sent_size_++;
53+
MetricData data;
54+
callback(data);
55+
return true;
56+
}
57+
58+
size_t GetDataCount() { return data_sent_size_; }
59+
60+
private:
61+
std::chrono::microseconds sleep_ms_;
62+
size_t data_sent_size_;
63+
};
64+
65+
TEST(PeriodicExporingMetricReader, BasicTests)
66+
{
67+
std::unique_ptr<MetricExporter> exporter(new MockPushMetricExporter());
68+
PeriodicExportingMetricReaderOptions options;
69+
options.export_timeout_millis = std::chrono::milliseconds(200);
70+
options.export_interval_millis = std::chrono::milliseconds(500);
71+
auto exporter_ptr = exporter.get();
72+
PeriodicExportingMetricReader reader(std::move(exporter), options);
73+
MockMetricProducer producer;
74+
reader.SetMetricProducer(&producer);
75+
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
76+
reader.Shutdown();
77+
EXPECT_EQ(static_cast<MockPushMetricExporter *>(exporter_ptr)->GetDataCount(),
78+
static_cast<MockMetricProducer *>(&producer)->GetDataCount());
79+
}
80+
81+
#endif

0 commit comments

Comments
 (0)