@@ -482,3 +482,82 @@ func testStartWatcherFromCompactedRevision(t *testing.T, performCompactOnTombsto
482482 }
483483 }
484484}
485+
486+ // TestResumeCompactionOnTombstone verifies whether a deletion event is preserved
487+ // when etcd restarts and resumes compaction on a key that only has a tombstone revision.
488+ func TestResumeCompactionOnTombstone (t * testing.T ) {
489+ e2e .BeforeTest (t )
490+
491+ ctx := context .Background ()
492+ compactBatchLimit := 5
493+
494+ cfg := e2e.EtcdProcessClusterConfig {
495+ GoFailEnabled : true ,
496+ ClusterSize : 1 ,
497+ IsClientAutoTLS : true ,
498+ ClientTLS : e2e .ClientTLS ,
499+ CompactionBatchLimit : compactBatchLimit ,
500+ WatchProcessNotifyInterval : 100 * time .Millisecond ,
501+ }
502+ clus , err := e2e .NewEtcdProcessCluster (t , & cfg )
503+ require .NoError (t , err )
504+ defer clus .Close ()
505+
506+ c1 := newClient (t , clus .EndpointsGRPC (), cfg .ClientTLS , cfg .IsClientAutoTLS )
507+ defer c1 .Close ()
508+
509+ keyPrefix := "/key-"
510+ for i := 0 ; i < compactBatchLimit ; i ++ {
511+ key := fmt .Sprintf ("%s%d" , keyPrefix , i )
512+ value := fmt .Sprintf ("%d" , i )
513+
514+ t .Logf ("PUT key=%s, val=%s" , key , value )
515+ _ , err = c1 .KV .Put (ctx , key , value )
516+ require .NoError (t , err )
517+ }
518+
519+ firstKey := keyPrefix + "0"
520+ t .Logf ("DELETE key=%s" , firstKey )
521+ deleteResp , err := c1 .KV .Delete (ctx , firstKey )
522+ require .NoError (t , err )
523+
524+ var deleteEvent * clientv3.Event
525+ select {
526+ case watchResp := <- c1 .Watch (ctx , firstKey , clientv3 .WithRev (deleteResp .Header .Revision )):
527+ require .Len (t , watchResp .Events , 1 )
528+
529+ require .Equal (t , mvccpb .DELETE , watchResp .Events [0 ].Type )
530+ deletedKey := string (watchResp .Events [0 ].Kv .Key )
531+ require .Equal (t , firstKey , deletedKey )
532+
533+ deleteEvent = watchResp .Events [0 ]
534+ case <- time .After (100 * time .Millisecond ):
535+ t .Fatal ("timed out getting watch response" )
536+ }
537+
538+ require .NoError (t , clus .Procs [0 ].Failpoints ().SetupHTTP (ctx , "compactBeforeSetFinishedCompact" , `panic` ))
539+
540+ t .Logf ("COMPACT rev=%d" , deleteResp .Header .Revision )
541+ _ , err = c1 .KV .Compact (ctx , deleteResp .Header .Revision , clientv3 .WithCompactPhysical ())
542+ require .Error (t , err )
543+
544+ require .Error (t , clus .Procs [0 ].Stop ())
545+ // NOTE: The proc panics and exit code is 2. It's impossible to restart
546+ // that etcd proc because last exit code is 2 and Restart() refuses to
547+ // start new one. Using IsRunning() function is to cleanup status.
548+ require .False (t , clus .Procs [0 ].IsRunning ())
549+ require .NoError (t , clus .Restart ())
550+
551+ c2 := newClient (t , clus .EndpointsGRPC (), cfg .ClientTLS , cfg .IsClientAutoTLS )
552+ defer c2 .Close ()
553+
554+ watchChan := c2 .Watch (ctx , firstKey , clientv3 .WithRev (deleteResp .Header .Revision ))
555+ select {
556+ case watchResp := <- watchChan :
557+ require .Equal (t , []* clientv3.Event {deleteEvent }, watchResp .Events )
558+ case <- time .After (100 * time .Millisecond ):
559+ // we care only about the first response, but have an
560+ // escape hatch in case the watch response is delayed.
561+ t .Fatal ("timed out getting watch response" )
562+ }
563+ }
0 commit comments