@@ -72,16 +72,17 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
7272 }
7373 go ep .Run (ctx )
7474 s := & service {
75- context : ctx ,
76- events : make (chan interface {}, 128 ),
77- ec : reaper .Default .Subscribe (),
78- ep : ep ,
79- shutdown : sd ,
80- containers : make (map [string ]* runc.Container ),
81- running : make (map [int ][]containerProcess ),
82- pendingExecs : make (map [* runc.Container ]int ),
83- execable : make (map [* runc.Container ]bool ),
84- exitSubscribers : make (map [* map [int ][]runcC.Exit ]struct {}),
75+ context : ctx ,
76+ events : make (chan interface {}, 128 ),
77+ ec : reaper .Default .Subscribe (),
78+ ep : ep ,
79+ shutdown : sd ,
80+ containers : make (map [string ]* runc.Container ),
81+ running : make (map [int ][]containerProcess ),
82+ runningExecs : make (map [* runc.Container ]int ),
83+ execCountSubscribers : make (map [* runc.Container ]chan <- int ),
84+ containerInitExit : make (map [* runc.Container ]runcC.Exit ),
85+ exitSubscribers : make (map [* map [int ][]runcC.Exit ]struct {}),
8586 }
8687 go s .processExits ()
8788 runcC .Monitor = reaper .Default
@@ -116,13 +117,19 @@ type service struct {
116117
117118 lifecycleMu sync.Mutex
118119 running map [int ][]containerProcess // pid -> running process, guarded by lifecycleMu
119- pendingExecs map [* runc.Container ]int // container -> num pending execs, guarded by lifecycleMu
120- // container -> execs can be started, guarded by lifecycleMu.
121- // Execs can be started if the container's init process (read: pid, not [process.Init])
122- // has been started and not yet reaped by the shim.
120+ runningExecs map [* runc.Container ]int // container -> num running execs, guarded by lifecycleMu
121+ // container -> subscription to exec exits/changes to s.runningExecs[container],
122+ // guarded by lifecycleMu
123+ execCountSubscribers map [* runc.Container ]chan <- int
124+ // container -> init exits, guarded by lifecycleMu
125+ // Used to stash container init process exits, so that we can hold them
126+ // until after we've made sure to publish all the container's exec exits.
127+ // Also used to prevent starting new execs from being started if the
128+ // container's init process (read: pid, not [process.Init]) has already been
129+ // reaped by the shim.
123130 // Note that this flag gets updated before the container's [process.Init.Status]
124131 // is transitioned to "stopped".
125- execable map [* runc.Container ]bool
132+ containerInitExit map [* runc.Container ]runcC. Exit
126133 // Subscriptions to exits for PIDs. Adding/deleting subscriptions and
127134 // dereferencing the subscription pointers must only be done while holding
128135 // lifecycleMu.
@@ -143,8 +150,7 @@ type containerProcess struct {
143150//
144151// The returned handleStarted closure records that the process has started so
145152// that its exit can be handled efficiently. If the process has already exited,
146- // it handles the exit immediately. In addition, if the process is an exec and
147- // its container's init process has already exited, that exit is also processed.
153+ // it handles the exit immediately.
148154// handleStarted should be called after the event announcing the start of the
149155// process has been published. Note that s.lifecycleMu must not be held when
150156// calling handleStarted.
@@ -179,44 +185,8 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
179185 pid = p .Pid ()
180186 }
181187
182- _ , init := p .(* process.Init )
183188 s .lifecycleMu .Lock ()
184189
185- var initExits []runcC.Exit
186- var initCps []containerProcess
187- if ! init {
188- s .pendingExecs [c ]--
189-
190- initPid := c .Pid ()
191- iExits , initExited := exits [initPid ]
192- if initExited && s .pendingExecs [c ] == 0 {
193- // c's init process has exited before handleStarted was called and
194- // this is the last pending exec process start - we need to process
195- // the exit for the init process after processing this exec, so:
196- // - delete c from the s.pendingExecs map
197- // - keep the exits for the init pid to process later (after we process
198- // this exec's exits)
199- // - get the necessary containerProcesses for the init process (that we
200- // need to process the exits), and remove them from s.running (which we skipped
201- // doing in processExits).
202- delete (s .pendingExecs , c )
203- initExits = iExits
204- var skipped []containerProcess
205- for _ , initPidCp := range s .running [initPid ] {
206- if initPidCp .Container == c {
207- initCps = append (initCps , initPidCp )
208- } else {
209- skipped = append (skipped , initPidCp )
210- }
211- }
212- if len (skipped ) == 0 {
213- delete (s .running , initPid )
214- } else {
215- s .running [initPid ] = skipped
216- }
217- }
218- }
219-
220190 ees , exited := exits [pid ]
221191 delete (s .exitSubscribers , & exits )
222192 exits = nil
@@ -225,20 +195,12 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
225195 for _ , ee := range ees {
226196 s .handleProcessExit (ee , c , p )
227197 }
228- for _ , eee := range initExits {
229- for _ , cp := range initCps {
230- s .handleProcessExit (eee , cp .Container , cp .Process )
231- }
232- }
233198 } else {
234199 // Process start was successful, add to `s.running`.
235200 s .running [pid ] = append (s .running [pid ], containerProcess {
236201 Container : c ,
237202 Process : p ,
238203 })
239- if init {
240- s .execable [c ] = true
241- }
242204 s .lifecycleMu .Unlock ()
243205 }
244206 }
@@ -313,18 +275,29 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
313275 if r .ExecID == "" {
314276 cinit = container
315277 } else {
316- if ! s . execable [container ] {
278+ if _ , initExited := s . containerInitExit [container ]; initExited {
317279 s .lifecycleMu .Unlock ()
318280 return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container %s init process is not running" , container .ID )
319281 }
320- s .pendingExecs [container ]++
282+ s .runningExecs [container ]++
321283 }
322284 handleStarted , cleanup := s .preStart (cinit )
323285 s .lifecycleMu .Unlock ()
324286 defer cleanup ()
325287
326288 p , err := container .Start (ctx , r )
327289 if err != nil {
290+ // If we failed to even start the process, s.runningExecs
291+ // won't get decremented in s.handleProcessExit. We still need
292+ // to update it.
293+ if r .ExecID != "" {
294+ s .lifecycleMu .Lock ()
295+ s .runningExecs [container ]--
296+ if ch , ok := s .execCountSubscribers [container ]; ok {
297+ ch <- s .runningExecs [container ]
298+ }
299+ s .lifecycleMu .Unlock ()
300+ }
328301 handleStarted (container , p )
329302 return nil , errdefs .ToGRPC (err )
330303 }
@@ -689,31 +662,23 @@ func (s *service) processExits() {
689662 // Handle the exit for a created/started process. If there's more than
690663 // one, assume they've all exited. One of them will be the correct
691664 // process.
692- var cps , skipped []containerProcess
665+ var cps []containerProcess
693666 for _ , cp := range s .running [e .Pid ] {
694667 _ , init := cp .Process .(* process.Init )
695668 if init {
696- delete (s .execable , cp .Container )
697- }
698- if init && s .pendingExecs [cp .Container ] != 0 {
699- // This exit relates to a container for which we have pending execs. In
700- // order to ensure order between execs and the init process for a given
701- // container, skip processing this exit here and let the `handleStarted`
702- // closure for the pending exec publish it.
703- skipped = append (skipped , cp )
704- } else {
705- cps = append (cps , cp )
669+ s .containerInitExit [cp .Container ] = e
706670 }
671+ cps = append (cps , cp )
707672 }
708- if len (skipped ) > 0 {
709- s .running [e .Pid ] = skipped
710- } else {
711- delete (s .running , e .Pid )
712- }
673+ delete (s .running , e .Pid )
713674 s .lifecycleMu .Unlock ()
714675
715676 for _ , cp := range cps {
716- s .handleProcessExit (e , cp .Container , cp .Process )
677+ if ip , ok := cp .Process .(* process.Init ); ok {
678+ s .handleInitExit (e , cp .Container , ip )
679+ } else {
680+ s .handleProcessExit (e , cp .Container , cp .Process )
681+ }
717682 }
718683 }
719684}
@@ -722,17 +687,60 @@ func (s *service) send(evt interface{}) {
722687 s .events <- evt
723688}
724689
725- func (s * service ) handleProcessExit (e runcC.Exit , c * runc.Container , p process.Process ) {
726- if ip , ok := p .(* process.Init ); ok {
727- // Ensure all children are killed
728- if runc .ShouldKillAllOnExit (s .context , c .Bundle ) {
729- if err := ip .KillAll (s .context ); err != nil {
730- logrus .WithError (err ).WithField ("id" , ip .ID ()).
731- Error ("failed to kill init's children" )
732- }
690+ // handleInitExit processes container init process exits.
691+ // This is handled separately from non-init exits, because there
692+ // are some extra invariants we want to ensure in this case, namely:
693+ // - for a given container, the init process exit MUST be the last exit published
694+ // This is achieved by:
695+ // - killing all running container processes (if the container has a shared pid
696+ // namespace, otherwise all other processes have been reaped already).
697+ // - waiting for the container's running exec counter to reach 0.
698+ // - finally, publishing the init exit.
699+ func (s * service ) handleInitExit (e runcC.Exit , c * runc.Container , p * process.Init ) {
700+ // kill all running container processes
701+ if runc .ShouldKillAllOnExit (s .context , c .Bundle ) {
702+ if err := p .KillAll (s .context ); err != nil {
703+ logrus .WithError (err ).WithField ("id" , p .ID ()).
704+ Error ("failed to kill init's children" )
733705 }
734706 }
735707
708+ s .lifecycleMu .Lock ()
709+ numRunningExecs := s .runningExecs [c ]
710+ if numRunningExecs == 0 {
711+ delete (s .runningExecs , c )
712+ s .lifecycleMu .Unlock ()
713+ s .handleProcessExit (e , c , p )
714+ return
715+ }
716+
717+ events := make (chan int , numRunningExecs )
718+ s .execCountSubscribers [c ] = events
719+
720+ s .lifecycleMu .Unlock ()
721+
722+ go func () {
723+ defer func () {
724+ s .lifecycleMu .Lock ()
725+ defer s .lifecycleMu .Unlock ()
726+ delete (s .execCountSubscribers , c )
727+ delete (s .runningExecs , c )
728+ }()
729+
730+ // wait for running processes to exit
731+ for {
732+ if runningExecs := <- events ; runningExecs == 0 {
733+ break
734+ }
735+ }
736+
737+ // all running processes have exited now, and no new
738+ // ones can start, so we can publish the init exit
739+ s .handleProcessExit (e , c , p )
740+ }()
741+ }
742+
743+ func (s * service ) handleProcessExit (e runcC.Exit , c * runc.Container , p process.Process ) {
736744 p .SetExited (e .Status )
737745 s .send (& eventstypes.TaskExit {
738746 ContainerID : c .ID ,
@@ -741,6 +749,14 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P
741749 ExitStatus : uint32 (e .Status ),
742750 ExitedAt : p .ExitedAt (),
743751 })
752+ if _ , init := p .(* process.Init ); ! init {
753+ s .lifecycleMu .Lock ()
754+ s .runningExecs [c ]--
755+ if ch , ok := s .execCountSubscribers [c ]; ok {
756+ ch <- s .runningExecs [c ]
757+ }
758+ s .lifecycleMu .Unlock ()
759+ }
744760}
745761
746762func (s * service ) getContainerPids (ctx context.Context , container * runc.Container ) ([]uint32 , error ) {
0 commit comments