@@ -343,19 +343,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
343
343
// Note that a read query will most likely want to read multiple postings lists, say 5, 10 or 20 (depending on the number of matchers)
344
344
// And that read query will most likely evaluate only one of those matchers before we unpause here, so we want to pause often.
345
345
if i % 512 == 0 {
346
- p .mtx .Unlock ()
347
- // While it's tempting to just do a `time.Sleep(time.Millisecond)` here,
348
- // it wouldn't ensure use that readers actually were able to get the read lock,
349
- // because if there are writes waiting on same mutex, readers won't be able to get it.
350
- // So we just grab one RLock ourselves.
351
- p .mtx .RLock ()
352
- // We shouldn't wait here, because we would be blocking a potential write for no reason.
353
- // Note that if there's a writer waiting for us to unlock, no reader will be able to get the read lock.
354
- p .mtx .RUnlock () //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
355
- // Now we can wait a little bit just to increase the chance of a reader getting the lock.
356
- // If we were deleting 100M series here, pausing every 512 with 1ms sleeps would be an extra of 200s, which is negligible.
357
- time .Sleep (time .Millisecond )
358
- p .mtx .Lock ()
346
+ p .unlockWaitAndLockAgain ()
359
347
}
360
348
}
361
349
process (allPostingsKey )
@@ -364,13 +352,9 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
364
352
i = 0
365
353
for name := range affectedLabelNames {
366
354
i ++
367
- // Same mutex pause as above .
355
+ // From time to time we want some readers to go through and read their postings .
368
356
if i % 512 == 0 {
369
- p .mtx .Unlock ()
370
- p .mtx .RLock ()
371
- p .mtx .RUnlock () //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
372
- time .Sleep (time .Millisecond )
373
- p .mtx .Lock ()
357
+ p .unlockWaitAndLockAgain ()
374
358
}
375
359
376
360
if len (p .m [name ]) == 0 {
@@ -390,6 +374,23 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
390
374
}
391
375
}
392
376
377
// unlockWaitAndLockAgain will unlock an already locked p.mtx.Lock() and then wait a little bit before locking it again,
// letting the RLock()-waiting goroutines to get the lock.
//
// Precondition: the caller holds p.mtx.Lock(). Postcondition: the caller holds p.mtx.Lock() again.
// This is used by long write-locked loops (like Delete) to periodically yield the mutex so that
// pending readers are not starved for the whole duration of the operation.
func (p *MemPostings) unlockWaitAndLockAgain() {
	p.mtx.Unlock()
	// While it's tempting to just do a `time.Sleep(time.Millisecond)` here,
	// it wouldn't ensure that readers actually were able to get the read lock,
	// because if there are writes waiting on same mutex, readers won't be able to get it.
	// So we just grab one RLock ourselves.
	p.mtx.RLock()
	// We shouldn't wait here, because we would be blocking a potential write for no reason.
	// Note that if there's a writer waiting for us to unlock, no reader will be able to get the read lock.
	p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
	// Now we can wait a little bit just to increase the chance of a reader getting the lock.
	time.Sleep(time.Millisecond)
	p.mtx.Lock()
}
393
+
393
394
// Iter calls f for each postings list. It aborts if f returns an error and returns it.
394
395
func (p * MemPostings ) Iter (f func (labels.Label , Postings ) error ) error {
395
396
p .mtx .RLock ()
0 commit comments