@@ -79,6 +79,7 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
7979 shutdown : sd ,
8080 containers : make (map [string ]* runc.Container ),
8181 running : make (map [int ][]containerProcess ),
82+ pendingExecs : make (map [* runc.Container ]int ),
8283 exitSubscribers : make (map [* map [int ][]runcC.Exit ]struct {}),
8384 }
8485 go s .processExits ()
@@ -112,8 +113,9 @@ type service struct {
112113
113114 containers map [string ]* runc.Container
114115
115- lifecycleMu sync.Mutex
116- running map [int ][]containerProcess // pid -> running process, guarded by lifecycleMu
116+ lifecycleMu sync.Mutex
117+ running map [int ][]containerProcess // pid -> running process, guarded by lifecycleMu
118+ pendingExecs map [* runc.Container ]int // container -> num pending execs, guarded by lifecycleMu
117119 // Subscriptions to exits for PIDs. Adding/deleting subscriptions and
118120 // dereferencing the subscription pointers must only be done while holding
119121 // lifecycleMu.
@@ -128,26 +130,23 @@ type containerProcess struct {
128130}
129131
130132// preStart prepares for starting a container process and handling its exit.
131- // The container being started should be passed in as c when starting the
132- // container init process for an already-created container. c should be nil when
133- // creating a container or when starting an exec.
133+ // The container being started should be passed in as c when starting the container
134+ // init process for an already-created container. c should be nil when creating a
135+ // container or when starting an exec.
134136//
135137// The returned handleStarted closure records that the process has started so
136138// that its exit can be handled efficiently. If the process has already exited,
137- // it handles the exit immediately. handleStarted should be called after the
138- // event announcing the start of the process has been published .
139- // Note that handleStarted needs to be aware of whether s.mu is already held
140- // when it is called. If s.mu has been held, we don't need to lock it when
141- // calling handleProcessExit .
139+ // it handles the exit immediately. In addition, if the process is an exec and
140+ // its container's init process has already exited, that exit is also processed .
141+ // handleStarted should be called after the event announcing the start of the
142+ // process has been published. Note that s.lifecycleMu must not be held when
143+ // calling handleStarted .
142144//
143145// The returned cleanup closure releases resources used to handle early exits.
144146// It must be called before the caller of preStart returns, otherwise severe
145147// memory leaks will occur.
146- func (s * service ) preStart (c * runc.Container ) (handleStarted func (* runc.Container , process.Process , bool ), cleanup func ()) {
148+ func (s * service ) preStart (c * runc.Container ) (handleStarted func (* runc.Container , process.Process ), cleanup func ()) {
147149 exits := make (map [int ][]runcC.Exit )
148-
149- s .lifecycleMu .Lock ()
150- defer s .lifecycleMu .Unlock ()
151150 s .exitSubscribers [& exits ] = struct {}{}
152151
153152 if c != nil {
@@ -167,30 +166,65 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
167166 }
168167 }
169168
170- handleStarted = func (c * runc.Container , p process.Process , muLocked bool ) {
169+ handleStarted = func (c * runc.Container , p process.Process ) {
171170 var pid int
172171 if p != nil {
173172 pid = p .Pid ()
174173 }
175174
175+ _ , init := p .(* process.Init )
176176 s .lifecycleMu .Lock ()
177+
178+ var initExits []runcC.Exit
179+ var initCps []containerProcess
180+ if ! init {
181+ s .pendingExecs [c ]--
182+
183+ initPid := c .Pid ()
184+ iExits , initExited := exits [initPid ]
185+ if initExited && s .pendingExecs [c ] == 0 {
186+ // c's init process has exited before handleStarted was called and
187+ // this is the last pending exec process start - we need to process
188+ // the exit for the init process after processing this exec, so:
189+ // - delete c from the s.pendingExecs map
190+ // - keep the exits for the init pid to process later (after we process
191+ // this exec's exits)
192+ // - get the necessary containerProcesses for the init process (that we
193+ // need to process the exits), and remove them from s.running (which we skipped
194+ // doing in processExits).
195+ delete (s .pendingExecs , c )
196+ initExits = iExits
197+ var skipped []containerProcess
198+ for _ , initPidCp := range s .running [initPid ] {
199+ if initPidCp .Container == c {
200+ initCps = append (initCps , initPidCp )
201+ } else {
202+ skipped = append (skipped , initPidCp )
203+ }
204+ }
205+ if len (skipped ) == 0 {
206+ delete (s .running , initPid )
207+ } else {
208+ s .running [initPid ] = skipped
209+ }
210+ }
211+ }
212+
177213 ees , exited := exits [pid ]
178214 delete (s .exitSubscribers , & exits )
179215 exits = nil
180- if pid == 0 { // no-op
181- s .lifecycleMu .Unlock ()
182- } else if exited {
216+ if pid == 0 || exited {
183217 s .lifecycleMu .Unlock ()
184218 for _ , ee := range ees {
185- if muLocked {
186- s .handleProcessExit (ee , c , p )
187- } else {
188- s .mu .Lock ()
189- s .handleProcessExit (ee , c , p )
190- s .mu .Unlock ()
219+ s .handleProcessExit (ee , c , p )
220+ }
221+ for _ , eee := range initExits {
222+ for _ , cp := range initCps {
223+ s .handleProcessExit (eee , cp .Container , cp .Process )
191224 }
192225 }
193226 } else {
227+ // Process start was successful, add to `s.running`.
194228 s .running [pid ] = append (s .running [pid ], containerProcess {
195229 Container : c ,
196230 Process : p ,
@@ -215,7 +249,9 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
215249 s .mu .Lock ()
216250 defer s .mu .Unlock ()
217251
252+ s .lifecycleMu .Lock ()
218253 handleStarted , cleanup := s .preStart (nil )
254+ s .lifecycleMu .Unlock ()
219255 defer cleanup ()
220256
221257 container , err := runc .NewContainer (ctx , s .platform , r )
@@ -243,7 +279,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
243279 // could happen would also cause the container.Pid() call above to
244280 // nil-deference panic.
245281 proc , _ := container .Process ("" )
246- handleStarted (container , proc , true )
282+ handleStarted (container , proc )
247283
248284 return & taskAPI.CreateTaskResponse {
249285 Pid : uint32 (container .Pid ()),
@@ -263,14 +299,19 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
263299 }
264300
265301 var cinit * runc.Container
302+ s .lifecycleMu .Lock ()
266303 if r .ExecID == "" {
267304 cinit = container
305+ } else {
306+ s .pendingExecs [container ]++
268307 }
269308 handleStarted , cleanup := s .preStart (cinit )
309+ s .lifecycleMu .Unlock ()
270310 defer cleanup ()
311+
271312 p , err := container .Start (ctx , r )
272313 if err != nil {
273- handleStarted (container , p , false )
314+ handleStarted (container , p )
274315 return nil , errdefs .ToGRPC (err )
275316 }
276317
@@ -310,7 +351,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
310351 Pid : uint32 (p .Pid ()),
311352 })
312353 }
313- handleStarted (container , p , false )
354+ handleStarted (container , p )
314355 return & taskAPI.StartResponse {
315356 Pid : uint32 (p .Pid ()),
316357 }, nil
@@ -634,14 +675,27 @@ func (s *service) processExits() {
634675 // Handle the exit for a created/started process. If there's more than
635676 // one, assume they've all exited. One of them will be the correct
636677 // process.
637- cps := s .running [e .Pid ]
638- delete (s .running , e .Pid )
678+ var cps , skipped []containerProcess
679+ for _ , cp := range s .running [e .Pid ] {
680+ if s .pendingExecs [cp .Container ] != 0 {
681+ // This exit relates to a container for which we have pending execs. In
682+ // order to ensure order between execs and the init process for a given
683+ // container, skip processing this exit here and let the `handleStarted`
684+ // closure for the pending exec publish it.
685+ skipped = append (skipped , cp )
686+ } else {
687+ cps = append (cps , cp )
688+ }
689+ }
690+ if len (skipped ) > 0 {
691+ s .running [e .Pid ] = skipped
692+ } else {
693+ delete (s .running , e .Pid )
694+ }
639695 s .lifecycleMu .Unlock ()
640696
641697 for _ , cp := range cps {
642- s .mu .Lock ()
643698 s .handleProcessExit (e , cp .Container , cp .Process )
644- s .mu .Unlock ()
645699 }
646700 }
647701}
0 commit comments