@@ -72,12 +72,14 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
 	}
 	go ep.Run(ctx)
 	s := &service{
-		context:    ctx,
-		events:     make(chan interface{}, 128),
-		ec:         reaper.Default.Subscribe(),
-		ep:         ep,
-		shutdown:   sd,
-		containers: make(map[string]*runc.Container),
+		context:         ctx,
+		events:          make(chan interface{}, 128),
+		ec:              reaper.Default.Subscribe(),
+		ep:              ep,
+		shutdown:        sd,
+		containers:      make(map[string]*runc.Container),
+		running:         make(map[int][]containerProcess),
+		exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
 	}
 	go s.processExits()
 	runcC.Monitor = reaper.Default
@@ -100,8 +102,7 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
 
 // service is the shim implementation of a remote shim over GRPC
 type service struct {
-	mu          sync.Mutex
-	eventSendMu sync.Mutex
+	mu sync.Mutex
 
 	context  context.Context
 	events   chan interface{}
@@ -111,14 +112,103 @@ type service struct {
 
 	containers map[string]*runc.Container
 
+	lifecycleMu  sync.Mutex
+	running      map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
+	// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
+	// dereferencing the subscription pointers must only be done while holding
+	// lifecycleMu.
+	exitSubscribers map[*map[int][]runcC.Exit]struct{}
+
 	shutdown shutdown.Service
 }
 
+type containerProcess struct {
+	Container *runc.Container
+	Process   process.Process
+}
+
+// preStart prepares for starting a container process and handling its exit.
+// The container being started should be passed in as c when starting the
+// container init process for an already-created container. c should be nil when
+// creating a container or when starting an exec.
+//
+// The returned handleStarted closure records that the process has started so
+// that its exit can be handled efficiently. If the process has already exited,
+// it handles the exit immediately. handleStarted should be called after the
+// event announcing the start of the process has been published.
+//
+// The returned cleanup closure releases resources used to handle early exits.
+// It must be called before the caller of preStart returns, otherwise severe
+// memory leaks will occur.
+func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Container, process.Process), cleanup func()) {
+	exits := make(map[int][]runcC.Exit)
+
+	s.lifecycleMu.Lock()
+	defer s.lifecycleMu.Unlock()
+	s.exitSubscribers[&exits] = struct{}{}
+
+	if c != nil {
+		// Remove container init process from s.running so it will once again be
+		// treated as an early exit if it exits before handleStarted is called.
+		pid := c.Pid()
+		var newRunning []containerProcess
+		for _, cp := range s.running[pid] {
+			if cp.Container != c {
+				newRunning = append(newRunning, cp)
+			}
+		}
+		if len(newRunning) > 0 {
+			s.running[pid] = newRunning
+		} else {
+			delete(s.running, pid)
+		}
+	}
+
+	handleStarted = func(c *runc.Container, p process.Process) {
+		var pid int
+		if p != nil {
+			pid = p.Pid()
+		}
+
+		s.lifecycleMu.Lock()
+		ees, exited := exits[pid]
+		delete(s.exitSubscribers, &exits)
+		exits = nil
+		if pid == 0 { // no-op
+			s.lifecycleMu.Unlock()
+		} else if exited {
+			s.lifecycleMu.Unlock()
+			for _, ee := range ees {
+				s.handleProcessExit(ee, c, p)
+			}
+		} else {
+			s.running[pid] = append(s.running[pid], containerProcess{
+				Container: c,
+				Process:   p,
+			})
+			s.lifecycleMu.Unlock()
+		}
+	}
+
+	cleanup = func() {
+		if exits != nil {
+			s.lifecycleMu.Lock()
+			defer s.lifecycleMu.Unlock()
+			delete(s.exitSubscribers, &exits)
+		}
+	}
+
+	return handleStarted, cleanup
+}
+
 // Create a new initial process and container with the underlying OCI runtime
 func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
+	handleStarted, cleanup := s.preStart(nil)
+	defer cleanup()
+
 	container, err := runc.NewContainer(ctx, s.platform, r)
 	if err != nil {
		return nil, err
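As an aside for reviewers, the call pattern that the preStart doc comment prescribes looks roughly like the sketch below. startSomething and launchProcess are hypothetical placeholders, not part of this commit; the real callers are the Create and Start changes in the hunks that follow. Only preStart, handleStarted, cleanup, and s.send come from this change.

// Sketch only: a simplified, hypothetical caller of preStart.
func (s *service) startSomething(ctx context.Context, container *runc.Container) error {
	// Subscribe to exits before the process can be forked, so an exit that
	// races with this function is buffered instead of being lost.
	handleStarted, cleanup := s.preStart(nil)
	// cleanup releases the subscription if handleStarted is never reached,
	// e.g. when we return early on error below.
	defer cleanup()

	p, err := launchProcess(ctx, container) // hypothetical helper that forks the process and returns a process.Process
	if err != nil {
		return err
	}

	// Publish the start event first, then record the process as running. If
	// the process has already exited, handleStarted replays the buffered exit,
	// so the exit event is always sent after the start event.
	s.send(&eventstypes.TaskStart{
		ContainerID: container.ID,
		Pid:         uint32(p.Pid()),
	})
	handleStarted(container, p)
	return nil
}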
@@ -140,6 +230,12 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
 		Pid:         uint32(container.Pid()),
 	})
 
+	// The following line cannot return an error as the only state in which that
+	// could happen would also cause the container.Pid() call above to
+	// nil-dereference panic.
+	proc, _ := container.Process("")
+	handleStarted(container, proc)
+
 	return &taskAPI.CreateTaskResponse{
 		Pid: uint32(container.Pid()),
 	}, nil
@@ -157,11 +253,15 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 		return nil, err
 	}
 
-	// hold the send lock so that the start events are sent before any exit events in the error case
-	s.eventSendMu.Lock()
+	var cinit *runc.Container
+	if r.ExecID == "" {
+		cinit = container
+	}
+	handleStarted, cleanup := s.preStart(cinit)
+	defer cleanup()
 	p, err := container.Start(ctx, r)
 	if err != nil {
-		s.eventSendMu.Unlock()
+		handleStarted(container, p)
 		return nil, errdefs.ToGRPC(err)
 	}
 
@@ -201,7 +301,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 			Pid:         uint32(p.Pid()),
 		})
 	}
-	s.eventSendMu.Unlock()
+	handleStarted(container, p)
 	return &taskAPI.StartResponse{
 		Pid: uint32(p.Pid()),
 	}, nil
@@ -509,56 +609,58 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
 
 func (s *service) processExits() {
 	for e := range s.ec {
-		s.checkProcesses(e)
+		// While unlikely, it is not impossible for a container process to exit
+		// and have its PID be recycled for a new container process before we
+		// have a chance to process the first exit. As we have no way to tell
+		// for sure which of the processes the exit event corresponds to (until
+		// pidfd support is implemented) there is no way for us to handle the
+		// exit correctly in that case.
+
+		s.lifecycleMu.Lock()
+		// Inform any concurrent s.Start() calls so they can handle the exit
+		// if the PID belongs to them.
+		for subscriber := range s.exitSubscribers {
+			(*subscriber)[e.Pid] = append((*subscriber)[e.Pid], e)
+		}
+		// Handle the exit for a created/started process. If there's more than
+		// one, assume they've all exited. One of them will be the correct
+		// process.
+		cps := s.running[e.Pid]
+		delete(s.running, e.Pid)
+		s.lifecycleMu.Unlock()
+
+		for _, cp := range cps {
+			s.handleProcessExit(e, cp.Container, cp.Process)
+		}
 	}
 }
 
 func (s *service) send(evt interface{}) {
 	s.events <- evt
 }
 
-func (s *service) sendL(evt interface{}) {
-	s.eventSendMu.Lock()
-	s.events <- evt
-	s.eventSendMu.Unlock()
-}
-
-func (s *service) checkProcesses(e runcC.Exit) {
+func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	for _, container := range s.containers {
-		if !container.HasPid(e.Pid) {
-			continue
-		}
-
-		for _, p := range container.All() {
-			if p.Pid() != e.Pid {
-				continue
-			}
-
-			if ip, ok := p.(*process.Init); ok {
-				// Ensure all children are killed
-				if runc.ShouldKillAllOnExit(s.context, container.Bundle) {
-					if err := ip.KillAll(s.context); err != nil {
-						logrus.WithError(err).WithField("id", ip.ID()).
-							Error("failed to kill init's children")
-					}
-				}
+	if ip, ok := p.(*process.Init); ok {
+		// Ensure all children are killed
+		if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
+			if err := ip.KillAll(s.context); err != nil {
+				logrus.WithError(err).WithField("id", ip.ID()).
+					Error("failed to kill init's children")
 			}
-
-			p.SetExited(e.Status)
-			s.sendL(&eventstypes.TaskExit{
-				ContainerID: container.ID,
-				ID:          p.ID(),
-				Pid:         uint32(e.Pid),
-				ExitStatus:  uint32(e.Status),
-				ExitedAt:    protobuf.ToTimestamp(p.ExitedAt()),
-			})
-			return
 		}
-		return
 	}
+
+	p.SetExited(e.Status)
+	s.send(&eventstypes.TaskExit{
+		ContainerID: c.ID,
+		ID:          p.ID(),
+		Pid:         uint32(e.Pid),
+		ExitStatus:  uint32(e.Status),
+		ExitedAt:    protobuf.ToTimestamp(p.ExitedAt()),
+	})
 }
 
 func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
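To make the new lifecycle bookkeeping easier to follow outside the shim, here is a small self-contained sketch of the same subscribe/fan-out pattern: every in-flight start registers a pointer to its own exits map, the exit handler appends each reaped exit to every registered map, and exits for already-recorded PIDs are dispatched directly. All names here (Exit, tracker, handleExit, the string process names) are illustrative stand-ins, not containerd APIs, and the sketch deliberately drops details such as the pid==0 no-op case.

package main

import (
	"fmt"
	"sync"
)

// Exit is a stand-in for runcC.Exit: the reaped PID and its exit status.
type Exit struct {
	Pid    int
	Status int
}

// tracker mirrors the shim's lifecycleMu/running/exitSubscribers trio in
// simplified form; process identity is reduced to a string name.
type tracker struct {
	mu          sync.Mutex
	running     map[int][]string
	subscribers map[*map[int][]Exit]struct{}
}

func newTracker() *tracker {
	return &tracker{
		running:     make(map[int][]string),
		subscribers: make(map[*map[int][]Exit]struct{}),
	}
}

// preStart registers an exits buffer and returns closures analogous to
// handleStarted and cleanup in the diff above.
func (t *tracker) preStart() (handleStarted func(name string, pid int), cleanup func()) {
	exits := make(map[int][]Exit)

	t.mu.Lock()
	t.subscribers[&exits] = struct{}{}
	t.mu.Unlock()

	handleStarted = func(name string, pid int) {
		t.mu.Lock()
		early := exits[pid]
		delete(t.subscribers, &exits)
		exits = nil
		if len(early) == 0 {
			// No early exit was buffered; track the process as running.
			t.running[pid] = append(t.running[pid], name)
		}
		t.mu.Unlock()
		for _, e := range early {
			fmt.Printf("replaying early exit of %s: pid %d, status %d\n", name, e.Pid, e.Status)
		}
	}
	cleanup = func() {
		t.mu.Lock()
		if exits != nil {
			delete(t.subscribers, &exits)
		}
		t.mu.Unlock()
	}
	return handleStarted, cleanup
}

// handleExit plays the role of processExits for a single exit event.
func (t *tracker) handleExit(e Exit) {
	t.mu.Lock()
	// Fan the exit out to every in-flight preStart so it can be replayed
	// later if the PID belongs to a process not yet recorded in running.
	for sub := range t.subscribers {
		(*sub)[e.Pid] = append((*sub)[e.Pid], e)
	}
	names := t.running[e.Pid]
	delete(t.running, e.Pid)
	t.mu.Unlock()

	for _, name := range names {
		fmt.Printf("%s (pid %d) exited with status %d\n", name, e.Pid, e.Status)
	}
}

func main() {
	t := newTracker()

	// Normal order: the process is registered before its exit arrives.
	started, cleanup := t.preStart()
	started("task-1", 101)
	cleanup()
	t.handleExit(Exit{Pid: 101, Status: 0})

	// Early exit: the exit arrives before handleStarted runs, so it is
	// buffered via the subscription and replayed by handleStarted.
	started2, cleanup2 := t.preStart()
	t.handleExit(Exit{Pid: 202, Status: 1})
	started2("task-2", 202)
	cleanup2()
}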