Skip to content

Commit d44ea7e

Browse files
committed
Extract unlockWaitAndLockAgain
Signed-off-by: Oleg Zaytsev <[email protected]>
1 parent c3e34b2 commit d44ea7e

File tree

1 file changed

+20
-19
lines changed

1 file changed

+20
-19
lines changed

tsdb/index/postings.go

+20-19
Original file line numberDiff line numberDiff line change
@@ -343,19 +343,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
343343
// Note that a read query will most likely want to read multiple postings lists, say 5, 10 or 20 (depending on the number of matchers)
344344
// And that read query will most likely evaluate only one of those matchers before we unpause here, so we want to pause often.
345345
if i%512 == 0 {
346-
p.mtx.Unlock()
347-
// While it's tempting to just do a `time.Sleep(time.Millisecond)` here,
348-
// it wouldn't ensure use that readers actually were able to get the read lock,
349-
// because if there are writes waiting on same mutex, readers won't be able to get it.
350-
// So we just grab one RLock ourselves.
351-
p.mtx.RLock()
352-
// We shouldn't wait here, because we would be blocking a potential write for no reason.
353-
// Note that if there's a writer waiting for us to unlock, no reader will be able to get the read lock.
354-
p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
355-
// Now we can wait a little bit just to increase the chance of a reader getting the lock.
356-
// If we were deleting 100M series here, pausing every 512 with 1ms sleeps would be an extra of 200s, which is negligible.
357-
time.Sleep(time.Millisecond)
358-
p.mtx.Lock()
346+
p.unlockWaitAndLockAgain()
359347
}
360348
}
361349
process(allPostingsKey)
@@ -364,13 +352,9 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
364352
i = 0
365353
for name := range affectedLabelNames {
366354
i++
367-
// Same mutex pause as above.
355+
// From time to time we want some readers to go through and read their postings.
368356
if i%512 == 0 {
369-
p.mtx.Unlock()
370-
p.mtx.RLock()
371-
p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
372-
time.Sleep(time.Millisecond)
373-
p.mtx.Lock()
357+
p.unlockWaitAndLockAgain()
374358
}
375359

376360
if len(p.m[name]) == 0 {
@@ -390,6 +374,23 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
390374
}
391375
}
392376

377+
// unlockWaitAndLockAgain will unlock an already locked p.mtx.Lock() and then wait a little bit before locking it again,
378+
// letting the RLock()-waiting goroutines to get the lock.
379+
func (p *MemPostings) unlockWaitAndLockAgain() {
380+
p.mtx.Unlock()
381+
// While it's tempting to just do a `time.Sleep(time.Millisecond)` here,
382+
// it wouldn't ensure use that readers actually were able to get the read lock,
383+
// because if there are writes waiting on same mutex, readers won't be able to get it.
384+
// So we just grab one RLock ourselves.
385+
p.mtx.RLock()
386+
// We shouldn't wait here, because we would be blocking a potential write for no reason.
387+
// Note that if there's a writer waiting for us to unlock, no reader will be able to get the read lock.
388+
p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
389+
// Now we can wait a little bit just to increase the chance of a reader getting the lock.
390+
time.Sleep(time.Millisecond)
391+
p.mtx.Lock()
392+
}
393+
393394
// Iter calls f for each postings list. It aborts if f returns an error and returns it.
394395
func (p *MemPostings) Iter(f func(labels.Label, Postings) error) error {
395396
p.mtx.RLock()

0 commit comments

Comments
 (0)