Skip to content

Commit 0c71230

Browse files
authored
fix bug that would cause us to endlessly fall behind (prometheus#13583)
* fix bug that would cause us to only read from the WAL on the 15s fallback timer if remote write had fallen behind and is no longer reading from the WAL segment that is currently being written to Signed-off-by: Callum Styan <[email protected]> * remove unintended logging, fix lint, plus allow test to take slightly longer because cloud CI Signed-off-by: Callum Styan <[email protected]> * address review feedback Signed-off-by: Callum Styan <[email protected]> * fix watcher sleeps in test, flu brain is smooth Signed-off-by: Callum Styan <[email protected]> * increase timeout, unfortunately cloud CI can require a longer timeout Signed-off-by: Callum Styan <[email protected]> --------- Signed-off-by: Callum Styan <[email protected]>
1 parent aba0071 commit 0c71230

File tree

2 files changed

+128
-13
lines changed

2 files changed

+128
-13
lines changed

tsdb/wlog/watcher.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -262,10 +262,8 @@ func (w *Watcher) loop() {
262262
// Run the watcher, which will tail the WAL until the quit channel is closed
263263
// or an error case is hit.
264264
func (w *Watcher) Run() error {
265-
_, lastSegment, err := w.firstAndLast()
266-
if err != nil {
267-
return fmt.Errorf("wal.Segments: %w", err)
268-
}
265+
var lastSegment int
266+
var err error
269267

270268
// We want to ensure this is false across iterations since
271269
// Run will be called again if there was a failure to read the WAL.
@@ -296,9 +294,17 @@ func (w *Watcher) Run() error {
296294
w.currentSegmentMetric.Set(float64(currentSegment))
297295
level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment)
298296

297+
// Reset the value of lastSegment each iteration, this is to avoid having to wait too long
298+
// between reads if we're reading a segment that is not the most recent segment after startup.
299+
_, lastSegment, err = w.firstAndLast()
300+
if err != nil {
301+
return fmt.Errorf("wal.Segments: %w", err)
302+
}
303+
tail := currentSegment >= lastSegment
304+
299305
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
300306
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
301-
if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) {
307+
if err := w.watch(currentSegment, tail); err != nil && !errors.Is(err, ErrIgnorable) {
302308
return err
303309
}
304310

tsdb/wlog/watcher_test.go

Lines changed: 117 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,29 +58,47 @@ type writeToMock struct {
5858
floatHistogramsAppended int
5959
seriesLock sync.Mutex
6060
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
61+
62+
// delay reads with a short sleep
63+
delay time.Duration
6164
}
6265

6366
func (wtm *writeToMock) Append(s []record.RefSample) bool {
67+
if wtm.delay > 0 {
68+
time.Sleep(wtm.delay)
69+
}
6470
wtm.samplesAppended += len(s)
6571
return true
6672
}
6773

6874
func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
75+
if wtm.delay > 0 {
76+
time.Sleep(wtm.delay)
77+
}
6978
wtm.exemplarsAppended += len(e)
7079
return true
7180
}
7281

7382
func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool {
83+
if wtm.delay > 0 {
84+
time.Sleep(wtm.delay)
85+
}
7486
wtm.histogramsAppended += len(h)
7587
return true
7688
}
7789

7890
func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool {
91+
if wtm.delay > 0 {
92+
time.Sleep(wtm.delay)
93+
}
7994
wtm.floatHistogramsAppended += len(fh)
8095
return true
8196
}
8297

8398
func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
99+
if wtm.delay > 0 {
100+
time.Sleep(wtm.delay)
101+
}
84102
wtm.UpdateSeriesSegment(series, index)
85103
}
86104

@@ -110,9 +128,10 @@ func (wtm *writeToMock) checkNumSeries() int {
110128
return len(wtm.seriesSegmentIndexes)
111129
}
112130

113-
func newWriteToMock() *writeToMock {
131+
func newWriteToMock(delay time.Duration) *writeToMock {
114132
return &writeToMock{
115133
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
134+
delay: delay,
116135
}
117136
}
118137

@@ -209,7 +228,7 @@ func TestTailSamples(t *testing.T) {
209228
first, last, err := Segments(w.Dir())
210229
require.NoError(t, err)
211230

212-
wt := newWriteToMock()
231+
wt := newWriteToMock(0)
213232
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true)
214233
watcher.SetStartTime(now)
215234

@@ -294,7 +313,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
294313
_, _, err = Segments(w.Dir())
295314
require.NoError(t, err)
296315

297-
wt := newWriteToMock()
316+
wt := newWriteToMock(0)
298317
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
299318
go watcher.Start()
300319

@@ -383,7 +402,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
383402
_, _, err = Segments(w.Dir())
384403
require.NoError(t, err)
385404
readTimeout = time.Second
386-
wt := newWriteToMock()
405+
wt := newWriteToMock(0)
387406
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
388407
go watcher.Start()
389408

@@ -454,7 +473,7 @@ func TestReadCheckpoint(t *testing.T) {
454473
_, _, err = Segments(w.Dir())
455474
require.NoError(t, err)
456475

457-
wt := newWriteToMock()
476+
wt := newWriteToMock(0)
458477
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
459478
go watcher.Start()
460479

@@ -523,7 +542,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
523542
require.NoError(t, err)
524543
}
525544

526-
wt := newWriteToMock()
545+
wt := newWriteToMock(0)
527546
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
528547
watcher.MaxSegment = -1
529548

@@ -596,7 +615,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
596615
require.NoError(t, err)
597616

598617
readTimeout = time.Second
599-
wt := newWriteToMock()
618+
wt := newWriteToMock(0)
600619
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
601620
watcher.MaxSegment = -1
602621
go watcher.Start()
@@ -675,7 +694,7 @@ func TestRun_StartupTime(t *testing.T) {
675694
}
676695
require.NoError(t, w.Close())
677696

678-
wt := newWriteToMock()
697+
wt := newWriteToMock(0)
679698
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
680699
watcher.MaxSegment = segments
681700

@@ -688,3 +707,93 @@ func TestRun_StartupTime(t *testing.T) {
688707
})
689708
}
690709
}
710+
711+
func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
712+
const pageSize = 32 * 1024
713+
const segments = 10
714+
const seriesCount = 20
715+
const samplesCount = 300
716+
717+
// This test can take longer than intended to finish in cloud CI.
718+
readTimeout := 10 * time.Second
719+
720+
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
721+
t.Run(string(compress), func(t *testing.T) {
722+
dir := t.TempDir()
723+
724+
wdir := path.Join(dir, "wal")
725+
err := os.Mkdir(wdir, 0o777)
726+
require.NoError(t, err)
727+
728+
enc := record.Encoder{}
729+
w, err := NewSize(nil, nil, wdir, pageSize, compress)
730+
require.NoError(t, err)
731+
var wg sync.WaitGroup
732+
// add one segment initially to ensure there's a value > 0 for the last segment id
733+
for i := 0; i < 1; i++ {
734+
for j := 0; j < seriesCount; j++ {
735+
ref := j + (i * 100)
736+
series := enc.Series([]record.RefSeries{
737+
{
738+
Ref: chunks.HeadSeriesRef(ref),
739+
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
740+
},
741+
}, nil)
742+
require.NoError(t, w.Log(series))
743+
744+
for k := 0; k < samplesCount; k++ {
745+
inner := rand.Intn(ref + 1)
746+
sample := enc.Samples([]record.RefSample{
747+
{
748+
Ref: chunks.HeadSeriesRef(inner),
749+
T: int64(i),
750+
V: float64(i),
751+
},
752+
}, nil)
753+
require.NoError(t, w.Log(sample))
754+
}
755+
}
756+
}
757+
wg.Add(1)
758+
go func() {
759+
defer wg.Done()
760+
for i := 1; i < segments; i++ {
761+
for j := 0; j < seriesCount; j++ {
762+
ref := j + (i * 100)
763+
series := enc.Series([]record.RefSeries{
764+
{
765+
Ref: chunks.HeadSeriesRef(ref),
766+
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
767+
},
768+
}, nil)
769+
require.NoError(t, w.Log(series))
770+
771+
for k := 0; k < samplesCount; k++ {
772+
inner := rand.Intn(ref + 1)
773+
sample := enc.Samples([]record.RefSample{
774+
{
775+
Ref: chunks.HeadSeriesRef(inner),
776+
T: int64(i),
777+
V: float64(i),
778+
},
779+
}, nil)
780+
require.NoError(t, w.Log(sample))
781+
}
782+
}
783+
}
784+
}()
785+
786+
wt := newWriteToMock(time.Millisecond)
787+
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
788+
watcher.MaxSegment = segments
789+
790+
watcher.setMetrics()
791+
startTime := time.Now()
792+
err = watcher.Run()
793+
wg.Wait()
794+
require.Less(t, time.Since(startTime), readTimeout)
795+
require.NoError(t, err)
796+
require.NoError(t, w.Close())
797+
})
798+
}
799+
}

0 commit comments

Comments
 (0)