Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
989a1d8
OTLP writer writes directly to appender
dashpole Jul 29, 2025
7b3de4e
add metric family name to CombinedAppender
dashpole Jul 29, 2025
ceb241b
fix lint issue
krajorama Jul 30, 2025
17ccd49
Enhance and rename unit test TestOTLPWriteHandler
krajorama Jul 30, 2025
320bb8d
Add unit test for ingestCTZeroSample
krajorama Jul 30, 2025
1c029e4
Use slice labels explicitly in OTLP conversion
krajorama Jul 30, 2025
5f42d67
Move interface definition on top
krajorama Jul 30, 2025
b008f23
More consistent error messages
krajorama Jul 30, 2025
4b89cd5
Add hash collision detection
krajorama Jul 30, 2025
6a7de71
Merge branch 'main' into otlp-to-appender
krajorama Jul 31, 2025
f92da09
check for quantile values in TestPrometheusConverter_AddSummaryDataPo…
krajorama Jul 31, 2025
84f97d9
Merge branch 'main' into otlp-to-appender
krajorama Aug 5, 2025
96199f9
remove stray TODO
krajorama Aug 5, 2025
a821f0d
fix linter, check error
krajorama Aug 5, 2025
7a82d77
Add docstring to labels_common.go Contains function
krajorama Aug 5, 2025
358410f
fix interworking with real TSDB
krajorama Aug 5, 2025
8c46c60
Merge branch 'main' into otlp-to-appender
krajorama Aug 6, 2025
73afc92
dry: reuse NewFromSorted function
krajorama Aug 6, 2025
db3470d
Fix docstring mistakes
krajorama Aug 6, 2025
7ac6832
refactor for DRY
krajorama Aug 6, 2025
e44f51c
fix double registering metrics
krajorama Aug 6, 2025
10a4d15
refactor metrics for combinedAppender
krajorama Aug 6, 2025
aff16e2
use Warn level for metadata and CT problems
krajorama Aug 6, 2025
85079f4
simplify otel_scope_ conversion
krajorama Aug 6, 2025
2de7ec6
linter fix, remove unused field
krajorama Aug 6, 2025
bc978de
fix metadata handling and tests
krajorama Aug 6, 2025
86a2f4b
remove extra Commit interface and make tests more realistic
krajorama Aug 6, 2025
5f8329d
Merge remote-tracking branch 'origin/main' into otlp-to-appender
krajorama Aug 6, 2025
4742e01
follow up merge from main
krajorama Aug 6, 2025
e9e3467
fix timeSeriesIsNew
krajorama Aug 6, 2025
2e6f445
revert style change
krajorama Aug 6, 2025
b331e9b
fix typo
krajorama Aug 6, 2025
ec5cbf6
do not ignore CT or metadata changes
krajorama Aug 7, 2025
84e9295
Merge remote-tracking branch 'origin/main' into otlp-to-appender
krajorama Aug 7, 2025
c5b6c6d
more realistic benchmark, include real combinedAppender
krajorama Aug 7, 2025
1f7d957
BenchmarkPrometheusConverter_FromMetrics: have the same testcases as …
krajorama Aug 7, 2025
3cea333
Merge branch 'main' into otlp-to-appender
krajorama Aug 7, 2025
bd2f7f2
remove unused fields and allocations
krajorama Aug 7, 2025
8da268d
do not benchmark setup bits
krajorama Aug 7, 2025
eb3f402
Merge branch 'main' into otlp-to-appender
krajorama Aug 8, 2025
6937e31
Do not use custom labels implementation
krajorama Aug 7, 2025
5a6b66f
Merge remote-tracking branch 'origin/main' into otlp-to-appender
krajorama Aug 11, 2025
75762dc
remove customization from model/labels
krajorama Aug 11, 2025
3b280e9
add todo for naming
krajorama Aug 11, 2025
dddc085
keep original order when setting scope attributes
krajorama Aug 11, 2025
8c7aafd
Update storage/remote/otlptranslator/prometheusremotewrite/combined_a…
krajorama Aug 15, 2025
ed5d086
Apply suggestions from code review
krajorama Aug 15, 2025
f015bba
undo style change
krajorama Aug 15, 2025
f584ff2
Update storage/remote/otlptranslator/prometheusremotewrite/helper.go
krajorama Aug 15, 2025
d339809
fixup suggested change
krajorama Aug 15, 2025
ea4abd1
Apply suggestions from code review
krajorama Aug 15, 2025
d2d82ed
readability improvement
krajorama Aug 15, 2025
fdd326b
revert to old order, err var name, use model definitions
krajorama Aug 15, 2025
d5745ab
Merge branch 'main' into otlp-to-appender
krajorama Aug 15, 2025
cc38828
Merge remote-tracking branch 'origin/main' into otlp-to-appender
krajorama Sep 1, 2025
428d9f0
return an error on wrong use of combinedAppender.AppendHistogram
krajorama Sep 1, 2025
9d9ba48
better docstring on conflict and better variable name for updating refs
krajorama Sep 1, 2025
3cdf56e
only use reference from CT append if success
krajorama Sep 1, 2025
5525adc
Merge remote-tracking branch 'origin/main' into otlp-to-appender
krajorama Sep 3, 2025
1b67c2f
reverse order of t and ct as created time is conceptually before time…
krajorama Sep 3, 2025
faae3ea
Merge remote-tracking branch 'origin/main' into otlp-to-appender
krajorama Sep 5, 2025
84c5f6b
Update storage/remote/otlptranslator/prometheusremotewrite/combined_a…
krajorama Sep 7, 2025
4d35e92
fix metric help
krajorama Sep 7, 2025
53ff66d
Merge remote-tracking branch 'origin/main' into otlp-to-appender
krajorama Sep 7, 2025
b03fd1e
Move Metric Family name from extra argument to a new Metadata struct
krajorama Sep 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// TODO(krajorama): rename this package to otlpappender or similar, as it is
// not specific to Prometheus remote write anymore.
// Note otlptranslator is already used by prometheus/otlptranslator repo.
package prometheusremotewrite

import (
"errors"
"fmt"
"log/slog"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
)

// Metadata extends metadata.Metadata with the metric family name.
// OTLP calculates the metric family name for all metrics and uses
// it for generating summary, histogram series by adding the magic
// suffixes. The metric family name is passed down to the appender
// in case the storage needs it for metadata updates.
// The known user is Mimir, which implements /api/v1/metadata and uses
// Remote-Write 1.0 for this. It might be removed later if it is no longer
// needed by any downstream project.
type Metadata struct {
// Metadata is the embedded metadata.Metadata. The combinedAppender in this
// file forwards it to the storage and compares its Help, Type and Unit
// fields to detect metadata changes.
metadata.Metadata
// MetricFamilyName is the metric family name computed by the OTLP
// conversion. The combinedAppender in this file does not read it; it is
// carried for other CombinedAppender implementations.
MetricFamilyName string
}

// CombinedAppender is similar to storage.Appender, but combines updates to
// metadata, created timestamps, exemplars and samples into a single call.
type CombinedAppender interface {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! This might be a nice experiment to have this be a new appender interface one day!

  1. nit: Have you considered struct opts for future flexibility and a good general pattern (too many args)? Structs should be on stack, so shouldn't matter efficiency wise.. 🤞🏽 Just thinking loudly, I don't mind tailored arguments for now.
  2. nit: Should we call it ExperimentalAppender or just Appender?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Can be done later, let's defer.
  2. I think Appender is a little bit overloaded. I kind of like CombinedAppender. But not against ExperimentalAppender either. How strongly do you feel about it?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To kind of leave this solved for now with some voting I'm kind of happy with CombinedAppender too

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm not picky about the name.

However, given we agree on the interface I propose we actually go for it, and slowly start migrating TSDB to it: #17104

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it seems that TSDB storage might need something slightly different, I'd suggest OTLPAppender so we can have the metric family name here while downstream projects need it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would double check if we can find some common interfaces for these cases.

Added some ideas in #17104 (comment)

Would context.Context help?

Otherwise this interface is really only for Mimir purposes, am I right? Which is fine, but then let's add comment in some TODO to remove it once Mimir does not need it, or at least to ensure we don't break Mimir.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've moved the metric family name to a new Metadata struct as discussed in #17104.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will defer naming to follow up PR.

// AppendSample appends the float sample v with timestamp t for the series
// identified by ls, together with its related exemplars es, metadata meta,
// and created timestamp ct, to the storage.
// In the combinedAppender implementation in this file, a ct of 0 means
// that no created-timestamp zero sample is appended.
AppendSample(ls labels.Labels, meta Metadata, ct, t int64, v float64, es []exemplar.Exemplar) error
// AppendHistogram appends the histogram h with timestamp t for the series
// identified by ls, together with its related exemplars es, metadata meta,
// and created timestamp ct, to the storage.
// The combinedAppender implementation in this file returns an error when
// h is nil.
AppendHistogram(ls labels.Labels, meta Metadata, ct, t int64, h *histogram.Histogram, es []exemplar.Exemplar) error
}

// CombinedAppenderMetrics is for the metrics observed by the
// combinedAppender implementation.
type CombinedAppenderMetrics struct {
// samplesAppendedWithoutMetadata is incremented by combinedAppender each
// time updating the metadata of a series fails.
samplesAppendedWithoutMetadata prometheus.Counter
// outOfOrderExemplars is incremented by combinedAppender for each exemplar
// rejected with storage.ErrOutOfOrderExemplar.
outOfOrderExemplars prometheus.Counter
}

// NewCombinedAppenderMetrics creates the counters used by the combined
// appender and registers them with reg through promauto.
//
// NOTE(review): per the promauto documentation a failed registration (for
// example registering the same metrics twice on one registerer) panics, so
// this is presumably meant to be called once per registerer — confirm
// against callers.
func NewCombinedAppenderMetrics(reg prometheus.Registerer) CombinedAppenderMetrics {
	factory := promauto.With(reg)

	withoutMetadata := factory.NewCounter(prometheus.CounterOpts{
		Namespace: "prometheus",
		Subsystem: "api",
		Name:      "otlp_appended_samples_without_metadata_total",
		Help:      "The total number of samples ingested from OTLP without corresponding metadata.",
	})
	oooExemplars := factory.NewCounter(prometheus.CounterOpts{
		Namespace: "prometheus",
		Subsystem: "api",
		Name:      "otlp_out_of_order_exemplars_total",
		Help:      "The total number of received OTLP exemplars which were rejected because they were out of order.",
	})

	return CombinedAppenderMetrics{
		samplesAppendedWithoutMetadata: withoutMetadata,
		outOfOrderExemplars:            oooExemplars,
	}
}

// NewCombinedAppender returns a CombinedAppender that writes to app.
//
// The returned appender keeps a per-series cache. Created-timestamp zero
// samples (only when ingestCTZeroSample is true) and metadata updates are
// sent to app when a series is first seen or when its created timestamp or
// metadata differs from the cached value. Samples and exemplars are
// appended on every call. Problems are reported through logger, and the
// counters in metrics are incremented by the appender.
func NewCombinedAppender(app storage.Appender, logger *slog.Logger, ingestCTZeroSample bool, metrics CombinedAppenderMetrics) CombinedAppender {
	ca := new(combinedAppender)
	ca.app = app
	ca.logger = logger
	ca.ingestCTZeroSample = ingestCTZeroSample
	ca.refs = map[uint64]seriesRef{}
	ca.samplesAppendedWithoutMetadata = metrics.samplesAppendedWithoutMetadata
	ca.outOfOrderExemplars = metrics.outOfOrderExemplars
	return ca
}

// seriesRef is the per-series state that combinedAppender caches between
// appends, keyed by the hash of the series labels.
type seriesRef struct {
// ref is the storage reference stored for the series when the entry was
// last written.
ref storage.SeriesRef
// ct is the created timestamp passed with the append that last wrote this
// entry; a different value on a later append triggers a refresh.
ct int64
// ls holds the series labels; they are compared on lookup to detect hash
// collisions.
ls labels.Labels
// meta is the metadata passed with the append that last wrote this entry;
// its Help, Type and Unit are compared to detect metadata changes.
meta metadata.Metadata
}

// combinedAppender is the CombinedAppender returned by NewCombinedAppender.
// It forwards samples, created-timestamp zero samples, metadata and
// exemplars to a storage.Appender.
// NOTE(review): no locking is visible around refs; presumably an instance
// is used from a single goroutine — confirm against callers.
type combinedAppender struct {
// app is the underlying appender that all writes are forwarded to.
app storage.Appender
// logger receives the warnings, errors and debug messages about failed
// appends.
logger *slog.Logger
// samplesAppendedWithoutMetadata counts failed metadata updates.
samplesAppendedWithoutMetadata prometheus.Counter
// outOfOrderExemplars counts exemplars rejected as out of order.
outOfOrderExemplars prometheus.Counter
// ingestCTZeroSample enables appending a created-timestamp zero sample
// when a series is new to the cache or its created timestamp changed.
ingestCTZeroSample bool
// Used to ensure we only update metadata and created timestamps when they
// are new or changed, and to share storage.SeriesRefs. Keyed by the hash
// of the series labels.
// To detect hash collision it also stores the labels.
// There is no overflow/conflict list, the TSDB will handle that part.
refs map[uint64]seriesRef
}

// AppendSample appends the float sample v at timestamp t for series ls,
// delegating to appendFloatOrHistogram with no histogram. Only the embedded
// metadata.Metadata of meta is forwarded; MetricFamilyName is not used here.
func (b *combinedAppender) AppendSample(ls labels.Labels, meta Metadata, ct, t int64, v float64, es []exemplar.Exemplar) (err error) {
	// A nil histogram selects the float code path in appendFloatOrHistogram.
	var noHistogram *histogram.Histogram
	return b.appendFloatOrHistogram(ls, meta.Metadata, ct, t, v, noHistogram, es)
}

// AppendHistogram appends the histogram h at timestamp t for series ls,
// delegating to appendFloatOrHistogram. Only the embedded metadata.Metadata
// of meta is forwarded; MetricFamilyName is not used here.
//
// A nil h is treated as a caller bug: it is logged and an error is
// returned without touching the storage.
func (b *combinedAppender) AppendHistogram(ls labels.Labels, meta Metadata, ct, t int64, h *histogram.Histogram, es []exemplar.Exemplar) (err error) {
	if h != nil {
		// The float value is unused on the histogram path, so pass zero.
		return b.appendFloatOrHistogram(ls, meta.Metadata, ct, t, 0, h, es)
	}
	// Sanity check, callers are never expected to pass a nil histogram.
	b.logger.Error("Received nil histogram in CombinedAppender.AppendHistogram", "series", ls.String())
	return errors.New("internal error, attempted to append nil histogram")
}

// appendFloatOrHistogram is the shared implementation behind AppendSample
// and AppendHistogram. A nil h selects the float path (value v); a non-nil
// h selects the histogram path.
//
// In order, it:
//  1. looks up the cached state for ls, discarding it on a hash collision;
//  2. appends a created-timestamp zero sample when enabled, ct is non-zero
//     and the series is new to the cache or its ct changed;
//  3. appends the sample itself;
//  4. updates metadata when the series is new to the cache or Help, Type
//     or Unit changed;
//  5. refreshes the cache entry if anything above required it;
//  6. appends the exemplars.
//
// The returned error is the result of the sample append in step 3 (nil on
// success). Created-timestamp, metadata and exemplar failures are only
// logged. Steps 4-6 are skipped when no series reference is available.
func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadata.Metadata, ct, t int64, v float64, h *histogram.Histogram, es []exemplar.Exemplar) (err error) {
	key := ls.Hash()
	cached, known := b.refs[key]
	if known && !labels.Equal(cached.ls, ls) {
		// Hash collision: the cached entry belongs to a different series,
		// so neither its reference nor its state may be used. There is no
		// conflict tracking here; passing a 0 reference lets the TSDB
		// resolve the series by labels.
		known = false
	}
	var ref storage.SeriesRef
	if known {
		ref = cached.ref
	}

	// The cache entry must be rewritten for unknown series and whenever the
	// created timestamp differs from the cached one.
	refresh := !known || cached.ct != ct

	if refresh && ct != 0 && b.ingestCTZeroSample {
		var ctRef storage.SeriesRef
		if h == nil {
			ctRef, err = b.app.AppendCTZeroSample(ref, ls, t, ct)
		} else {
			ctRef, err = b.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, nil)
		}
		switch {
		case err == nil:
			// Only trust the returned reference on success; a failed CT
			// append could otherwise invalidate the series reference.
			ref = ctRef
		case !errors.Is(err, storage.ErrOutOfOrderCT):
			// Out-of-order CT is silently ignored: even for the first
			// sample it is common, because we can't tell if a CT was
			// already ingested in a previous request. Anything else is
			// worth a warning, but does not fail the append.
			b.logger.Warn("Error when appending CT from OTLP", "err", err, "series", ls.String(), "created_timestamp", ct, "timestamp", t, "sample_type", sampleType(h))
		}
	}

	// Append the sample itself. This always overwrites err, so an earlier
	// CT error never reaches the caller.
	var sampleRef storage.SeriesRef
	if h == nil {
		sampleRef, err = b.app.Append(ref, ls, t, v)
	} else {
		sampleRef, err = b.app.AppendHistogram(ref, ls, t, h, nil)
	}
	switch {
	case err == nil:
		// If the append was successful, we can use the returned reference.
		ref = sampleRef
	case errors.Is(err, storage.ErrOutOfOrderSample),
		errors.Is(err, storage.ErrOutOfBounds),
		errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
		// Although Append does not currently return
		// ErrDuplicateSampleForTimestamp there is a note indicating its
		// inclusion in the future.
		b.logger.Error("Error when appending sample from OTLP", "err", err.Error(), "series", ls.String(), "timestamp", t, "sample_type", sampleType(h))
	}

	if ref == 0 {
		// We cannot update metadata or add exemplars on non existent series.
		return err
	}

	metaChanged := !known ||
		cached.meta.Help != meta.Help ||
		cached.meta.Type != meta.Type ||
		cached.meta.Unit != meta.Unit
	if metaChanged {
		refresh = true
		// The series is new to the cache or its metadata differs from the
		// cached copy. A failure is counted and logged but deliberately
		// kept out of err so it does not change the returned result.
		if _, metaErr := b.app.UpdateMetadata(ref, ls, meta); metaErr != nil {
			b.samplesAppendedWithoutMetadata.Add(1)
			b.logger.Warn("Error while updating metadata from OTLP", "err", metaErr)
		}
	}

	if refresh {
		b.refs[key] = seriesRef{ref: ref, ct: ct, ls: ls, meta: meta}
	}

	b.appendExemplars(ref, ls, es)

	return err
}

// sampleType returns the sample kind used in log messages: "float" when h
// is nil, "histogram" otherwise.
func sampleType(h *histogram.Histogram) string {
	kind := "histogram"
	if h == nil {
		kind = "float"
	}
	return kind
}

// appendExemplars appends every exemplar in es to the series identified by
// ref and ls, and returns the most recent valid series reference.
//
// Exemplar failures never fail the request: out-of-order exemplars are
// counted in outOfOrderExemplars and logged at debug level, and any other
// error is logged at debug level only.
//
// The reference returned by AppendExemplar is adopted only on success, in
// line with how the created-timestamp and sample appends in this file treat
// references. Previously a failed exemplar overwrote ref with whatever the
// storage returned alongside the error, which could invalidate the
// reference for the remaining exemplars and for the returned value.
func (b *combinedAppender) appendExemplars(ref storage.SeriesRef, ls labels.Labels, es []exemplar.Exemplar) storage.SeriesRef {
	for _, e := range es {
		newRef, err := b.app.AppendExemplar(ref, ls, e)
		switch {
		case err == nil:
			ref = newRef
		case errors.Is(err, storage.ErrOutOfOrderExemplar):
			b.outOfOrderExemplars.Add(1)
			b.logger.Debug("Out of order exemplar from OTLP", "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e))
		default:
			// Since exemplar storage is still experimental, we don't fail the request on ingestion errors
			b.logger.Debug("Error while adding exemplar from OTLP", "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e), "err", err)
		}
	}
	return ref
}
Loading
Loading