@@ -23,11 +23,14 @@ import (
2323 "testing"
2424 "time"
2525
26+ "github.com/stretchr/testify/require"
27+ "go.uber.org/zap"
28+ "go.uber.org/zap/zaptest"
29+
2630 "go.etcd.io/etcd/api/v3/mvccpb"
2731 "go.etcd.io/etcd/pkg/v3/traceutil"
2832 "go.etcd.io/etcd/server/v3/lease"
2933 betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
30- "go.uber.org/zap"
3134)
3235
3336func TestWatch (t * testing.T ) {
@@ -259,6 +262,62 @@ func TestWatchCompacted(t *testing.T) {
259262 }
260263}
261264
265+ func TestWatchNoEventLossOnCompact (t * testing.T ) {
266+ oldChanBufLen , oldMaxWatchersPerSync := chanBufLen , maxWatchersPerSync
267+ b , tmpPath := betesting .NewDefaultTmpBackend (t )
268+ lg := zaptest .NewLogger (t )
269+ s := newWatchableStore (lg , b , & lease.FakeLessor {}, StoreConfig {})
270+
271+ defer func () {
272+ cleanup (s , b , tmpPath )
273+ chanBufLen , maxWatchersPerSync = oldChanBufLen , oldMaxWatchersPerSync
274+ }()
275+
276+ chanBufLen , maxWatchersPerSync = 1 , 4
277+ testKey , testValue := []byte ("foo" ), []byte ("bar" )
278+
279+ maxRev := 10
280+ compactRev := int64 (5 )
281+ for i := 0 ; i < maxRev ; i ++ {
282+ s .Put (testKey , testValue , lease .NoLease )
283+ }
284+ _ , err := s .Compact (traceutil .TODO (), compactRev )
285+ require .NoErrorf (t , err , "failed to compact kv (%v)" , err )
286+
287+ w := s .NewWatchStream ()
288+ defer w .Close ()
289+
290+ watchers := map [WatchID ]int64 {
291+ 0 : 1 ,
292+ 1 : 1 , // create unsyncd watchers with startRev < compactRev
293+ 2 : 6 , // create unsyncd watchers with compactRev < startRev < currentRev
294+ }
295+ for id , startRev := range watchers {
296+ _ , err := w .Watch (id , testKey , nil , startRev )
297+ require .NoError (t , err )
298+ }
299+ // fill up w.Chan() with 1 buf via 2 compacted watch response
300+ s .syncWatchers ()
301+
302+ for len (watchers ) > 0 {
303+ resp := <- w .Chan ()
304+ if resp .CompactRevision != 0 {
305+ require .Equal (t , resp .CompactRevision , compactRev )
306+ require .Contains (t , watchers , resp .WatchID )
307+ delete (watchers , resp .WatchID )
308+ continue
309+ }
310+ nextRev := watchers [resp .WatchID ]
311+ for _ , ev := range resp .Events {
312+ require .Equalf (t , nextRev , ev .Kv .ModRevision , "got event revision %d but want %d for watcher with watch ID %d" , ev .Kv .ModRevision , nextRev , resp .WatchID )
313+ nextRev ++
314+ }
315+ if nextRev == s .rev ()+ 1 {
316+ delete (watchers , resp .WatchID )
317+ }
318+ }
319+ }
320+
262321func TestWatchFutureRev (t * testing.T ) {
263322 b , tmpPath := betesting .NewDefaultTmpBackend (t )
264323 s := newWatchableStore (zap .NewExample (), b , & lease.FakeLessor {}, StoreConfig {})
0 commit comments