Skip to content

Commit 5156481

Browse files
committed
fix
Signed-off-by: pipiland2612 <[email protected]>
1 parent 7a2e2a5 commit 5156481

File tree

4 files changed

+74
-109
lines changed

4 files changed

+74
-109
lines changed

storage/remote/queue_manager.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -416,9 +416,8 @@ type WriteClient interface {
416416
// HeadReader defines an interface for reading series metadata from TSDB head.
417417
type HeadReader interface {
418418
// GetMetadataByRef returns the metadata for the series with the given reference.
419-
// Also returns the series labels hash for verification against ref reuse.
420-
// Returns nil metadata if the series is not found.
421-
GetMetadataByRef(ref chunks.HeadSeriesRef) (meta *metadata.Metadata, labelsHash uint64, err error)
419+
// Returns metadata with type MetricTypeUnknown if the series is not found.
420+
GetMetadataByRef(ref chunks.HeadSeriesRef) (meta metadata.Metadata)
422421
}
423422

424423
// QueueManager manages a queue of samples to be sent to the Storage
@@ -1972,10 +1971,10 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries,
19721971
var scrapeCacheMetadata *scrape.MetricMetadata
19731972

19741973
if d.metadata == nil && d.seriesRef != 0 && head != nil {
1975-
if meta, labelsHash, err := head.GetMetadataByRef(d.seriesRef); err == nil && meta != nil {
1976-
if labelsHash == d.seriesLabels.Hash() {
1977-
headMetadata = meta
1978-
}
1974+
meta := head.GetMetadataByRef(d.seriesRef)
1975+
// Only use metadata if it's not unknown type.
1976+
if meta.Type != model.MetricTypeUnknown {
1977+
headMetadata = &meta
19791978
}
19801979
}
19811980

storage/remote/queue_manager_test.go

Lines changed: 26 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
4747
"github.com/prometheus/prometheus/schema"
4848
"github.com/prometheus/prometheus/scrape"
49+
"github.com/prometheus/prometheus/tsdb"
4950
"github.com/prometheus/prometheus/tsdb/chunks"
5051
"github.com/prometheus/prometheus/tsdb/record"
5152
"github.com/prometheus/prometheus/util/compression"
@@ -1395,31 +1396,18 @@ func (c *MockWriteClient) Store(ctx context.Context, bb []byte, n int) (WriteRes
13951396
func (c *MockWriteClient) Name() string { return c.NameFunc() }
13961397
func (c *MockWriteClient) Endpoint() string { return c.EndpointFunc() }
13971398

1398-
// mockHeadReader is a mock implementation of HeadReader for testing.
1399-
type mockHeadReader struct {
1400-
metadata map[chunks.HeadSeriesRef]*metadata.Metadata
1401-
hashes map[chunks.HeadSeriesRef]uint64
1402-
}
1399+
func addSeriesWithMetadata(t *testing.T, h *tsdb.Head, lbls labels.Labels, meta metadata.Metadata) chunks.HeadSeriesRef {
1400+
app := h.Appender(context.Background())
14031401

1404-
func newMockHeadReader() *mockHeadReader {
1405-
return &mockHeadReader{
1406-
metadata: make(map[chunks.HeadSeriesRef]*metadata.Metadata),
1407-
hashes: make(map[chunks.HeadSeriesRef]uint64),
1408-
}
1409-
}
1402+
ref, err := app.Append(0, lbls, 1000, 1.0)
1403+
require.NoError(t, err)
14101404

1411-
func (m *mockHeadReader) GetMetadataByRef(ref chunks.HeadSeriesRef) (*metadata.Metadata, uint64, error) {
1412-
meta, ok := m.metadata[ref]
1413-
if !ok {
1414-
return nil, 0, nil
1415-
}
1416-
hash := m.hashes[ref]
1417-
return meta, hash, nil
1418-
}
1405+
_, err = app.UpdateMetadata(0, lbls, meta)
1406+
require.NoError(t, err)
1407+
1408+
require.NoError(t, app.Commit())
14191409

1420-
func (m *mockHeadReader) addSeries(ref chunks.HeadSeriesRef, meta *metadata.Metadata, labelsHash uint64) {
1421-
m.metadata[ref] = meta
1422-
m.hashes[ref] = labelsHash
1410+
return chunks.HeadSeriesRef(ref)
14231411
}
14241412

14251413
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
@@ -3039,99 +3027,39 @@ func TestPopulateV2TimeSeries_ScrapeCacheHistogram(t *testing.T) {
30393027
}
30403028
}
30413029

3042-
// TestHeadReaderMetadataLookup tests basic HeadReader functionality.
3043-
func TestHeadReaderMetadataLookup(t *testing.T) {
3044-
head := newMockHeadReader()
3030+
// TestPopulateV2TimeSeriesWithHeadMetadata tests metadata lookup from TSDB head.
3031+
func TestPopulateV2TimeSeriesWithHeadMetadata(t *testing.T) {
3032+
dir := t.TempDir()
30453033

3046-
// Add test series with metadata.
3047-
ref := chunks.HeadSeriesRef(1)
3048-
meta := &metadata.Metadata{
3049-
Type: "counter",
3050-
Unit: "bytes",
3051-
Help: "Total bytes processed",
3052-
}
3053-
lbls := labels.FromStrings("__name__", "http_requests_total", "job", "api")
3054-
head.addSeries(ref, meta, lbls.Hash())
3034+
opts := tsdb.DefaultHeadOptions()
3035+
opts.ChunkRange = 1000
3036+
opts.ChunkDirRoot = dir
30553037

3056-
// Test successful lookup.
3057-
gotMeta, gotHash, err := head.GetMetadataByRef(ref)
3058-
require.NoError(t, err)
3059-
require.NotNil(t, gotMeta)
3060-
require.Equal(t, meta.Type, gotMeta.Type)
3061-
require.Equal(t, meta.Unit, gotMeta.Unit)
3062-
require.Equal(t, meta.Help, gotMeta.Help)
3063-
require.Equal(t, lbls.Hash(), gotHash)
3064-
3065-
// Test missing series.
3066-
missingMeta, missingHash, err := head.GetMetadataByRef(chunks.HeadSeriesRef(999))
3038+
head, err := tsdb.NewHead(nil, nil, nil, nil, opts, nil)
30673039
require.NoError(t, err)
3068-
require.Nil(t, missingMeta)
3069-
require.Equal(t, uint64(0), missingHash)
3070-
}
3071-
3072-
// TestHeadMetadataWithLabelHashMismatch tests ref reuse protection.
3073-
func TestHeadMetadataWithLabelHashMismatch(t *testing.T) {
3074-
head := newMockHeadReader()
3075-
3076-
ref := chunks.HeadSeriesRef(1)
3077-
originalLabels := labels.FromStrings("__name__", "metric1", "job", "old")
3078-
meta := &metadata.Metadata{
3079-
Type: "counter",
3080-
Help: "Original help",
3081-
}
3082-
head.addSeries(ref, meta, originalLabels.Hash())
3040+
require.NoError(t, head.Init(0))
30833041

3084-
newLabels := labels.FromStrings("__name__", "metric1", "job", "new")
3085-
3086-
batch := []timeSeries{
3087-
{
3088-
seriesLabels: newLabels, // Different labels
3089-
seriesRef: ref, // Same ref
3090-
metadata: nil,
3091-
timestamp: 1000,
3092-
value: 1.0,
3093-
sType: tSample,
3094-
},
3095-
}
3096-
3097-
symbolTable := writev2.NewSymbolTable()
3098-
pendingData := make([]writev2.TimeSeries, 1)
3099-
3100-
nSamples, _, _, nMetadata, _ := populateV2TimeSeries(
3101-
&symbolTable, batch, pendingData,
3102-
false, false, false,
3103-
head, nil,
3104-
)
3105-
3106-
require.Equal(t, 1, nSamples)
3107-
// Hash mismatch should prevent metadata lookup.
3108-
require.Equal(t, 0, nMetadata, "Metadata should NOT be used due to hash mismatch")
3109-
}
3110-
3111-
// TestPopulateV2TimeSeriesWithHeadMetadata tests metadata lookup from TSDB head.
3112-
func TestPopulateV2TimeSeriesWithHeadMetadata(t *testing.T) {
3113-
head := newMockHeadReader()
3114-
3115-
// Setup test data
3116-
ref1 := chunks.HeadSeriesRef(1)
3117-
ref2 := chunks.HeadSeriesRef(2)
3042+
defer func() {
3043+
require.NoError(t, head.Close())
3044+
}()
31183045

3046+
// Setup test data.
31193047
lbls1 := labels.FromStrings("__name__", "metric1", "job", "test")
31203048
lbls2 := labels.FromStrings("__name__", "metric2", "job", "test")
31213049

3122-
meta1 := &metadata.Metadata{
3050+
meta1 := metadata.Metadata{
31233051
Type: "counter",
31243052
Unit: "seconds",
31253053
Help: "Metric 1 help text",
31263054
}
3127-
meta2 := &metadata.Metadata{
3055+
meta2 := metadata.Metadata{
31283056
Type: "gauge",
31293057
Unit: "bytes",
31303058
Help: "Metric 2 help text",
31313059
}
31323060

3133-
head.addSeries(ref1, meta1, lbls1.Hash())
3134-
head.addSeries(ref2, meta2, lbls2.Hash())
3061+
ref1 := addSeriesWithMetadata(t, head, lbls1, meta1)
3062+
ref2 := addSeriesWithMetadata(t, head, lbls2, meta2)
31353063

31363064
batch := []timeSeries{
31373065
{

tsdb/head.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
"github.com/oklog/ulid/v2"
3030
"github.com/prometheus/client_golang/prometheus"
31+
"github.com/prometheus/common/model"
3132
"github.com/prometheus/common/promslog"
3233
"go.uber.org/atomic"
3334

@@ -1654,13 +1655,18 @@ func (h *Head) NumStaleSeries() uint64 {
16541655
}
16551656

16561657
// GetMetadataByRef returns the metadata for the series with the given reference.
1657-
// Also returns the series labels hash for verification against ref reuse.
1658-
func (h *Head) GetMetadataByRef(ref chunks.HeadSeriesRef) (meta *metadata.Metadata, labelsHash uint64, err error) {
1658+
func (h *Head) GetMetadataByRef(ref chunks.HeadSeriesRef) (meta metadata.Metadata) {
16591659
series := h.series.getByID(ref)
16601660
if series == nil {
1661-
return nil, 0, nil
1661+
// Return metadata with unknown type for non-existent series.
1662+
return metadata.Metadata{Type: model.MetricTypeUnknown}
16621663
}
1663-
return series.meta, series.lset.Hash(), nil
1664+
series.Lock()
1665+
defer series.Unlock()
1666+
if series.meta == nil {
1667+
return metadata.Metadata{Type: model.MetricTypeUnknown}
1668+
}
1669+
return *series.meta
16641670
}
16651671

16661672
var headULID = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD")

tsdb/head_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/prometheus/client_golang/prometheus"
3636
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
3737
dto "github.com/prometheus/client_model/go"
38+
"github.com/prometheus/common/model"
3839
"github.com/prometheus/common/promslog"
3940
"github.com/stretchr/testify/require"
4041
"go.uber.org/atomic"
@@ -44,6 +45,7 @@ import (
4445
"github.com/prometheus/prometheus/model/exemplar"
4546
"github.com/prometheus/prometheus/model/histogram"
4647
"github.com/prometheus/prometheus/model/labels"
48+
"github.com/prometheus/prometheus/model/metadata"
4749
"github.com/prometheus/prometheus/model/value"
4850
"github.com/prometheus/prometheus/storage"
4951
"github.com/prometheus/prometheus/tsdb/chunkenc"
@@ -7336,3 +7338,33 @@ func TestHistogramStalenessConversionMetrics(t *testing.T) {
73367338
})
73377339
}
73387340
}
7341+
func TestHead_GetMetadataByRef(t *testing.T) {
7342+
head, _ := newTestHead(t, 1000, compression.None, false)
7343+
defer func() {
7344+
require.NoError(t, head.Close())
7345+
}()
7346+
7347+
lbls := labels.FromStrings("__name__", "http_requests_total", "job", "api")
7348+
meta := metadata.Metadata{
7349+
Type: "counter",
7350+
Unit: "bytes",
7351+
Help: "Total bytes processed",
7352+
}
7353+
7354+
app := head.Appender(context.Background())
7355+
ref, err := app.Append(0, lbls, 1000, 1.0)
7356+
require.NoError(t, err)
7357+
_, err = app.UpdateMetadata(0, lbls, meta)
7358+
require.NoError(t, err)
7359+
require.NoError(t, app.Commit())
7360+
7361+
// Test successful lookup.
7362+
gotMeta := head.GetMetadataByRef(chunks.HeadSeriesRef(ref))
7363+
require.Equal(t, meta.Type, gotMeta.Type)
7364+
require.Equal(t, meta.Unit, gotMeta.Unit)
7365+
require.Equal(t, meta.Help, gotMeta.Help)
7366+
7367+
// Test missing series.
7368+
missingMeta := head.GetMetadataByRef(chunks.HeadSeriesRef(999999))
7369+
require.Equal(t, model.MetricTypeUnknown, missingMeta.Type)
7370+
}

0 commit comments

Comments
 (0)