Commit 9e0351e
CleanupTombstones refactored, now reloading blocks after every compaction.

The goal is to remove deletable blocks after every compaction and thus decrease the disk space used when cleaning tombstones.

Signed-off-by: arthursens <[email protected]>
1 parent 9b85ed7 commit 9e0351e
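For orientation, a minimal sketch of the user-facing call path this commit affects. Everything in it is illustrative rather than taken from the diff: the data directory, the a="b" matcher, and the import paths (assumed to match the repository layout at the time of this commit). Deleting a range only records tombstones; CleanTombstones is what compacts them away, and it is its disk-space behaviour that changes in the files below.

package main

import (
	"log"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	// Illustrative directory; nil logger, registerer, and options use the defaults.
	db, err := tsdb.Open("./data", nil, nil, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Deleting a range only records tombstones; the samples stay on disk.
	matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
	if err := db.Delete(0, 10, matcher); err != nil {
		log.Fatal(err)
	}

	// With this change, each affected block is rewritten and the old block is
	// dropped via a reload before the next block is processed, instead of all
	// old blocks being kept until a single reload at the end.
	if err := db.CleanTombstones(); err != nil {
		log.Fatal(err)
	}
}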

File tree

3 files changed (+108, -35 lines):
tsdb/block.go
tsdb/db.go
tsdb/db_test.go


tsdb/block.go

+7 -4
@@ -569,7 +569,9 @@ Outer:
 
 // CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones).
 // If there was a rewrite, then it returns the ULID of the new block written, else nil.
-func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
+// If the resultant block is empty (tombstones covered the whole block), then it deletes the new block and returns a nil UID.
+// It returns a boolean indicating whether the parent block can be deleted safely or not.
+func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, bool, error) {
 	numStones := 0
 
 	if err := pb.tombstones.Iter(func(id uint64, ivs tombstones.Intervals) error {
@@ -580,15 +582,16 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
 		panic(err)
 	}
 	if numStones == 0 {
-		return nil, nil
+		return nil, false, nil
 	}
 
 	meta := pb.Meta()
 	uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta)
 	if err != nil {
-		return nil, err
+		return nil, false, err
 	}
-	return &uid, nil
+
+	return &uid, true, nil
 }
 
 // Snapshot creates snapshot of the block into dir.
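To make the new contract concrete, here is a small sketch of how a caller can interpret the three return values. The helper is hypothetical (not part of this commit) and is written as if it lived inside package tsdb, where Block, Compactor, and pb.meta are reachable:

// cleanOneBlock is a hypothetical helper illustrating the new signature.
func cleanOneBlock(pb *Block, dest string, c Compactor) error {
	uid, safeToDelete, err := pb.CleanTombstones(dest, c)
	if err != nil {
		return err
	}
	if !safeToDelete {
		// No tombstones were found: nothing was rewritten, keep the block as is.
		return nil
	}
	// Tombstones were compacted away, so the parent block may now be dropped.
	pb.meta.Compaction.Deletable = true
	if uid == nil {
		// Per the new doc comment, the tombstones covered the whole block and the
		// empty result was discarded; only the parent remains to be deleted.
		return nil
	}
	// Otherwise a rewritten block exists under dest/<uid>; a block reload can
	// swap it in while removing the deletable parent.
	return nil
}

This mirrors what the db.go change below does, where the reload that drops the deletable parent happens immediately after each block is cleaned.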

tsdb/db.go

+28 -22
@@ -1537,34 +1537,40 @@ func (db *DB) CleanTombstones() (err error) {
 	start := time.Now()
 	defer db.metrics.tombCleanTimer.Observe(time.Since(start).Seconds())
 
-	newUIDs := []ulid.ULID{}
-	defer func() {
-		// If any error is caused, we need to delete all the new directory created.
-		if err != nil {
-			for _, uid := range newUIDs {
+	cleanUpCompleted := false
+	// Repeat cleanup until there are no tombstones left.
+	for !cleanUpCompleted {
+		cleanUpCompleted = true
+
+		for _, pb := range db.Blocks() {
+			uid, safeToDelete, cleanErr := pb.CleanTombstones(db.Dir(), db.compactor)
+			if cleanErr != nil {
+				return errors.Wrapf(cleanErr, "clean tombstones: %s", pb.Dir())
+			}
+			if !safeToDelete {
+				// There was nothing to clean.
+				continue
+			}
+
+			// In case the tombstones of the old block cover the whole block,
+			// there would be no resultant block to tell the parent.
+			pb.meta.Compaction.Deletable = safeToDelete
+			cleanUpCompleted = false
+			if err = db.reloadBlocks(); err == nil { // Will try to delete old block.
+				// Successful reload will change the existing blocks.
+				// We need to loop over the new set of blocks.
+				break
+			}
+
+			// Delete new block if it was created.
+			if uid != nil {
 				dir := filepath.Join(db.Dir(), uid.String())
 				if err := os.RemoveAll(dir); err != nil {
 					level.Error(db.logger).Log("msg", "failed to delete block after failed `CleanTombstones`", "dir", dir, "err", err)
 				}
 			}
+			return errors.Wrap(err, "reload blocks")
 		}
-	}()
-
-	db.mtx.RLock()
-	blocks := db.blocks[:]
-	db.mtx.RUnlock()
-
-	for _, b := range blocks {
-		if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil {
-			err = errors.Wrapf(er, "clean tombstones: %s", b.Dir())
-			return err
-		} else if uid != nil { // New block was created.
-			newUIDs = append(newUIDs, *uid)
-		}
-	}
-
-	if err := db.reloadBlocks(); err != nil {
-		return errors.Wrap(err, "reload blocks")
 	}
 	return nil
 }
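Why the loop restarts: a successful reloadBlocks call swaps the set of open blocks, so the inner loop breaks out and the outer for !cleanUpCompleted loop iterates over db.Blocks() again from scratch; cleanup ends once a full pass finds nothing left to clean. The disk-space argument from the commit message, with purely illustrative numbers: before this change, every block with tombstones was rewritten first and reloadBlocks ran once at the end, so ten 1 GiB blocks awaiting cleanup could transiently add roughly 10 GiB of rewritten copies. With the per-block reload above, each deletable parent is removed before the next block is rewritten, keeping the transient overhead near the size of a single block, roughly 1 GiB in that example.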

tsdb/db_test.go

+73 -9
@@ -1030,7 +1030,7 @@ func TestTombstoneClean(t *testing.T) {
 	for _, c := range cases {
 		// Delete the ranges.
 
-		// create snapshot
+		// Create snapshot.
 		snap, err := ioutil.TempDir("", "snap")
 		require.NoError(t, err)
 
@@ -1040,7 +1040,7 @@ func TestTombstoneClean(t *testing.T) {
 		require.NoError(t, db.Snapshot(snap, true))
 		require.NoError(t, db.Close())
 
-		// reopen DB from snapshot
+		// Reopen DB from snapshot.
 		db, err = Open(snap, nil, nil, nil)
 		require.NoError(t, err)
 		defer db.Close()
@@ -1099,6 +1099,54 @@ func TestTombstoneClean(t *testing.T) {
 	}
 }
 
+// TestTombstoneCleanResultEmptyBlock tests that a TombstoneClean that results in empty blocks (no timeseries)
+// will also delete the resultant block.
+func TestTombstoneCleanResultEmptyBlock(t *testing.T) {
+	numSamples := int64(10)
+
+	db := openTestDB(t, nil, nil)
+
+	ctx := context.Background()
+	app := db.Appender(ctx)
+
+	smpls := make([]float64, numSamples)
+	for i := int64(0); i < numSamples; i++ {
+		smpls[i] = rand.Float64()
+		app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
+	}
+
+	require.NoError(t, app.Commit())
+	// Interval should cover the whole block.
+	intervals := tombstones.Intervals{{Mint: 0, Maxt: numSamples}}
+
+	// Create snapshot.
+	snap, err := ioutil.TempDir("", "snap")
+	require.NoError(t, err)
+
+	defer func() {
+		require.NoError(t, os.RemoveAll(snap))
+	}()
+	require.NoError(t, db.Snapshot(snap, true))
+	require.NoError(t, db.Close())
+
+	// Reopen DB from snapshot.
+	db, err = Open(snap, nil, nil, nil)
+	require.NoError(t, err)
+	defer db.Close()
+
+	// Create tombstones by deleting all samples.
+	for _, r := range intervals {
+		require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
+	}
+
+	require.NoError(t, db.CleanTombstones())
+
+	// After cleaning tombstones that cover the entire block, no blocks should be left behind.
+	actualBlockDirs, err := blockDirs(db.dir)
+	require.NoError(t, err)
+	require.Equal(t, 0, len(actualBlockDirs))
+}
+
 // TestTombstoneCleanFail tests that a failing TombstoneClean doesn't leave any blocks behind.
 // When TombstoneClean errors the original block that should be rebuilt doesn't get deleted so
 // if TombstoneClean leaves any blocks behind these will overlap.
@@ -1108,22 +1156,22 @@ func TestTombstoneCleanFail(t *testing.T) {
 		require.NoError(t, db.Close())
 	}()
 
-	var expectedBlockDirs []string
+	var oldBlockDirs []string
 
-	// Create some empty blocks pending for compaction.
+	// Create some blocks pending for compaction.
 	// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
 	totalBlocks := 2
 	for i := 0; i < totalBlocks; i++ {
-		blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 1))
+		blockDir := createBlock(t, db.Dir(), genSeries(1, 1, int64(i), int64(i)+1))
 		block, err := OpenBlock(nil, blockDir, nil)
 		require.NoError(t, err)
 		// Add some fake tombstones to trigger the compaction.
 		tomb := tombstones.NewMemTombstones()
-		tomb.AddInterval(0, tombstones.Interval{Mint: 0, Maxt: 1})
+		tomb.AddInterval(0, tombstones.Interval{Mint: int64(i), Maxt: int64(i) + 1})
 		block.tombstones = tomb
 
 		db.blocks = append(db.blocks, block)
-		expectedBlockDirs = append(expectedBlockDirs, blockDir)
+		oldBlockDirs = append(oldBlockDirs, blockDir)
 	}
 
 	// Initialize the mockCompactorFailing with a room for a single compaction iteration.
@@ -1137,10 +1185,26 @@ func TestTombstoneCleanFail(t *testing.T) {
 	// The compactor should trigger a failure here.
 	require.Error(t, db.CleanTombstones())
 
-	// Now check that the CleanTombstones didn't leave any blocks behind after a failure.
+	// Now check that the CleanTombstones replaced the old block even after a failure.
 	actualBlockDirs, err := blockDirs(db.dir)
 	require.NoError(t, err)
-	require.Equal(t, expectedBlockDirs, actualBlockDirs)
+	// Only one block should have been replaced by a new block.
+	require.Equal(t, len(oldBlockDirs), len(actualBlockDirs))
+	require.Equal(t, len(intersection(oldBlockDirs, actualBlockDirs)), len(actualBlockDirs)-1)
+}
+
+func intersection(oldBlocks, actualBlocks []string) (intersection []string) {
+	hash := make(map[string]bool)
+	for _, e := range oldBlocks {
+		hash[e] = true
+	}
+	for _, e := range actualBlocks {
+		// If a block is present in the hashmap then append it to the intersection list.
+		if hash[e] {
+			intersection = append(intersection, e)
+		}
+	}
+	return
 }
 
 // mockCompactorFailing creates a new empty block on every write and fails when reached the max allowed total.
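For reference, a tiny illustration of what the intersection helper returns and why the assertion above expects len(actualBlockDirs)-1 common entries when exactly one block was replaced. The test name and directory names are made up, and the snippet assumes it sits next to the helper in the tsdb test package:

func TestIntersectionIllustration(t *testing.T) { // hypothetical, not part of this commit
	old := []string{"01OLDBLOCKA", "01OLDBLOCKB"}    // made-up block dir names
	actual := []string{"01OLDBLOCKB", "01NEWBLOCKC"} // exactly one block was replaced
	common := intersection(old, actual)              // -> ["01OLDBLOCKB"]
	require.Equal(t, len(actual)-1, len(common))     // what the assertion above relies on
}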
