Merged
16 changes: 11 additions & 5 deletions tsdb/wlog/watcher.go
@@ -262,10 +262,8 @@ func (w *Watcher) loop() {
// Run the watcher, which will tail the WAL until the quit channel is closed
// or an error case is hit.
func (w *Watcher) Run() error {
_, lastSegment, err := w.firstAndLast()
if err != nil {
return fmt.Errorf("wal.Segments: %w", err)
}
var lastSegment int
var err error

// We want to ensure this is false across iterations since
// Run will be called again if there was a failure to read the WAL.
@@ -296,9 +294,17 @@ func (w *Watcher) Run() error {
w.currentSegmentMetric.Set(float64(currentSegment))
level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment)

// Reset the value of lastSegment each iteration; this avoids waiting too long
// between reads if we're reading a segment that is not the most recent segment after startup.
_, lastSegment, err = w.firstAndLast()
if err != nil {
return fmt.Errorf("wal.Segments: %w", err)
}
tail := currentSegment >= lastSegment
Member:
I don't know if this was intentional (I cannot see it explicitly stated), but I think that in order to help the watcher catch up, we started making it skip sample records (tail=false passed to readSegment here makes it skip/ignore the sample records).

When this issue was discussed, the case <-segmentTicker.C block

case <-segmentTicker.C:
_, last, err := w.firstAndLast()
if err != nil {
return fmt.Errorf("segments: %w", err)
}
// Check if new segments exist.
if last <= segmentNum {
continue
}
err = w.readSegment(reader, segmentNum, tail)
was ignored, even though it seems to be there for this same reason: "read the current segment until the end if a new one is there" and it checks that every segmentCheckPeriod = 100 * time.Millisecond which should be sufficient.

Member:

I added a check that shows the samples being dropped here: #14434.

Member:

I propose that the reason for the bug is that tail means several things in the code.
I will try a PR where the meanings are broken out into separate parameters.
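The suggestion above — that the bug stems from tail carrying several meanings — could be broken out roughly like this. A minimal, hypothetical sketch (the parameter name forwardSamples and the recordHandler type are illustrative, not from the actual PR):

```go
package main

import "fmt"

// recordHandler stands in for the watcher's writeTo consumer.
type recordHandler struct {
	series  int
	samples int
}

// processRecord separates the two concerns that `tail` conflates:
// series records are always needed to rebuild the ref map, while
// sample records are only forwarded when explicitly requested.
func (h *recordHandler) processRecord(kind string, forwardSamples bool) {
	switch kind {
	case "series":
		h.series++
	case "samples":
		if forwardSamples {
			h.samples++
		}
	}
}

func main() {
	h := &recordHandler{}
	// Replaying an old, closed segment: series only.
	for _, r := range []string{"series", "samples", "samples"} {
		h.processRecord(r, false)
	}
	// Tailing the live segment: forward samples too.
	for _, r := range []string{"series", "samples"} {
		h.processRecord(r, true)
	}
	fmt.Println(h.series, h.samples) // prints "2 1"
}
```

With the flag split out, the call site documents intent instead of overloading one boolean with both "is this the last segment" and "should samples be sent".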

Member (@machine424), Jul 8, 2024:

Yep, I think a seriesRecordsOnly param on readSegment makes more sense, or make readSegment read everything and have a readSegmentToBuildSeries or something similar that only builds series.

I have a local branch with some cleanup regarding that; I was planning to push it with the fix into #14434, but please, be my guest.

Regarding the bug fix, I suggest we only revert #13583 and re-open #13471 to learn more about the real issue.

Strangely, TestRun_AvoidNotifyWhenBehind still passes locally after the revert.

I think running that operation every segmentCheckPeriod = 100 * time.Millisecond is acceptable.

Member Author:

I'm not sure I agree that just the new segment check via the ticker, without some additional changes in other places, is enough. This line would have had to change to be something like:

_, lastSegment, err := w.firstAndLast()
...
currentSegment = lastSegment

But then we introduce yet another way to lose samples.

I agree with @bboreham that the overloading of the meaning of tail is the problem. We still want to check for segments on some interval, read from the currently written-to segment only when we're notified that it's been written to (or on some timer, in case write events are lost within remote write somewhere), be notified of checkpoints, etc.

I think we could skip the select statement within watch entirely when watch is not reading the segment currently being written to; in that case we really just want to read and send the samples from currentSegment as fast as possible (based on the available resources and queue config).
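The fast path being proposed — no select/ticker when the segment is already closed — might look roughly like this. A hypothetical sketch only: segmentReader and drainClosedSegment are illustrative names, not the watcher's actual API.

```go
package main

import "fmt"

// segmentReader stands in for a reader over a closed WAL segment.
type segmentReader struct {
	records []string
	pos     int
}

func (r *segmentReader) next() (string, bool) {
	if r.pos >= len(r.records) {
		return "", false
	}
	rec := r.records[r.pos]
	r.pos++
	return rec, true
}

// drainClosedSegment reads a closed segment to the end as fast as
// possible: a tight loop with no select, ticker, or notification
// wait in the hot path, since no new data can arrive.
func drainClosedSegment(r *segmentReader, send func(string)) int {
	n := 0
	for rec, ok := r.next(); ok; rec, ok = r.next() {
		send(rec)
		n++
	}
	return n
}

func main() {
	r := &segmentReader{records: []string{"s1", "s2", "s3"}}
	sent := 0
	fmt.Println(drainClosedSegment(r, func(string) { sent++ }), sent) // prints "3 3"
}
```

Only the live segment would then need the event-driven select loop; closed segments are drained at whatever rate the consumer and queue config allow.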

Member:

I don't have any stats/data to back my statements; I was just suggesting that unless we have proof (I don't think #13471 provided any) that checking every segmentCheckPeriod = 100 * time.Millisecond could be problematic (reduced read throughput) in certain setups, reverting to the old behavior is the safest/simplest solution.

Member:

Hi, sorry I didn't follow this conversation and pushed my own ideas at #14439.

I think it would be ok as a backport for v2.52 to revert and fix just the bit that is wrong.


// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) {
if err := w.watch(currentSegment, tail); err != nil && !errors.Is(err, ErrIgnorable) {
return err
}

125 changes: 117 additions & 8 deletions tsdb/wlog/watcher_test.go
@@ -58,29 +58,47 @@ type writeToMock struct {
floatHistogramsAppended int
seriesLock sync.Mutex
seriesSegmentIndexes map[chunks.HeadSeriesRef]int

// delay each writeTo call with a short sleep to simulate a slow consumer
delay time.Duration
}

func (wtm *writeToMock) Append(s []record.RefSample) bool {
if wtm.delay > 0 {
time.Sleep(wtm.delay)
}
wtm.samplesAppended += len(s)
return true
}

func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
if wtm.delay > 0 {
time.Sleep(wtm.delay)
}
wtm.exemplarsAppended += len(e)
return true
}

func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool {
if wtm.delay > 0 {
time.Sleep(wtm.delay)
}
wtm.histogramsAppended += len(h)
return true
}

func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool {
if wtm.delay > 0 {
time.Sleep(wtm.delay)
}
wtm.floatHistogramsAppended += len(fh)
return true
}

func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
if wtm.delay > 0 {
time.Sleep(wtm.delay)
}
wtm.UpdateSeriesSegment(series, index)
}

@@ -110,9 +128,10 @@ func (wtm *writeToMock) checkNumSeries() int {
return len(wtm.seriesSegmentIndexes)
}

func newWriteToMock() *writeToMock {
func newWriteToMock(delay time.Duration) *writeToMock {
return &writeToMock{
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
delay: delay,
}
}

@@ -209,7 +228,7 @@ func TestTailSamples(t *testing.T) {
first, last, err := Segments(w.Dir())
require.NoError(t, err)

wt := newWriteToMock()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true)
watcher.SetStartTime(now)

@@ -294,7 +313,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
_, _, err = Segments(w.Dir())
require.NoError(t, err)

wt := newWriteToMock()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
go watcher.Start()

@@ -383,7 +402,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
_, _, err = Segments(w.Dir())
require.NoError(t, err)
readTimeout = time.Second
wt := newWriteToMock()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
go watcher.Start()

@@ -454,7 +473,7 @@ func TestReadCheckpoint(t *testing.T) {
_, _, err = Segments(w.Dir())
require.NoError(t, err)

wt := newWriteToMock()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
go watcher.Start()

@@ -523,7 +542,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
require.NoError(t, err)
}

wt := newWriteToMock()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher.MaxSegment = -1

@@ -596,7 +615,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
require.NoError(t, err)

readTimeout = time.Second
wt := newWriteToMock()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher.MaxSegment = -1
go watcher.Start()
@@ -675,7 +694,7 @@ func TestRun_StartupTime(t *testing.T) {
}
require.NoError(t, w.Close())

wt := newWriteToMock()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher.MaxSegment = segments

@@ -688,3 +707,93 @@ func TestRun_StartupTime(t *testing.T) {
})
}
}

func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
const pageSize = 32 * 1024
const segments = 10
const seriesCount = 20
const samplesCount = 300

// This test can take longer than intended to finish in cloud CI.
readTimeout := 10 * time.Second

for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(string(compress), func(t *testing.T) {
dir := t.TempDir()

wdir := path.Join(dir, "wal")
err := os.Mkdir(wdir, 0o777)
require.NoError(t, err)

enc := record.Encoder{}
w, err := NewSize(nil, nil, wdir, pageSize, compress)
require.NoError(t, err)
var wg sync.WaitGroup
// add one segment initially to ensure there's a value > 0 for the last segment id
for i := 0; i < 1; i++ {
for j := 0; j < seriesCount; j++ {
ref := j + (i * 100)
series := enc.Series([]record.RefSeries{
{
Ref: chunks.HeadSeriesRef(ref),
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
},
}, nil)
require.NoError(t, w.Log(series))

for k := 0; k < samplesCount; k++ {
inner := rand.Intn(ref + 1)
sample := enc.Samples([]record.RefSample{
{
Ref: chunks.HeadSeriesRef(inner),
T: int64(i),
V: float64(i),
},
}, nil)
require.NoError(t, w.Log(sample))
}
}
}
wg.Add(1)
go func() {
defer wg.Done()
for i := 1; i < segments; i++ {
for j := 0; j < seriesCount; j++ {
ref := j + (i * 100)
series := enc.Series([]record.RefSeries{
{
Ref: chunks.HeadSeriesRef(ref),
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
},
}, nil)
require.NoError(t, w.Log(series))

for k := 0; k < samplesCount; k++ {
inner := rand.Intn(ref + 1)
sample := enc.Samples([]record.RefSample{
{
Ref: chunks.HeadSeriesRef(inner),
T: int64(i),
V: float64(i),
},
}, nil)
require.NoError(t, w.Log(sample))
}
}
}
}()

wt := newWriteToMock(time.Millisecond)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
watcher.MaxSegment = segments

watcher.setMetrics()
startTime := time.Now()
err = watcher.Run()
wg.Wait()
require.Less(t, time.Since(startTime), readTimeout)
require.NoError(t, err)
require.NoError(t, w.Close())
})
}
}