@@ -74,16 +74,17 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
7474 }
7575 go ep .Run (ctx )
7676 s := & service {
77- context : ctx ,
78- events : make (chan interface {}, 128 ),
79- ec : reaper .Default .Subscribe (),
80- ep : ep ,
81- shutdown : sd ,
82- containers : make (map [string ]* runc.Container ),
83- running : make (map [int ][]containerProcess ),
84- pendingExecs : make (map [* runc.Container ]int ),
85- execable : make (map [* runc.Container ]bool ),
86- exitSubscribers : make (map [* map [int ][]runcC.Exit ]struct {}),
77+ context : ctx ,
78+ events : make (chan interface {}, 128 ),
79+ ec : reaper .Default .Subscribe (),
80+ ep : ep ,
81+ shutdown : sd ,
82+ containers : make (map [string ]* runc.Container ),
83+ running : make (map [int ][]containerProcess ),
84+ runningExecs : make (map [* runc.Container ]int ),
85+ execCountSubscribers : make (map [* runc.Container ]chan <- int ),
86+ containerInitExit : make (map [* runc.Container ]runcC.Exit ),
87+ exitSubscribers : make (map [* map [int ][]runcC.Exit ]struct {}),
8788 }
8889 go s .processExits ()
8990 runcC .Monitor = reaper .Default
@@ -118,13 +119,19 @@ type service struct {
118119
119120 lifecycleMu sync.Mutex
120121 running map [int ][]containerProcess // pid -> running process, guarded by lifecycleMu
121- pendingExecs map [* runc.Container ]int // container -> num pending execs, guarded by lifecycleMu
122- // container -> execs can be started, guarded by lifecycleMu.
123- // Execs can be started if the container's init process (read: pid, not [process.Init])
124- // has been started and not yet reaped by the shim.
122+ runningExecs map [* runc.Container ]int // container -> num running execs, guarded by lifecycleMu
123+ // container -> subscription to exec exits/changes to s.runningExecs[container],
124+ // guarded by lifecycleMu
125+ execCountSubscribers map [* runc.Container ]chan <- int
126+ // container -> init exits, guarded by lifecycleMu
127+ // Used to stash container init process exits, so that we can hold them
128+ // until after we've made sure to publish all the container's exec exits.
129+ // Also used to prevent new execs from being started if the
130+ // container's init process (read: pid, not [process.Init]) has already been
131+ // reaped by the shim.
125132 // Note that this map gets updated before the container's [process.Init.Status]
126133 // is transitioned to "stopped".
127- execable map [* runc.Container ]bool
134+ containerInitExit map [* runc.Container ]runcC. Exit
128135 // Subscriptions to exits for PIDs. Adding/deleting subscriptions and
129136 // dereferencing the subscription pointers must only be done while holding
130137 // lifecycleMu.
@@ -145,8 +152,7 @@ type containerProcess struct {
145152//
146153// The returned handleStarted closure records that the process has started so
147154// that its exit can be handled efficiently. If the process has already exited,
148- // it handles the exit immediately. In addition, if the process is an exec and
149- // its container's init process has already exited, that exit is also processed.
155+ // it handles the exit immediately.
150156// handleStarted should be called after the event announcing the start of the
151157// process has been published. Note that s.lifecycleMu must not be held when
152158// calling handleStarted.
@@ -181,44 +187,8 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
181187 pid = p .Pid ()
182188 }
183189
184- _ , init := p .(* process.Init )
185190 s .lifecycleMu .Lock ()
186191
187- var initExits []runcC.Exit
188- var initCps []containerProcess
189- if ! init {
190- s .pendingExecs [c ]--
191-
192- initPid := c .Pid ()
193- iExits , initExited := exits [initPid ]
194- if initExited && s .pendingExecs [c ] == 0 {
195- // c's init process has exited before handleStarted was called and
196- // this is the last pending exec process start - we need to process
197- // the exit for the init process after processing this exec, so:
198- // - delete c from the s.pendingExecs map
199- // - keep the exits for the init pid to process later (after we process
200- // this exec's exits)
201- // - get the necessary containerProcesses for the init process (that we
202- // need to process the exits), and remove them from s.running (which we skipped
203- // doing in processExits).
204- delete (s .pendingExecs , c )
205- initExits = iExits
206- var skipped []containerProcess
207- for _ , initPidCp := range s .running [initPid ] {
208- if initPidCp .Container == c {
209- initCps = append (initCps , initPidCp )
210- } else {
211- skipped = append (skipped , initPidCp )
212- }
213- }
214- if len (skipped ) == 0 {
215- delete (s .running , initPid )
216- } else {
217- s .running [initPid ] = skipped
218- }
219- }
220- }
221-
222192 ees , exited := exits [pid ]
223193 delete (s .exitSubscribers , & exits )
224194 exits = nil
@@ -227,20 +197,12 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
227197 for _ , ee := range ees {
228198 s .handleProcessExit (ee , c , p )
229199 }
230- for _ , eee := range initExits {
231- for _ , cp := range initCps {
232- s .handleProcessExit (eee , cp .Container , cp .Process )
233- }
234- }
235200 } else {
236201 // Process start was successful, add to `s.running`.
237202 s .running [pid ] = append (s .running [pid ], containerProcess {
238203 Container : c ,
239204 Process : p ,
240205 })
241- if init {
242- s .execable [c ] = true
243- }
244206 s .lifecycleMu .Unlock ()
245207 }
246208 }
@@ -315,18 +277,29 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
315277 if r .ExecID == "" {
316278 cinit = container
317279 } else {
318- if ! s . execable [container ] {
280+ if _ , initExited := s . containerInitExit [container ]; initExited {
319281 s .lifecycleMu .Unlock ()
320282 return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container %s init process is not running" , container .ID )
321283 }
322- s .pendingExecs [container ]++
284+ s .runningExecs [container ]++
323285 }
324286 handleStarted , cleanup := s .preStart (cinit )
325287 s .lifecycleMu .Unlock ()
326288 defer cleanup ()
327289
328290 p , err := container .Start (ctx , r )
329291 if err != nil {
292+ // If we failed to even start the process, s.runningExecs
293+ // won't get decremented in s.handleProcessExit. We still need
294+ // to update it.
295+ if r .ExecID != "" {
296+ s .lifecycleMu .Lock ()
297+ s .runningExecs [container ]--
298+ if ch , ok := s .execCountSubscribers [container ]; ok {
299+ ch <- s .runningExecs [container ]
300+ }
301+ s .lifecycleMu .Unlock ()
302+ }
330303 handleStarted (container , p )
331304 return nil , errdefs .ToGRPC (err )
332305 }
@@ -691,31 +664,23 @@ func (s *service) processExits() {
691664 // Handle the exit for a created/started process. If there's more than
692665 // one, assume they've all exited. One of them will be the correct
693666 // process.
694- var cps , skipped []containerProcess
667+ var cps []containerProcess
695668 for _ , cp := range s .running [e .Pid ] {
696669 _ , init := cp .Process .(* process.Init )
697670 if init {
698- delete (s .execable , cp .Container )
699- }
700- if init && s .pendingExecs [cp .Container ] != 0 {
701- // This exit relates to a container for which we have pending execs. In
702- // order to ensure order between execs and the init process for a given
703- // container, skip processing this exit here and let the `handleStarted`
704- // closure for the pending exec publish it.
705- skipped = append (skipped , cp )
706- } else {
707- cps = append (cps , cp )
671+ s .containerInitExit [cp .Container ] = e
708672 }
673+ cps = append (cps , cp )
709674 }
710- if len (skipped ) > 0 {
711- s .running [e .Pid ] = skipped
712- } else {
713- delete (s .running , e .Pid )
714- }
675+ delete (s .running , e .Pid )
715676 s .lifecycleMu .Unlock ()
716677
717678 for _ , cp := range cps {
718- s .handleProcessExit (e , cp .Container , cp .Process )
679+ if ip , ok := cp .Process .(* process.Init ); ok {
680+ s .handleInitExit (e , cp .Container , ip )
681+ } else {
682+ s .handleProcessExit (e , cp .Container , cp .Process )
683+ }
719684 }
720685 }
721686}
@@ -724,17 +689,60 @@ func (s *service) send(evt interface{}) {
724689 s .events <- evt
725690}
726691
727- func (s * service ) handleProcessExit (e runcC.Exit , c * runc.Container , p process.Process ) {
728- if ip , ok := p .(* process.Init ); ok {
729- // Ensure all children are killed
730- if runc .ShouldKillAllOnExit (s .context , c .Bundle ) {
731- if err := ip .KillAll (s .context ); err != nil {
732- log .G (s .context ).WithError (err ).WithField ("id" , ip .ID ()).
733- Error ("failed to kill init's children" )
734- }
692+ // handleInitExit processes the exit e of container c's init process p.
693+ // This is handled separately from non-init exits, because there
694+ // is an extra invariant we want to ensure in this case, namely:
695+ // - for a given container, the init process exit MUST be the last exit published.
696+ // This is achieved by:
697+ // - killing all running container processes (if the container has a shared pid
698+ // namespace, otherwise all other processes have been reaped already).
699+ // - waiting for the container's running exec counter to reach 0 (in a goroutine, if execs are still running).
700+ // - finally, publishing the init exit through handleProcessExit.
701+ func (s * service ) handleInitExit (e runcC.Exit , c * runc.Container , p * process.Init ) {
702+ // kill all running container processes; a failure is only logged and does not stop the exit from being handled
703+ if runc .ShouldKillAllOnExit (s .context , c .Bundle ) {
704+ if err := p .KillAll (s .context ); err != nil {
705+ log .G (s .context ).WithError (err ).WithField ("id" , p .ID ()).
706+ Error ("failed to kill init's children" )
735707 }
736708 }
737709
710+ s .lifecycleMu .Lock ()
711+ numRunningExecs := s .runningExecs [c ]
712+ if numRunningExecs == 0 { // no execs left to wait for: publish the init exit right away
713+ delete (s .runningExecs , c )
714+ s .lifecycleMu .Unlock ()
715+ s .handleProcessExit (e , c , p )
716+ return
717+ }
718+
719+ events := make (chan int , numRunningExecs ) // buffer sized to the number of execs still running; presumably so senders, which send while holding lifecycleMu, never block — TODO confirm
720+ s .execCountSubscribers [c ] = events
721+
722+ s .lifecycleMu .Unlock ()
723+
724+ go func () { // wait in a separate goroutine so that the caller is not blocked
725+ defer func () { // runs after the init exit has been published: drop the container's subscription and exec counter
726+ s .lifecycleMu .Lock ()
727+ defer s .lifecycleMu .Unlock ()
728+ delete (s .execCountSubscribers , c )
729+ delete (s .runningExecs , c )
730+ }()
731+
732+ // wait for running execs to exit: block until a running exec count of 0 is received
733+ for {
734+ if runningExecs := <- events ; runningExecs == 0 {
735+ break
736+ }
737+ }
738+
739+ // all running processes have exited now, and no new
740+ // ones can start, so we can publish the init exit
741+ s .handleProcessExit (e , c , p )
742+ }()
743+ }
744+
745+ func (s * service ) handleProcessExit (e runcC.Exit , c * runc.Container , p process.Process ) {
738746 p .SetExited (e .Status )
739747 s .send (& eventstypes.TaskExit {
740748 ContainerID : c .ID ,
@@ -743,6 +751,14 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P
743751 ExitStatus : uint32 (e .Status ),
744752 ExitedAt : protobuf .ToTimestamp (p .ExitedAt ()),
745753 })
754+ if _ , init := p .(* process.Init ); ! init {
755+ s .lifecycleMu .Lock ()
756+ s .runningExecs [c ]--
757+ if ch , ok := s .execCountSubscribers [c ]; ok {
758+ ch <- s .runningExecs [c ]
759+ }
760+ s .lifecycleMu .Unlock ()
761+ }
746762}
747763
748764func (s * service ) getContainerPids (ctx context.Context , container * runc.Container ) ([]uint32 , error ) {
0 commit comments