-
Notifications
You must be signed in to change notification settings - Fork 10.3k
OTLP to directly write to an interface which can hide storage details #16951
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
989a1d8
7b3de4e
ceb241b
17ccd49
320bb8d
1c029e4
5f42d67
b008f23
4b89cd5
6a7de71
f92da09
84f97d9
96199f9
a821f0d
7a82d77
358410f
8c46c60
73afc92
db3470d
7ac6832
e44f51c
10a4d15
aff16e2
85079f4
2de7ec6
bc978de
86a2f4b
5f8329d
4742e01
e9e3467
2e6f445
b331e9b
ec5cbf6
84e9295
c5b6c6d
1f7d957
3cea333
bd2f7f2
8da268d
eb3f402
6937e31
5a6b66f
75762dc
3b280e9
dddc085
8c7aafd
ed5d086
f015bba
f584ff2
d339809
ea4abd1
d2d82ed
fdd326b
d5745ab
cc38828
428d9f0
9d9ba48
3cdf56e
5525adc
1b67c2f
faae3ea
84c5f6b
4d35e92
53ff66d
b03fd1e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,234 @@ | ||
| // Copyright The Prometheus Authors | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| // TODO(krajorama): rename this package to otlpappender or similar, as it is | ||
| // not specific to Prometheus remote write anymore. | ||
| // Note otlptranslator is already used by prometheus/otlptranslator repo. | ||
| package prometheusremotewrite | ||
|
|
||
| import ( | ||
| "errors" | ||
| "fmt" | ||
| "log/slog" | ||
|
|
||
| "github.com/prometheus/client_golang/prometheus" | ||
| "github.com/prometheus/client_golang/prometheus/promauto" | ||
|
|
||
| "github.com/prometheus/prometheus/model/exemplar" | ||
| "github.com/prometheus/prometheus/model/histogram" | ||
| "github.com/prometheus/prometheus/model/labels" | ||
| "github.com/prometheus/prometheus/model/metadata" | ||
| "github.com/prometheus/prometheus/storage" | ||
| ) | ||
|
|
||
| // Metadata extends metadata.Metadata with the metric family name. | ||
| // OTLP calculates the metric family name for all metrics and uses | ||
| // it for generating summary, histogram series by adding the magic | ||
| // suffixes. The metric family name is passed down to the appender | ||
| // in case the storage needs it for metadata updates. | ||
| // Known user is Mimir that implements /api/v1/metadata and uses | ||
| // Remote-Write 1.0 for this. Might be removed later if no longer | ||
| // needed by any downstream project. | ||
| type Metadata struct { | ||
| metadata.Metadata | ||
| MetricFamilyName string | ||
| } | ||
|
|
||
| // CombinedAppender is similar to storage.Appender, but combines updates to | ||
| // metadata, created timestamps, exemplars and samples into a single call. | ||
| type CombinedAppender interface { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great! This might be a nice experiment to have this be a new appender interface on day!
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To kind of leave this solved for now with some voting I'm kind of happy with
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, not a picky with the name. However, given we agree on the interface I propose we actually go for it, and slowly start migrating TSDB to it: #17104
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since it seems that TSDB storage might need something slightly different, I'd suggest
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would double check if we can find some common interfaces for this cases. Added some ideas in #17104 (comment) Would Otherwise this interface is really only for Mimir purposes, am I right? Which is fine, but then let's add comment in some TODO to remove it once Mimir does not need it, or at least to ensure we don't break Mimir.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've moved the metric family name to a new
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will defer naming to follow up PR. |
||
| // AppendSample appends a sample and related exemplars, metadata, and | ||
| // created timestamp to the storage. | ||
| AppendSample(ls labels.Labels, meta Metadata, ct, t int64, v float64, es []exemplar.Exemplar) error | ||
| // AppendHistogram appends a histogram and related exemplars, metadata, and | ||
| // created timestamp to the storage. | ||
| AppendHistogram(ls labels.Labels, meta Metadata, ct, t int64, h *histogram.Histogram, es []exemplar.Exemplar) error | ||
| } | ||
|
|
||
| // CombinedAppenderMetrics is for the metrics observed by the | ||
| // combinedAppender implementation. | ||
| type CombinedAppenderMetrics struct { | ||
| samplesAppendedWithoutMetadata prometheus.Counter | ||
| outOfOrderExemplars prometheus.Counter | ||
| } | ||
|
|
||
| func NewCombinedAppenderMetrics(reg prometheus.Registerer) CombinedAppenderMetrics { | ||
krajorama marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return CombinedAppenderMetrics{ | ||
| samplesAppendedWithoutMetadata: promauto.With(reg).NewCounter(prometheus.CounterOpts{ | ||
krajorama marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Namespace: "prometheus", | ||
krajorama marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Subsystem: "api", | ||
| Name: "otlp_appended_samples_without_metadata_total", | ||
| Help: "The total number of samples ingested from OTLP without corresponding metadata.", | ||
| }), | ||
| outOfOrderExemplars: promauto.With(reg).NewCounter(prometheus.CounterOpts{ | ||
| Namespace: "prometheus", | ||
| Subsystem: "api", | ||
| Name: "otlp_out_of_order_exemplars_total", | ||
| Help: "The total number of received OTLP exemplars which were rejected because they were out of order.", | ||
| }), | ||
| } | ||
| } | ||
|
|
||
| // NewCombinedAppender creates a combined appender that sets start times and | ||
| // updates metadata for each series only once, and appends samples and | ||
| // exemplars for each call. | ||
| func NewCombinedAppender(app storage.Appender, logger *slog.Logger, ingestCTZeroSample bool, metrics CombinedAppenderMetrics) CombinedAppender { | ||
| return &combinedAppender{ | ||
| app: app, | ||
| logger: logger, | ||
| ingestCTZeroSample: ingestCTZeroSample, | ||
| refs: make(map[uint64]seriesRef), | ||
| samplesAppendedWithoutMetadata: metrics.samplesAppendedWithoutMetadata, | ||
| outOfOrderExemplars: metrics.outOfOrderExemplars, | ||
| } | ||
| } | ||
|
|
||
| type seriesRef struct { | ||
| ref storage.SeriesRef | ||
| ct int64 | ||
| ls labels.Labels | ||
| meta metadata.Metadata | ||
| } | ||
|
|
||
| type combinedAppender struct { | ||
| app storage.Appender | ||
| logger *slog.Logger | ||
| samplesAppendedWithoutMetadata prometheus.Counter | ||
| outOfOrderExemplars prometheus.Counter | ||
| ingestCTZeroSample bool | ||
| // Used to ensure we only update metadata and created timestamps once, and to share storage.SeriesRefs. | ||
krajorama marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // To detect hash collision it also stores the labels. | ||
| // There is no overflow/conflict list, the TSDB will handle that part. | ||
| refs map[uint64]seriesRef | ||
| } | ||
|
|
||
| func (b *combinedAppender) AppendSample(ls labels.Labels, meta Metadata, ct, t int64, v float64, es []exemplar.Exemplar) (err error) { | ||
| return b.appendFloatOrHistogram(ls, meta.Metadata, ct, t, v, nil, es) | ||
| } | ||
|
|
||
| func (b *combinedAppender) AppendHistogram(ls labels.Labels, meta Metadata, ct, t int64, h *histogram.Histogram, es []exemplar.Exemplar) (err error) { | ||
| if h == nil { | ||
| // Sanity check, we should never get here with a nil histogram. | ||
| b.logger.Error("Received nil histogram in CombinedAppender.AppendHistogram", "series", ls.String()) | ||
| return errors.New("internal error, attempted to append nil histogram") | ||
| } | ||
| return b.appendFloatOrHistogram(ls, meta.Metadata, ct, t, 0, h, es) | ||
| } | ||
|
|
||
| func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadata.Metadata, ct, t int64, v float64, h *histogram.Histogram, es []exemplar.Exemplar) (err error) { | ||
| hash := ls.Hash() | ||
| series, exists := b.refs[hash] | ||
| ref := series.ref | ||
| if exists && !labels.Equal(series.ls, ls) { | ||
| // Hash collision. The series reference we stored is pointing to a | ||
| // different series so we cannot use it, we need to reset the | ||
| // reference and cache. | ||
| // Note: we don't need to keep track of conflicts here, | ||
| // the TSDB will handle that part when we pass 0 reference. | ||
| exists = false | ||
| ref = 0 | ||
| } | ||
| updateRefs := !exists || series.ct != ct | ||
| if updateRefs && ct != 0 && b.ingestCTZeroSample { | ||
| var newRef storage.SeriesRef | ||
| if h != nil { | ||
| newRef, err = b.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, nil) | ||
| } else { | ||
| newRef, err = b.app.AppendCTZeroSample(ref, ls, t, ct) | ||
| } | ||
| if err != nil { | ||
| if !errors.Is(err, storage.ErrOutOfOrderCT) { | ||
| // Even for the first sample OOO is a common scenario because | ||
| // we can't tell if a CT was already ingested in a previous request. | ||
| // We ignore the error. | ||
| b.logger.Warn("Error when appending CT from OTLP", "err", err, "series", ls.String(), "created_timestamp", ct, "timestamp", t, "sample_type", sampleType(h)) | ||
| } | ||
| } else { | ||
| // We only use the returned reference on success as otherwise an | ||
| // error of CT append could invalidate the series reference. | ||
| ref = newRef | ||
| } | ||
| } | ||
| { | ||
| var newRef storage.SeriesRef | ||
| if h != nil { | ||
| newRef, err = b.app.AppendHistogram(ref, ls, t, h, nil) | ||
| } else { | ||
| newRef, err = b.app.Append(ref, ls, t, v) | ||
| } | ||
| if err != nil { | ||
| // Although Append does not currently return ErrDuplicateSampleForTimestamp there is | ||
| // a note indicating its inclusion in the future. | ||
| if errors.Is(err, storage.ErrOutOfOrderSample) || | ||
| errors.Is(err, storage.ErrOutOfBounds) || | ||
| errors.Is(err, storage.ErrDuplicateSampleForTimestamp) { | ||
| b.logger.Error("Error when appending sample from OTLP", "err", err.Error(), "series", ls.String(), "timestamp", t, "sample_type", sampleType(h)) | ||
| } | ||
| } else { | ||
| // If the append was successful, we can use the returned reference. | ||
| ref = newRef | ||
| } | ||
| } | ||
|
|
||
| if ref == 0 { | ||
| // We cannot update metadata or add exemplars on non existent series. | ||
| return | ||
| } | ||
|
|
||
| if !exists || series.meta.Help != meta.Help || series.meta.Type != meta.Type || series.meta.Unit != meta.Unit { | ||
| updateRefs = true | ||
| // If this is the first time we see this series, set the metadata. | ||
| _, err := b.app.UpdateMetadata(ref, ls, meta) | ||
| if err != nil { | ||
| b.samplesAppendedWithoutMetadata.Add(1) | ||
| b.logger.Warn("Error while updating metadata from OTLP", "err", err) | ||
| } | ||
| } | ||
|
|
||
| if updateRefs { | ||
| b.refs[hash] = seriesRef{ | ||
| ref: ref, | ||
| ct: ct, | ||
| ls: ls, | ||
| meta: meta, | ||
| } | ||
| } | ||
|
|
||
| b.appendExemplars(ref, ls, es) | ||
|
|
||
| return | ||
| } | ||
|
|
||
| func sampleType(h *histogram.Histogram) string { | ||
| if h == nil { | ||
| return "float" | ||
| } | ||
| return "histogram" | ||
| } | ||
|
|
||
| func (b *combinedAppender) appendExemplars(ref storage.SeriesRef, ls labels.Labels, es []exemplar.Exemplar) storage.SeriesRef { | ||
| var err error | ||
| for _, e := range es { | ||
| if ref, err = b.app.AppendExemplar(ref, ls, e); err != nil { | ||
| switch { | ||
| case errors.Is(err, storage.ErrOutOfOrderExemplar): | ||
| b.outOfOrderExemplars.Add(1) | ||
| b.logger.Debug("Out of order exemplar from OTLP", "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e)) | ||
aknuds1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| default: | ||
| // Since exemplar storage is still experimental, we don't fail the request on ingestion errors | ||
| b.logger.Debug("Error while adding exemplar from OTLP", "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e), "err", err) | ||
| } | ||
| } | ||
| } | ||
| return ref | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.