@@ -1030,7 +1030,7 @@ func TestTombstoneClean(t *testing.T) {
1030
1030
for _ , c := range cases {
1031
1031
// Delete the ranges.
1032
1032
1033
- // create snapshot
1033
+ // Create snapshot.
1034
1034
snap , err := ioutil .TempDir ("" , "snap" )
1035
1035
require .NoError (t , err )
1036
1036
@@ -1040,7 +1040,7 @@ func TestTombstoneClean(t *testing.T) {
1040
1040
require .NoError (t , db .Snapshot (snap , true ))
1041
1041
require .NoError (t , db .Close ())
1042
1042
1043
- // reopen DB from snapshot
1043
+ // Reopen DB from snapshot.
1044
1044
db , err = Open (snap , nil , nil , nil )
1045
1045
require .NoError (t , err )
1046
1046
defer db .Close ()
@@ -1099,6 +1099,54 @@ func TestTombstoneClean(t *testing.T) {
1099
1099
}
1100
1100
}
1101
1101
1102
+ // TestTombstoneCleanResultEmptyBlock tests that a TombstoneClean that results in empty blocks (no timeseries)
1103
+ // will also delete the resultant block.
1104
+ func TestTombstoneCleanResultEmptyBlock (t * testing.T ) {
1105
+ numSamples := int64 (10 )
1106
+
1107
+ db := openTestDB (t , nil , nil )
1108
+
1109
+ ctx := context .Background ()
1110
+ app := db .Appender (ctx )
1111
+
1112
+ smpls := make ([]float64 , numSamples )
1113
+ for i := int64 (0 ); i < numSamples ; i ++ {
1114
+ smpls [i ] = rand .Float64 ()
1115
+ app .Add (labels.Labels {{Name : "a" , Value : "b" }}, i , smpls [i ])
1116
+ }
1117
+
1118
+ require .NoError (t , app .Commit ())
1119
+ // Interval should cover the whole block.
1120
+ intervals := tombstones.Intervals {{Mint : 0 , Maxt : numSamples }}
1121
+
1122
+ // Create snapshot.
1123
+ snap , err := ioutil .TempDir ("" , "snap" )
1124
+ require .NoError (t , err )
1125
+
1126
+ defer func () {
1127
+ require .NoError (t , os .RemoveAll (snap ))
1128
+ }()
1129
+ require .NoError (t , db .Snapshot (snap , true ))
1130
+ require .NoError (t , db .Close ())
1131
+
1132
+ // Reopen DB from snapshot.
1133
+ db , err = Open (snap , nil , nil , nil )
1134
+ require .NoError (t , err )
1135
+ defer db .Close ()
1136
+
1137
+ // Create tombstones by deleting all samples.
1138
+ for _ , r := range intervals {
1139
+ require .NoError (t , db .Delete (r .Mint , r .Maxt , labels .MustNewMatcher (labels .MatchEqual , "a" , "b" )))
1140
+ }
1141
+
1142
+ require .NoError (t , db .CleanTombstones ())
1143
+
1144
+ // After cleaning tombstones that covers the entire block, no blocks should be left behind.
1145
+ actualBlockDirs , err := blockDirs (db .dir )
1146
+ require .NoError (t , err )
1147
+ require .Equal (t , 0 , len (actualBlockDirs ))
1148
+ }
1149
+
1102
1150
// TestTombstoneCleanFail tests that a failing TombstoneClean doesn't leave any blocks behind.
1103
1151
// When TombstoneClean errors the original block that should be rebuilt doesn't get deleted so
1104
1152
// if TombstoneClean leaves any blocks behind these will overlap.
@@ -1108,22 +1156,22 @@ func TestTombstoneCleanFail(t *testing.T) {
1108
1156
require .NoError (t , db .Close ())
1109
1157
}()
1110
1158
1111
- var expectedBlockDirs []string
1159
+ var oldBlockDirs []string
1112
1160
1113
- // Create some empty blocks pending for compaction.
1161
+ // Create some blocks pending for compaction.
1114
1162
// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
1115
1163
totalBlocks := 2
1116
1164
for i := 0 ; i < totalBlocks ; i ++ {
1117
- blockDir := createBlock (t , db .Dir (), genSeries (1 , 1 , 0 , 1 ))
1165
+ blockDir := createBlock (t , db .Dir (), genSeries (1 , 1 , int64 ( i ), int64 ( i ) + 1 ))
1118
1166
block , err := OpenBlock (nil , blockDir , nil )
1119
1167
require .NoError (t , err )
1120
1168
// Add some fake tombstones to trigger the compaction.
1121
1169
tomb := tombstones .NewMemTombstones ()
1122
- tomb .AddInterval (0 , tombstones.Interval {Mint : 0 , Maxt : 1 })
1170
+ tomb .AddInterval (0 , tombstones.Interval {Mint : int64 ( i ) , Maxt : int64 ( i ) + 1 })
1123
1171
block .tombstones = tomb
1124
1172
1125
1173
db .blocks = append (db .blocks , block )
1126
- expectedBlockDirs = append (expectedBlockDirs , blockDir )
1174
+ oldBlockDirs = append (oldBlockDirs , blockDir )
1127
1175
}
1128
1176
1129
1177
// Initialize the mockCompactorFailing with a room for a single compaction iteration.
@@ -1137,10 +1185,26 @@ func TestTombstoneCleanFail(t *testing.T) {
1137
1185
// The compactor should trigger a failure here.
1138
1186
require .Error (t , db .CleanTombstones ())
1139
1187
1140
- // Now check that the CleanTombstones didn't leave any blocks behind after a failure.
1188
+ // Now check that the CleanTombstones replaced the old block even after a failure.
1141
1189
actualBlockDirs , err := blockDirs (db .dir )
1142
1190
require .NoError (t , err )
1143
- require .Equal (t , expectedBlockDirs , actualBlockDirs )
1191
+ // Only one block should have been replaced by a new block.
1192
+ require .Equal (t , len (oldBlockDirs ), len (actualBlockDirs ))
1193
+ require .Equal (t , len (intersection (oldBlockDirs , actualBlockDirs )), len (actualBlockDirs )- 1 )
1194
+ }
1195
+
1196
+ func intersection (oldBlocks , actualBlocks []string ) (intersection []string ) {
1197
+ hash := make (map [string ]bool )
1198
+ for _ , e := range oldBlocks {
1199
+ hash [e ] = true
1200
+ }
1201
+ for _ , e := range actualBlocks {
1202
+ // If block present in the hashmap then append intersection list.
1203
+ if hash [e ] {
1204
+ intersection = append (intersection , e )
1205
+ }
1206
+ }
1207
+ return
1144
1208
}
1145
1209
1146
1210
// mockCompactorFailing creates a new empty block on every write and fails when reached the max allowed total.
0 commit comments