@@ -2,7 +2,6 @@ package deallocator
22
33import (
44 "context"
5- "sync"
65
76 "github.com/docker/go-events"
87 "github.com/docker/swarmkit/api"
@@ -33,9 +32,6 @@ import (
3332type Deallocator struct {
3433 store * store.MemoryStore
3534
36- // closeOnce ensures that stopChan is closed only once
37- closeOnce sync.Once
38-
3935 // for services that are shutting down, we keep track of how many
4036 // tasks still exist for them
4137 services map [string ]* serviceWithTaskCounts
@@ -98,7 +94,6 @@ func (deallocator *Deallocator) Run(ctx context.Context) error {
9894
9995 return
10096 },
101- api.EventUpdateTask {},
10297 api.EventDeleteTask {},
10398 api.EventUpdateService {},
10499 api.EventUpdateNetwork {})
@@ -134,7 +129,7 @@ func (deallocator *Deallocator) Run(ctx context.Context) error {
134129 for {
135130 select {
136131 case event := <- eventsChan :
137- if updated , err := deallocator .handleEvent (ctx , event ); err == nil {
132+ if updated , err := deallocator .processNewEvent (ctx , event ); err == nil {
138133 deallocator .notifyEventChan (updated )
139134 } else {
140135 log .G (ctx ).WithError (err ).Errorf ("error processing deallocator event %#v" , event )
@@ -147,16 +142,11 @@ func (deallocator *Deallocator) Run(ctx context.Context) error {
147142 }
148143}
149144
150- // Stop stops the deallocator's routine and wait for the main loop to exit
151- // Stop can be called in two cases. One when the manager is
152- // shutting down, and the other when the manager (the leader) is
153- // becoming a follower. Since these two instances could race with
154- // each other, we use closeOnce here to ensure that TaskReaper.Stop()
155- // is called only once to avoid a panic.
145+ // Stop stops the deallocator's routine
146+ // FIXME (jrouge): see the comment on TaskReaper.Stop() and see when to properly stop this
147+ // plus unit test on this!
156148func (deallocator * Deallocator ) Stop () {
157- deallocator .closeOnce .Do (func () {
158- close (deallocator .stopChan )
159- })
149+ close (deallocator .stopChan )
160150 <- deallocator .doneChan
161151}
162152
@@ -190,21 +180,11 @@ func (deallocator *Deallocator) processService(ctx context.Context, service *api
190180 // better to clean up resources that shouldn't be cleaned up yet
191181 // than ending up with a service and some resources lost in limbo forever
192182 return true , deallocator .deallocateService (ctx , service )
193- }
194-
195- remainingTasks := 0
196- for _ , task := range tasks {
197- if isTaskStillAlive (task ) {
198- remainingTasks ++
199- }
200- }
201-
202- if remainingTasks == 0 {
183+ } else if len (tasks ) == 0 {
203184 // no tasks remaining for this service, we can clean it up
204185 return true , deallocator .deallocateService (ctx , service )
205186 }
206-
207- deallocator .services [service .ID ] = & serviceWithTaskCounts {service : service , taskCount : remainingTasks }
187+ deallocator .services [service .ID ] = & serviceWithTaskCounts {service : service , taskCount : len (tasks )}
208188 return false , nil
209189}
210190
@@ -283,16 +263,24 @@ func (deallocator *Deallocator) processNetwork(ctx context.Context, tx store.Tx,
283263 return
284264}
285265
286- // Handles new events, and dispatches to the right method depending on what
266+ // Processes new events, and dispatches to the right method depending on what
287267// type of event it is.
288268// The boolean part of the return tuple indicates whether anything was actually
289269// removed from the store
290- func (deallocator * Deallocator ) handleEvent (ctx context.Context , event events.Event ) (bool , error ) {
270+ func (deallocator * Deallocator ) processNewEvent (ctx context.Context , event events.Event ) (bool , error ) {
291271 switch typedEvent := event .(type ) {
292- case api.EventUpdateTask :
293- return deallocator .processTaskEvent (ctx , typedEvent .Task , typedEvent .OldTask )
294272 case api.EventDeleteTask :
295- return deallocator .processTaskEvent (ctx , nil , typedEvent .Task )
273+ serviceID := typedEvent .Task .ServiceID
274+
275+ if serviceWithCount , present := deallocator .services [serviceID ]; present {
276+ if serviceWithCount .taskCount <= 1 {
277+ delete (deallocator .services , serviceID )
278+ return deallocator .processService (ctx , serviceWithCount .service )
279+ }
280+ serviceWithCount .taskCount --
281+ }
282+
283+ return false , nil
296284 case api.EventUpdateService :
297285 return deallocator .processService (ctx , typedEvent .Service )
298286 case api.EventUpdateNetwork :
@@ -301,32 +289,3 @@ func (deallocator *Deallocator) handleEvent(ctx context.Context, event events.Ev
301289 return false , nil
302290 }
303291}
304-
305- // Common logic for handling task update/delete events
306- // oldTask is the task object as it was before its update or deletion
307- // newTask is nil for delete events, and the new object for updates
308- func (deallocator * Deallocator ) processTaskEvent (ctx context.Context , newTask , oldTask * api.Task ) (bool , error ) {
309- serviceID := oldTask .ServiceID
310- serviceWithCount , present := deallocator .services [serviceID ]
311-
312- if present && isTaskStillAlive (oldTask ) && (newTask == nil || ! isTaskStillAlive (newTask )) {
313- // this task belongs to a service that's shutting down, and in addition,
314- // prior to its update or deletion it was still alive, but now it's
315- // not alive any more, so we decrement the counter of alive tasks for
316- // this service
317-
318- if serviceWithCount .taskCount <= 1 {
319- delete (deallocator .services , serviceID )
320- return deallocator .processService (ctx , serviceWithCount .service )
321- }
322- serviceWithCount .taskCount --
323- }
324-
325- return false , nil
326- }
327-
328- // simple helper function to distinguish tasks that are still running
329- // from ones that are done
330- func isTaskStillAlive (task * api.Task ) bool {
331- return task .Status .State <= api .TaskStateRunning
332- }
0 commit comments