Skip to content

Commit e4f5fee

Browse files
committed
Address PR feedback
Signed-off-by: Charles Korn <[email protected]>
1 parent ed56b29 commit e4f5fee

File tree

1 file changed

+34
-37
lines changed

1 file changed

+34
-37
lines changed

notifier/notifier.go

Lines changed: 34 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -305,70 +305,67 @@ func (n *Manager) nextBatch() []*Alert {
305305
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
306306
defer n.cancel()
307307

308-
var drainTimedOut <-chan time.Time
309-
draining := false
310-
311-
startDraining := func() {
312-
if draining {
313-
return
314-
}
315-
316-
drainTimedOut = time.After(n.opts.DrainTimeout)
317-
draining = true
318-
}
308+
n.runLoop(tsets)
309+
n.drainQueue()
310+
}
319311

312+
func (n *Manager) runLoop(tsets <-chan map[string][]*targetgroup.Group) {
320313
for {
321314
// The select is split in two parts, such as we will first try to read
322315
// new alertmanager targets if they are available, before sending new
323316
// alerts.
324317
select {
325-
case <-drainTimedOut:
326-
return
327-
328318
case <-n.stopAndDrainRequested:
329-
if n.opts.DrainTimeout == 0 {
330-
// No draining requested: just stop immediately.
331-
return
332-
}
333-
334-
startDraining()
335-
continue
319+
return
336320

337321
case ts := <-tsets:
338322
n.reload(ts)
339323

340324
default:
341325
select {
342-
case <-drainTimedOut:
343-
return
344-
345326
case <-n.stopAndDrainRequested:
346-
if n.opts.DrainTimeout == 0 {
347-
// No draining requested: just stop immediately.
348-
return
349-
}
350-
351-
startDraining()
352-
continue
327+
return
353328

354329
case ts := <-tsets:
355330
n.reload(ts)
356331

357332
case <-n.more:
358333
}
359334
}
360-
alerts := n.nextBatch()
361335

362-
if !n.sendAll(alerts...) {
363-
n.metrics.dropped.Add(float64(len(alerts)))
364-
}
336+
n.sendOneBatch()
365337

366338
// If the queue still has items left, kick off the next iteration.
367339
if n.queueLen() > 0 {
368340
n.setMore()
369-
} else if n.queueLen() == 0 && draining {
370-
// Draining complete, nothing more to do.
341+
}
342+
}
343+
}
344+
345+
func (n *Manager) sendOneBatch() {
346+
alerts := n.nextBatch()
347+
348+
if !n.sendAll(alerts...) {
349+
n.metrics.dropped.Add(float64(len(alerts)))
350+
}
351+
}
352+
353+
func (n *Manager) drainQueue() {
354+
if n.opts.DrainTimeout == 0 {
355+
// Draining disabled, we are done.
356+
return
357+
}
358+
359+
drainTimedOut := time.After(n.opts.DrainTimeout)
360+
361+
// Keep trying to send remaining notifications until we empty the queue or run out of time.
362+
for n.queueLen() > 0 {
363+
select {
364+
case <-drainTimedOut:
371365
return
366+
367+
default:
368+
n.sendOneBatch()
372369
}
373370
}
374371
}

0 commit comments

Comments
 (0)