Skip to content

Commit 54abc27

Browse files
authored
reuse temporal metric storage for sync storage (#1369)
1 parent 02630e0 commit 54abc27

File tree

2 files changed

+7
-113
lines changed

2 files changed

+7
-113
lines changed

sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,14 @@
55
#ifndef ENABLE_METRICS_PREVIEW
66
# include "opentelemetry/common/key_value_iterable_view.h"
77
# include "opentelemetry/sdk/common/attributemap_hash.h"
8-
# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h"
98
# include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h"
109
# include "opentelemetry/sdk/metrics/exemplar/reservoir.h"
1110
# include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
1211
# include "opentelemetry/sdk/metrics/state/metric_collector.h"
1312
# include "opentelemetry/sdk/metrics/state/metric_storage.h"
1413

14+
# include "opentelemetry/sdk/metrics/state/temporal_metric_storage.h"
1515
# include "opentelemetry/sdk/metrics/view/attributes_processor.h"
16-
# include "opentelemetry/sdk/metrics/view/view.h"
17-
# include "opentelemetry/sdk/resource/resource.h"
1816

1917
# include <list>
2018
# include <memory>
@@ -24,13 +22,6 @@ namespace sdk
2422
{
2523
namespace metrics
2624
{
27-
28-
struct LastReportedMetrics
29-
{
30-
std::unique_ptr<AttributesHashMap> attributes_map;
31-
opentelemetry::common::SystemTimestamp collection_ts;
32-
};
33-
3425
class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
3526
{
3627

@@ -43,7 +34,9 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
4334
aggregation_type_{aggregation_type},
4435
attributes_hashmap_(new AttributesHashMap()),
4536
attributes_processor_{attributes_processor},
46-
exemplar_reservoir_(exemplar_reservoir)
37+
exemplar_reservoir_(exemplar_reservoir),
38+
temporal_metric_storage_(instrument_descriptor)
39+
4740
{
4841
create_default_aggregation_ = [&]() -> std::unique_ptr<Aggregation> {
4942
return std::move(
@@ -114,14 +107,10 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
114107

115108
// hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call)
116109
std::unique_ptr<AttributesHashMap> attributes_hashmap_;
117-
// unreported metrics stash for all the collectors
118-
std::unordered_map<CollectorHandle *, std::list<std::shared_ptr<AttributesHashMap>>>
119-
unreported_metrics_;
120-
// last reported metrics stash for all the collectors.
121-
std::unordered_map<CollectorHandle *, LastReportedMetrics> last_reported_metrics_;
122110
const AttributesProcessor *attributes_processor_;
123111
std::function<std::unique_ptr<Aggregation>()> create_default_aggregation_;
124112
nostd::shared_ptr<ExemplarReservoir> exemplar_reservoir_;
113+
TemporalMetricStorage temporal_metric_storage_;
125114
};
126115

127116
} // namespace metrics

sdk/src/metrics/state/sync_metric_storage.cc

Lines changed: 2 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -25,104 +25,9 @@ bool SyncMetricStorage::Collect(CollectorHandle *collector,
2525
// recordings
2626
std::shared_ptr<AttributesHashMap> delta_metrics = std::move(attributes_hashmap_);
2727
attributes_hashmap_.reset(new AttributesHashMap);
28-
for (auto &col : collectors)
29-
{
30-
unreported_metrics_[col.get()].push_back(delta_metrics);
31-
}
3228

33-
// Get the unreported metrics for the `collector` from `unreported metrics stash`
34-
// since last collection, this will also cleanup the unreported metrics for `collector`
35-
// from the stash.
36-
auto present = unreported_metrics_.find(collector);
37-
if (present == unreported_metrics_.end())
38-
{
39-
// no unreported metrics for the collector, return.
40-
return true;
41-
}
42-
auto unreported_list = std::move(present->second);
43-
44-
// Iterate over the unreporter metrics for `collector` and store result in `merged_metrics`
45-
std::unique_ptr<AttributesHashMap> merged_metrics(new AttributesHashMap);
46-
for (auto &agg_hashmap : unreported_list)
47-
{
48-
agg_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes,
49-
Aggregation &aggregation) {
50-
auto agg = merged_metrics->Get(attributes);
51-
if (agg)
52-
{
53-
merged_metrics->Set(attributes, std::move(agg->Merge(aggregation)));
54-
}
55-
else
56-
{
57-
merged_metrics->Set(
58-
attributes,
59-
std::move(
60-
DefaultAggregation::CreateAggregation(instrument_descriptor_)->Merge(aggregation)));
61-
merged_metrics->GetAllEnteries(
62-
[](const MetricAttributes &attr, Aggregation &aggr) { return true; });
63-
}
64-
return true;
65-
});
66-
}
67-
// Get the last reported metrics for the `collector` from `last reported metrics` stash
68-
// - If the aggregation_temporarily for the collector is cumulative
69-
// - Merge the last reported metrics with unreported metrics (which is in merged_metrics),
70-
// Final result of merge would be in merged_metrics.
71-
// - Move the final merge to the `last reported metrics` stash.
72-
// - If the aggregation_temporarily is delta
73-
// - Store the unreported metrics for `collector` (which is in merged_mtrics) to
74-
// `last reported metrics` stash.
75-
76-
auto reported = last_reported_metrics_.find(collector);
77-
if (reported != last_reported_metrics_.end())
78-
{
79-
last_collection_ts = last_reported_metrics_[collector].collection_ts;
80-
auto last_aggr_hashmap = std::move(last_reported_metrics_[collector].attributes_map);
81-
if (aggregation_temporarily == AggregationTemporality::kCumulative)
82-
{
83-
// merge current delta to previous cumulative
84-
last_aggr_hashmap->GetAllEnteries(
85-
[&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) {
86-
auto agg = merged_metrics->Get(attributes);
87-
if (agg)
88-
{
89-
merged_metrics->Set(attributes, agg->Merge(aggregation));
90-
}
91-
else
92-
{
93-
merged_metrics->Set(attributes,
94-
DefaultAggregation::CreateAggregation(instrument_descriptor_));
95-
}
96-
return true;
97-
});
98-
}
99-
last_reported_metrics_[collector] =
100-
LastReportedMetrics{std::move(merged_metrics), collection_ts};
101-
}
102-
else
103-
{
104-
merged_metrics->GetAllEnteries(
105-
[](const MetricAttributes &attr, Aggregation &aggr) { return true; });
106-
last_reported_metrics_.insert(
107-
std::make_pair(collector, LastReportedMetrics{std::move(merged_metrics), collection_ts}));
108-
}
109-
110-
// Generate the MetricData from the final merged_metrics, and invoke callback over it.
111-
112-
AttributesHashMap *result_to_export = (last_reported_metrics_[collector]).attributes_map.get();
113-
MetricData metric_data;
114-
metric_data.instrument_descriptor = instrument_descriptor_;
115-
metric_data.start_ts = last_collection_ts;
116-
metric_data.end_ts = collection_ts;
117-
result_to_export->GetAllEnteries(
118-
[&metric_data](const MetricAttributes &attributes, Aggregation &aggregation) {
119-
PointDataAttributes point_data_attr;
120-
point_data_attr.point_data = aggregation.ToPoint();
121-
point_data_attr.attributes = attributes;
122-
metric_data.point_data_attr_.push_back(point_data_attr);
123-
return true;
124-
});
125-
return callback(metric_data);
29+
return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts,
30+
std::move(delta_metrics), callback);
12631
}
12732

12833
} // namespace metrics

0 commit comments

Comments
 (0)