@@ -305,70 +305,67 @@ func (n *Manager) nextBatch() []*Alert {
305305func (n * Manager ) Run (tsets <- chan map [string ][]* targetgroup.Group ) {
306306 defer n .cancel ()
307307
308- var drainTimedOut <- chan time.Time
309- draining := false
310-
311- startDraining := func () {
312- if draining {
313- return
314- }
315-
316- drainTimedOut = time .After (n .opts .DrainTimeout )
317- draining = true
318- }
308+ n .runLoop (tsets )
309+ n .drainQueue ()
310+ }
319311
312+ func (n * Manager ) runLoop (tsets <- chan map [string ][]* targetgroup.Group ) {
320313 for {
321314 // The select is split in two parts, such as we will first try to read
322315 // new alertmanager targets if they are available, before sending new
323316 // alerts.
324317 select {
325- case <- drainTimedOut :
326- return
327-
328318 case <- n .stopAndDrainRequested :
329- if n .opts .DrainTimeout == 0 {
330- // No draining requested: just stop immediately.
331- return
332- }
333-
334- startDraining ()
335- continue
319+ return
336320
337321 case ts := <- tsets :
338322 n .reload (ts )
339323
340324 default :
341325 select {
342- case <- drainTimedOut :
343- return
344-
345326 case <- n .stopAndDrainRequested :
346- if n .opts .DrainTimeout == 0 {
347- // No draining requested: just stop immediately.
348- return
349- }
350-
351- startDraining ()
352- continue
327+ return
353328
354329 case ts := <- tsets :
355330 n .reload (ts )
356331
357332 case <- n .more :
358333 }
359334 }
360- alerts := n .nextBatch ()
361335
362- if ! n .sendAll (alerts ... ) {
363- n .metrics .dropped .Add (float64 (len (alerts )))
364- }
336+ n .sendOneBatch ()
365337
366338 // If the queue still has items left, kick off the next iteration.
367339 if n .queueLen () > 0 {
368340 n .setMore ()
369- } else if n .queueLen () == 0 && draining {
370- // Draining complete, nothing more to do.
341+ }
342+ }
343+ }
344+
345+ func (n * Manager ) sendOneBatch () {
346+ alerts := n .nextBatch ()
347+
348+ if ! n .sendAll (alerts ... ) {
349+ n .metrics .dropped .Add (float64 (len (alerts )))
350+ }
351+ }
352+
353+ func (n * Manager ) drainQueue () {
354+ if n .opts .DrainTimeout == 0 {
355+ // Draining disabled, we are done.
356+ return
357+ }
358+
359+ drainTimedOut := time .After (n .opts .DrainTimeout )
360+
361+ // Keep trying to send remaining notifications until we empty the queue or run out of time.
362+ for n .queueLen () > 0 {
363+ select {
364+ case <- drainTimedOut :
371365 return
366+
367+ default :
368+ n .sendOneBatch ()
372369 }
373370 }
374371}
0 commit comments