@@ -58,29 +58,47 @@ type writeToMock struct {
5858 floatHistogramsAppended int
5959 seriesLock sync.Mutex
6060 seriesSegmentIndexes map [chunks.HeadSeriesRef ]int
61+
62+ // delay reads with a short sleep
63+ delay time.Duration
6164}
6265
6366func (wtm * writeToMock ) Append (s []record.RefSample ) bool {
67+ if wtm .delay > 0 {
68+ time .Sleep (wtm .delay )
69+ }
6470 wtm .samplesAppended += len (s )
6571 return true
6672}
6773
6874func (wtm * writeToMock ) AppendExemplars (e []record.RefExemplar ) bool {
75+ if wtm .delay > 0 {
76+ time .Sleep (wtm .delay )
77+ }
6978 wtm .exemplarsAppended += len (e )
7079 return true
7180}
7281
7382func (wtm * writeToMock ) AppendHistograms (h []record.RefHistogramSample ) bool {
83+ if wtm .delay > 0 {
84+ time .Sleep (wtm .delay )
85+ }
7486 wtm .histogramsAppended += len (h )
7587 return true
7688}
7789
7890func (wtm * writeToMock ) AppendFloatHistograms (fh []record.RefFloatHistogramSample ) bool {
91+ if wtm .delay > 0 {
92+ time .Sleep (wtm .delay )
93+ }
7994 wtm .floatHistogramsAppended += len (fh )
8095 return true
8196}
8297
8398func (wtm * writeToMock ) StoreSeries (series []record.RefSeries , index int ) {
99+ if wtm .delay > 0 {
100+ time .Sleep (wtm .delay )
101+ }
84102 wtm .UpdateSeriesSegment (series , index )
85103}
86104
@@ -110,9 +128,10 @@ func (wtm *writeToMock) checkNumSeries() int {
110128 return len (wtm .seriesSegmentIndexes )
111129}
112130
113- func newWriteToMock () * writeToMock {
131+ func newWriteToMock (delay time. Duration ) * writeToMock {
114132 return & writeToMock {
115133 seriesSegmentIndexes : make (map [chunks.HeadSeriesRef ]int ),
134+ delay : delay ,
116135 }
117136}
118137
@@ -209,7 +228,7 @@ func TestTailSamples(t *testing.T) {
209228 first , last , err := Segments (w .Dir ())
210229 require .NoError (t , err )
211230
212- wt := newWriteToMock ()
231+ wt := newWriteToMock (0 )
213232 watcher := NewWatcher (wMetrics , nil , nil , "" , wt , dir , true , true )
214233 watcher .SetStartTime (now )
215234
@@ -294,7 +313,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
294313 _ , _ , err = Segments (w .Dir ())
295314 require .NoError (t , err )
296315
297- wt := newWriteToMock ()
316+ wt := newWriteToMock (0 )
298317 watcher := NewWatcher (wMetrics , nil , nil , "" , wt , dir , false , false )
299318 go watcher .Start ()
300319
@@ -383,7 +402,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
383402 _ , _ , err = Segments (w .Dir ())
384403 require .NoError (t , err )
385404 readTimeout = time .Second
386- wt := newWriteToMock ()
405+ wt := newWriteToMock (0 )
387406 watcher := NewWatcher (wMetrics , nil , nil , "" , wt , dir , false , false )
388407 go watcher .Start ()
389408
@@ -454,7 +473,7 @@ func TestReadCheckpoint(t *testing.T) {
454473 _ , _ , err = Segments (w .Dir ())
455474 require .NoError (t , err )
456475
457- wt := newWriteToMock ()
476+ wt := newWriteToMock (0 )
458477 watcher := NewWatcher (wMetrics , nil , nil , "" , wt , dir , false , false )
459478 go watcher .Start ()
460479
@@ -523,7 +542,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
523542 require .NoError (t , err )
524543 }
525544
526- wt := newWriteToMock ()
545+ wt := newWriteToMock (0 )
527546 watcher := NewWatcher (wMetrics , nil , nil , "" , wt , dir , false , false )
528547 watcher .MaxSegment = - 1
529548
@@ -596,7 +615,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
596615 require .NoError (t , err )
597616
598617 readTimeout = time .Second
599- wt := newWriteToMock ()
618+ wt := newWriteToMock (0 )
600619 watcher := NewWatcher (wMetrics , nil , nil , "" , wt , dir , false , false )
601620 watcher .MaxSegment = - 1
602621 go watcher .Start ()
@@ -675,7 +694,7 @@ func TestRun_StartupTime(t *testing.T) {
675694 }
676695 require .NoError (t , w .Close ())
677696
678- wt := newWriteToMock ()
697+ wt := newWriteToMock (0 )
679698 watcher := NewWatcher (wMetrics , nil , nil , "" , wt , dir , false , false )
680699 watcher .MaxSegment = segments
681700
@@ -688,3 +707,93 @@ func TestRun_StartupTime(t *testing.T) {
688707 })
689708 }
690709}
710+
711+ func TestRun_AvoidNotifyWhenBehind (t * testing.T ) {
712+ const pageSize = 32 * 1024
713+ const segments = 10
714+ const seriesCount = 20
715+ const samplesCount = 300
716+
717+ // This test can take longer than intended to finish in cloud CI.
718+ readTimeout := 10 * time .Second
719+
720+ for _ , compress := range []CompressionType {CompressionNone , CompressionSnappy , CompressionZstd } {
721+ t .Run (string (compress ), func (t * testing.T ) {
722+ dir := t .TempDir ()
723+
724+ wdir := path .Join (dir , "wal" )
725+ err := os .Mkdir (wdir , 0o777 )
726+ require .NoError (t , err )
727+
728+ enc := record.Encoder {}
729+ w , err := NewSize (nil , nil , wdir , pageSize , compress )
730+ require .NoError (t , err )
731+ var wg sync.WaitGroup
732+ // add one segment initially to ensure there's a value > 0 for the last segment id
733+ for i := 0 ; i < 1 ; i ++ {
734+ for j := 0 ; j < seriesCount ; j ++ {
735+ ref := j + (i * 100 )
736+ series := enc .Series ([]record.RefSeries {
737+ {
738+ Ref : chunks .HeadSeriesRef (ref ),
739+ Labels : labels .FromStrings ("__name__" , fmt .Sprintf ("metric_%d" , i )),
740+ },
741+ }, nil )
742+ require .NoError (t , w .Log (series ))
743+
744+ for k := 0 ; k < samplesCount ; k ++ {
745+ inner := rand .Intn (ref + 1 )
746+ sample := enc .Samples ([]record.RefSample {
747+ {
748+ Ref : chunks .HeadSeriesRef (inner ),
749+ T : int64 (i ),
750+ V : float64 (i ),
751+ },
752+ }, nil )
753+ require .NoError (t , w .Log (sample ))
754+ }
755+ }
756+ }
757+ wg .Add (1 )
758+ go func () {
759+ defer wg .Done ()
760+ for i := 1 ; i < segments ; i ++ {
761+ for j := 0 ; j < seriesCount ; j ++ {
762+ ref := j + (i * 100 )
763+ series := enc .Series ([]record.RefSeries {
764+ {
765+ Ref : chunks .HeadSeriesRef (ref ),
766+ Labels : labels .FromStrings ("__name__" , fmt .Sprintf ("metric_%d" , i )),
767+ },
768+ }, nil )
769+ require .NoError (t , w .Log (series ))
770+
771+ for k := 0 ; k < samplesCount ; k ++ {
772+ inner := rand .Intn (ref + 1 )
773+ sample := enc .Samples ([]record.RefSample {
774+ {
775+ Ref : chunks .HeadSeriesRef (inner ),
776+ T : int64 (i ),
777+ V : float64 (i ),
778+ },
779+ }, nil )
780+ require .NoError (t , w .Log (sample ))
781+ }
782+ }
783+ }
784+ }()
785+
786+ wt := newWriteToMock (time .Millisecond )
787+ watcher := NewWatcher (wMetrics , nil , nil , "" , wt , dir , false , false )
788+ watcher .MaxSegment = segments
789+
790+ watcher .setMetrics ()
791+ startTime := time .Now ()
792+ err = watcher .Run ()
793+ wg .Wait ()
794+ require .Less (t , time .Since (startTime ), readTimeout )
795+ require .NoError (t , err )
796+ require .NoError (t , w .Close ())
797+ })
798+ }
799+ }
0 commit comments