@@ -73,16 +73,17 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
7373 }
7474 go ep .Run (ctx )
7575 s := & service {
76- context : ctx ,
77- events : make (chan interface {}, 128 ),
78- ec : reaper .Default .Subscribe (),
79- ep : ep ,
80- shutdown : sd ,
81- containers : make (map [string ]* runc.Container ),
82- running : make (map [int ][]containerProcess ),
83- pendingExecs : make (map [* runc.Container ]int ),
84- execable : make (map [* runc.Container ]bool ),
85- exitSubscribers : make (map [* map [int ][]runcC.Exit ]struct {}),
76+ context : ctx ,
77+ events : make (chan interface {}, 128 ),
78+ ec : reaper .Default .Subscribe (),
79+ ep : ep ,
80+ shutdown : sd ,
81+ containers : make (map [string ]* runc.Container ),
82+ running : make (map [int ][]containerProcess ),
83+ runningExecs : make (map [* runc.Container ]int ),
84+ execCountSubscribers : make (map [* runc.Container ]chan <- int ),
85+ containerInitExit : make (map [* runc.Container ]runcC.Exit ),
86+ exitSubscribers : make (map [* map [int ][]runcC.Exit ]struct {}),
8687 }
8788 go s .processExits ()
8889 runcC .Monitor = reaper .Default
@@ -117,13 +118,19 @@ type service struct {
117118
118119 lifecycleMu sync.Mutex
119120 running map [int ][]containerProcess // pid -> running process, guarded by lifecycleMu
120- pendingExecs map [* runc.Container ]int // container -> num pending execs, guarded by lifecycleMu
121- // container -> execs can be started, guarded by lifecycleMu.
122- // Execs can be started if the container's init process (read: pid, not [process.Init])
123- // has been started and not yet reaped by the shim.
121+ runningExecs map [* runc.Container ]int // container -> num running execs, guarded by lifecycleMu
122+ // container -> subscription to exec exits/changes to s.runningExecs[container],
123+ // guarded by lifecycleMu
124+ execCountSubscribers map [* runc.Container ]chan <- int
125+ // container -> init exits, guarded by lifecycleMu
126+ // Used to stash container init process exits, so that we can hold them
127+ // until after we've made sure to publish all the container's exec exits.
128+ // Also used to prevent new execs from being started if the
129+ // container's init process (read: pid, not [process.Init]) has already
130+ // been reaped by the shim.
124131 // Note that this entry gets recorded before the container's [process.Init.Status]
125132 // is transitioned to "stopped".
126- execable map [* runc.Container ]bool
133+ containerInitExit map [* runc.Container ]runcC. Exit
127134 // Subscriptions to exits for PIDs. Adding/deleting subscriptions and
128135 // dereferencing the subscription pointers must only be done while holding
129136 // lifecycleMu.
@@ -144,8 +151,7 @@ type containerProcess struct {
144151//
145152// The returned handleStarted closure records that the process has started so
146153// that its exit can be handled efficiently. If the process has already exited,
147- // it handles the exit immediately. In addition, if the process is an exec and
148- // its container's init process has already exited, that exit is also processed.
154+ // it handles the exit immediately.
149155// handleStarted should be called after the event announcing the start of the
150156// process has been published. Note that s.lifecycleMu must not be held when
151157// calling handleStarted.
@@ -180,44 +186,8 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
180186 pid = p .Pid ()
181187 }
182188
183- _ , init := p .(* process.Init )
184189 s .lifecycleMu .Lock ()
185190
186- var initExits []runcC.Exit
187- var initCps []containerProcess
188- if ! init {
189- s .pendingExecs [c ]--
190-
191- initPid := c .Pid ()
192- iExits , initExited := exits [initPid ]
193- if initExited && s .pendingExecs [c ] == 0 {
194- // c's init process has exited before handleStarted was called and
195- // this is the last pending exec process start - we need to process
196- // the exit for the init process after processing this exec, so:
197- // - delete c from the s.pendingExecs map
198- // - keep the exits for the init pid to process later (after we process
199- // this exec's exits)
200- // - get the necessary containerProcesses for the init process (that we
201- // need to process the exits), and remove them from s.running (which we skipped
202- // doing in processExits).
203- delete (s .pendingExecs , c )
204- initExits = iExits
205- var skipped []containerProcess
206- for _ , initPidCp := range s .running [initPid ] {
207- if initPidCp .Container == c {
208- initCps = append (initCps , initPidCp )
209- } else {
210- skipped = append (skipped , initPidCp )
211- }
212- }
213- if len (skipped ) == 0 {
214- delete (s .running , initPid )
215- } else {
216- s .running [initPid ] = skipped
217- }
218- }
219- }
220-
221191 ees , exited := exits [pid ]
222192 delete (s .exitSubscribers , & exits )
223193 exits = nil
@@ -226,20 +196,12 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
226196 for _ , ee := range ees {
227197 s .handleProcessExit (ee , c , p )
228198 }
229- for _ , eee := range initExits {
230- for _ , cp := range initCps {
231- s .handleProcessExit (eee , cp .Container , cp .Process )
232- }
233- }
234199 } else {
235200 // Process start was successful, add to `s.running`.
236201 s .running [pid ] = append (s .running [pid ], containerProcess {
237202 Container : c ,
238203 Process : p ,
239204 })
240- if init {
241- s .execable [c ] = true
242- }
243205 s .lifecycleMu .Unlock ()
244206 }
245207 }
@@ -314,18 +276,29 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
314276 if r .ExecID == "" {
315277 cinit = container
316278 } else {
317- if ! s . execable [container ] {
279+ if _ , initExited := s . containerInitExit [container ]; initExited {
318280 s .lifecycleMu .Unlock ()
319281 return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container %s init process is not running" , container .ID )
320282 }
321- s .pendingExecs [container ]++
283+ s .runningExecs [container ]++
322284 }
323285 handleStarted , cleanup := s .preStart (cinit )
324286 s .lifecycleMu .Unlock ()
325287 defer cleanup ()
326288
327289 p , err := container .Start (ctx , r )
328290 if err != nil {
291+ // If we failed to even start the process, s.runningExecs
292+ // won't get decremented in s.handleProcessExit. We still need
293+ // to update it.
294+ if r .ExecID != "" {
295+ s .lifecycleMu .Lock ()
296+ s .runningExecs [container ]--
297+ if ch , ok := s .execCountSubscribers [container ]; ok {
298+ ch <- s .runningExecs [container ]
299+ }
300+ s .lifecycleMu .Unlock ()
301+ }
329302 handleStarted (container , p )
330303 return nil , errdefs .ToGRPC (err )
331304 }
@@ -690,31 +663,23 @@ func (s *service) processExits() {
690663 // Handle the exit for a created/started process. If there's more than
691664 // one, assume they've all exited. One of them will be the correct
692665 // process.
693- var cps , skipped []containerProcess
666+ var cps []containerProcess
694667 for _ , cp := range s .running [e .Pid ] {
695668 _ , init := cp .Process .(* process.Init )
696669 if init {
697- delete (s .execable , cp .Container )
698- }
699- if init && s .pendingExecs [cp .Container ] != 0 {
700- // This exit relates to a container for which we have pending execs. In
701- // order to ensure order between execs and the init process for a given
702- // container, skip processing this exit here and let the `handleStarted`
703- // closure for the pending exec publish it.
704- skipped = append (skipped , cp )
705- } else {
706- cps = append (cps , cp )
670+ s .containerInitExit [cp .Container ] = e
707671 }
672+ cps = append (cps , cp )
708673 }
709- if len (skipped ) > 0 {
710- s .running [e .Pid ] = skipped
711- } else {
712- delete (s .running , e .Pid )
713- }
674+ delete (s .running , e .Pid )
714675 s .lifecycleMu .Unlock ()
715676
716677 for _ , cp := range cps {
717- s .handleProcessExit (e , cp .Container , cp .Process )
678+ if ip , ok := cp .Process .(* process.Init ); ok {
679+ s .handleInitExit (e , cp .Container , ip )
680+ } else {
681+ s .handleProcessExit (e , cp .Container , cp .Process )
682+ }
718683 }
719684 }
720685}
@@ -723,17 +688,60 @@ func (s *service) send(evt interface{}) {
723688 s .events <- evt
724689}
725690
726- func (s * service ) handleProcessExit (e runcC.Exit , c * runc.Container , p process.Process ) {
727- if ip , ok := p .(* process.Init ); ok {
728- // Ensure all children are killed
729- if runc .ShouldKillAllOnExit (s .context , c .Bundle ) {
730- if err := ip .KillAll (s .context ); err != nil {
731- logrus .WithError (err ).WithField ("id" , ip .ID ()).
732- Error ("failed to kill init's children" )
733- }
691+ // handleInitExit processes container init process exits.
692+ // This is handled separately from non-init exits, because there
693+ // are some extra invariants we want to ensure in this case, namely:
694+ // - for a given container, the init process exit MUST be the last exit published
695+ // This is achieved by:
696+ // - killing all running container processes (if the container has a shared pid
697+ // namespace, otherwise all other processes have been reaped already).
698+ // - waiting for the container's running exec counter to reach 0.
699+ // - finally, publishing the init exit.
700+ func (s * service ) handleInitExit (e runcC.Exit , c * runc.Container , p * process.Init ) {
701+ // kill all running container processes
702+ if runc .ShouldKillAllOnExit (s .context , c .Bundle ) {
703+ if err := p .KillAll (s .context ); err != nil {
704+ logrus .WithError (err ).WithField ("id" , p .ID ()).
705+ Error ("failed to kill init's children" )
734706 }
735707 }
736708
709+ s .lifecycleMu .Lock ()
710+ numRunningExecs := s .runningExecs [c ]
711+ if numRunningExecs == 0 {
712+ delete (s .runningExecs , c )
713+ s .lifecycleMu .Unlock ()
714+ s .handleProcessExit (e , c , p )
715+ return
716+ }
717+
718+ events := make (chan int , numRunningExecs )
719+ s .execCountSubscribers [c ] = events
720+
721+ s .lifecycleMu .Unlock ()
722+
723+ go func () {
724+ defer func () {
725+ s .lifecycleMu .Lock ()
726+ defer s .lifecycleMu .Unlock ()
727+ delete (s .execCountSubscribers , c )
728+ delete (s .runningExecs , c )
729+ }()
730+
731+ // wait for running processes to exit
732+ for {
733+ if runningExecs := <- events ; runningExecs == 0 {
734+ break
735+ }
736+ }
737+
738+ // all running processes have exited now, and no new
739+ // ones can start, so we can publish the init exit
740+ s .handleProcessExit (e , c , p )
741+ }()
742+ }
743+
744+ func (s * service ) handleProcessExit (e runcC.Exit , c * runc.Container , p process.Process ) {
737745 p .SetExited (e .Status )
738746 s .send (& eventstypes.TaskExit {
739747 ContainerID : c .ID ,
@@ -742,6 +750,14 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P
742750 ExitStatus : uint32 (e .Status ),
743751 ExitedAt : protobuf .ToTimestamp (p .ExitedAt ()),
744752 })
753+ if _ , init := p .(* process.Init ); ! init {
754+ s .lifecycleMu .Lock ()
755+ s .runningExecs [c ]--
756+ if ch , ok := s .execCountSubscribers [c ]; ok {
757+ ch <- s .runningExecs [c ]
758+ }
759+ s .lifecycleMu .Unlock ()
760+ }
745761}
746762
747763func (s * service ) getContainerPids (ctx context.Context , container * runc.Container ) ([]uint32 , error ) {
0 commit comments