Commit 421a4b5

runc-shim: handle pending execs as running
This commit rewrites and simplifies a lot of this logic to reduce its
complexity, and also handles the case where the container doesn't have
its own pid namespace, which means that we're not guaranteed to receive
the init exit last. This is achieved by replacing `s.pendingExecs` with
`s.runningExecs`, which counts both (previously) pending and de facto
running execs.

The new exit handling logic can be summed up as:

- when we receive an init exit, stash it in `s.containerInitExit`, and
  if a container's init process has exited, refuse new execs.
- (if the container does not have its own pidns) kill all running
  processes (if the container has a private pid namespace, then all
  processes will be dead already).
- wait for the container's running exec count (which includes execs
  which have been started but might still exit early) to reach 0.
- publish the stashed init exit.

Signed-off-by: Laura Brehm <[email protected]>
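To make the algorithm above concrete, here is a minimal, self-contained Go sketch of the counting pattern the commit introduces. Every name in it (tracker, execStarted, initExited, publish) is an illustrative stand-in, not the shim's real code: exec starts increment a counter, exec exits decrement it and notify a subscriber channel, and the init exit is only published once the counter reaches zero.

package main

import (
	"fmt"
	"sync"
)

// tracker distills the per-container bookkeeping: a running-exec count
// plus an optional subscriber channel installed when the init exit
// arrives while execs are still running.
type tracker struct {
	mu           sync.Mutex
	runningExecs int
	subscriber   chan<- int // non-nil only while an init exit is pending
}

// execStarted corresponds to Start() incrementing the running count.
func (t *tracker) execStarted() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.runningExecs++
}

// execExited corresponds to handleProcessExit decrementing the count
// and notifying the subscriber, if any.
func (t *tracker) execExited() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.runningExecs--
	if t.subscriber != nil {
		t.subscriber <- t.runningExecs // buffered, so this never blocks
	}
}

// initExited corresponds to handleInitExit: publish immediately if no
// execs are running, otherwise wait for the count to reach zero first.
func (t *tracker) initExited(publish func()) {
	t.mu.Lock()
	n := t.runningExecs
	if n == 0 {
		t.mu.Unlock()
		publish()
		return
	}
	// Buffered with capacity n so execExited never blocks while
	// holding the mutex. At most n sends can happen, because no new
	// execs are admitted once the init process has exited.
	events := make(chan int, n)
	t.subscriber = events
	t.mu.Unlock()

	go func() {
		for count := range events {
			if count == 0 {
				break
			}
		}
		t.mu.Lock()
		t.subscriber = nil
		t.mu.Unlock()
		publish() // all execs have exited; the init exit goes out last
	}()
}

func main() {
	t := &tracker{}
	t.execStarted()
	t.execStarted()

	done := make(chan struct{})
	t.initExited(func() {
		fmt.Println("init exit published last")
		close(done)
	})

	t.execExited()
	t.execExited()
	<-done
}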
1 parent e735791 commit 421a4b5

1 file changed: cmd/containerd-shim-runc-v2/task/service.go (104 additions, 88 deletions)
@@ -74,16 +74,17 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
 	}
 	go ep.Run(ctx)
 	s := &service{
-		context:         ctx,
-		events:          make(chan interface{}, 128),
-		ec:              reaper.Default.Subscribe(),
-		ep:              ep,
-		shutdown:        sd,
-		containers:      make(map[string]*runc.Container),
-		running:         make(map[int][]containerProcess),
-		pendingExecs:    make(map[*runc.Container]int),
-		execable:        make(map[*runc.Container]bool),
-		exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
+		context:              ctx,
+		events:               make(chan interface{}, 128),
+		ec:                   reaper.Default.Subscribe(),
+		ep:                   ep,
+		shutdown:             sd,
+		containers:           make(map[string]*runc.Container),
+		running:              make(map[int][]containerProcess),
+		runningExecs:         make(map[*runc.Container]int),
+		execCountSubscribers: make(map[*runc.Container]chan<- int),
+		containerInitExit:    make(map[*runc.Container]runcC.Exit),
+		exitSubscribers:      make(map[*map[int][]runcC.Exit]struct{}),
 	}
 	go s.processExits()
 	runcC.Monitor = reaper.Default
@@ -118,13 +119,19 @@ type service struct {
 
 	lifecycleMu  sync.Mutex
 	running      map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
-	pendingExecs map[*runc.Container]int    // container -> num pending execs, guarded by lifecycleMu
-	// container -> execs can be started, guarded by lifecycleMu.
-	// Execs can be started if the container's init process (read: pid, not [process.Init])
-	// has been started and not yet reaped by the shim.
+	runningExecs map[*runc.Container]int // container -> num running execs, guarded by lifecycleMu
+	// container -> subscription to exec exits/changes to s.runningExecs[container],
+	// guarded by lifecycleMu
+	execCountSubscribers map[*runc.Container]chan<- int
+	// container -> init exits, guarded by lifecycleMu
+	// Used to stash container init process exits, so that we can hold them
+	// until after we've made sure to publish all the container's exec exits.
+	// Also used to prevent new execs from being started if the container's
+	// init process (read: pid, not [process.Init]) has already been
+	// reaped by the shim.
 	// Note that this flag gets updated before the container's [process.Init.Status]
 	// is transitioned to "stopped".
-	execable map[*runc.Container]bool
+	containerInitExit map[*runc.Container]runcC.Exit
 	// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
 	// dereferencing the subscription pointers must only be done while holding
 	// lifecycleMu.
@@ -145,8 +152,7 @@ type containerProcess struct {
 //
 // The returned handleStarted closure records that the process has started so
 // that its exit can be handled efficiently. If the process has already exited,
-// it handles the exit immediately. In addition, if the process is an exec and
-// its container's init process has already exited, that exit is also processed.
+// it handles the exit immediately.
 // handleStarted should be called after the event announcing the start of the
 // process has been published. Note that s.lifecycleMu must not be held when
 // calling handleStarted.
@@ -181,44 +187,8 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
 		pid = p.Pid()
 	}
 
-	_, init := p.(*process.Init)
 	s.lifecycleMu.Lock()
 
-	var initExits []runcC.Exit
-	var initCps []containerProcess
-	if !init {
-		s.pendingExecs[c]--
-
-		initPid := c.Pid()
-		iExits, initExited := exits[initPid]
-		if initExited && s.pendingExecs[c] == 0 {
-			// c's init process has exited before handleStarted was called and
-			// this is the last pending exec process start - we need to process
-			// the exit for the init process after processing this exec, so:
-			// - delete c from the s.pendingExecs map
-			// - keep the exits for the init pid to process later (after we process
-			//   this exec's exits)
-			// - get the necessary containerProcesses for the init process (that we
-			//   need to process the exits), and remove them from s.running (which we skipped
-			//   doing in processExits).
-			delete(s.pendingExecs, c)
-			initExits = iExits
-			var skipped []containerProcess
-			for _, initPidCp := range s.running[initPid] {
-				if initPidCp.Container == c {
-					initCps = append(initCps, initPidCp)
-				} else {
-					skipped = append(skipped, initPidCp)
-				}
-			}
-			if len(skipped) == 0 {
-				delete(s.running, initPid)
-			} else {
-				s.running[initPid] = skipped
-			}
-		}
-	}
-
 	ees, exited := exits[pid]
 	delete(s.exitSubscribers, &exits)
 	exits = nil
@@ -227,20 +197,12 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
 		for _, ee := range ees {
 			s.handleProcessExit(ee, c, p)
 		}
-		for _, eee := range initExits {
-			for _, cp := range initCps {
-				s.handleProcessExit(eee, cp.Container, cp.Process)
-			}
-		}
 	} else {
 		// Process start was successful, add to `s.running`.
 		s.running[pid] = append(s.running[pid], containerProcess{
 			Container: c,
 			Process:   p,
 		})
-		if init {
-			s.execable[c] = true
-		}
 		s.lifecycleMu.Unlock()
 	}
 }
@@ -315,18 +277,29 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 	if r.ExecID == "" {
 		cinit = container
 	} else {
-		if !s.execable[container] {
+		if _, initExited := s.containerInitExit[container]; initExited {
 			s.lifecycleMu.Unlock()
 			return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container %s init process is not running", container.ID)
 		}
-		s.pendingExecs[container]++
+		s.runningExecs[container]++
 	}
 	handleStarted, cleanup := s.preStart(cinit)
 	s.lifecycleMu.Unlock()
 	defer cleanup()
 
 	p, err := container.Start(ctx, r)
 	if err != nil {
+		// If we failed to even start the process, s.runningExecs
+		// won't get decremented in s.handleProcessExit. We still need
+		// to update it.
+		if r.ExecID != "" {
+			s.lifecycleMu.Lock()
+			s.runningExecs[container]--
+			if ch, ok := s.execCountSubscribers[container]; ok {
+				ch <- s.runningExecs[container]
+			}
+			s.lifecycleMu.Unlock()
+		}
 		handleStarted(container, p)
 		return nil, errdefs.ToGRPC(err)
 	}
@@ -691,31 +664,23 @@ func (s *service) processExits() {
 			// Handle the exit for a created/started process. If there's more than
 			// one, assume they've all exited. One of them will be the correct
 			// process.
-			var cps, skipped []containerProcess
+			var cps []containerProcess
 			for _, cp := range s.running[e.Pid] {
 				_, init := cp.Process.(*process.Init)
 				if init {
-					delete(s.execable, cp.Container)
-				}
-				if init && s.pendingExecs[cp.Container] != 0 {
-					// This exit relates to a container for which we have pending execs. In
-					// order to ensure order between execs and the init process for a given
-					// container, skip processing this exit here and let the `handleStarted`
-					// closure for the pending exec publish it.
-					skipped = append(skipped, cp)
-				} else {
-					cps = append(cps, cp)
+					s.containerInitExit[cp.Container] = e
 				}
+				cps = append(cps, cp)
 			}
-			if len(skipped) > 0 {
-				s.running[e.Pid] = skipped
-			} else {
-				delete(s.running, e.Pid)
-			}
+			delete(s.running, e.Pid)
 			s.lifecycleMu.Unlock()
 
 			for _, cp := range cps {
-				s.handleProcessExit(e, cp.Container, cp.Process)
+				if ip, ok := cp.Process.(*process.Init); ok {
+					s.handleInitExit(e, cp.Container, ip)
+				} else {
+					s.handleProcessExit(e, cp.Container, cp.Process)
+				}
 			}
 		}
 	}
@@ -724,17 +689,60 @@ func (s *service) send(evt interface{}) {
 	s.events <- evt
 }
 
-func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
-	if ip, ok := p.(*process.Init); ok {
-		// Ensure all children are killed
-		if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
-			if err := ip.KillAll(s.context); err != nil {
-				log.G(s.context).WithError(err).WithField("id", ip.ID()).
-					Error("failed to kill init's children")
-			}
+// handleInitExit processes container init process exits.
+// This is handled separately from non-init exits, because there
+// are some extra invariants we want to ensure in this case, namely:
+// - for a given container, the init process exit MUST be the last exit published
+// This is achieved by:
+// - killing all running container processes (if the container has a shared pid
+//   namespace, otherwise all other processes have been reaped already).
+// - waiting for the container's running exec counter to reach 0.
+// - finally, publishing the init exit.
+func (s *service) handleInitExit(e runcC.Exit, c *runc.Container, p *process.Init) {
+	// kill all running container processes
+	if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
+		if err := p.KillAll(s.context); err != nil {
+			log.G(s.context).WithError(err).WithField("id", p.ID()).
+				Error("failed to kill init's children")
 		}
 	}
 
+	s.lifecycleMu.Lock()
+	numRunningExecs := s.runningExecs[c]
+	if numRunningExecs == 0 {
+		delete(s.runningExecs, c)
+		s.lifecycleMu.Unlock()
+		s.handleProcessExit(e, c, p)
+		return
+	}
+
+	events := make(chan int, numRunningExecs)
+	s.execCountSubscribers[c] = events
+
+	s.lifecycleMu.Unlock()
+
+	go func() {
+		defer func() {
+			s.lifecycleMu.Lock()
+			defer s.lifecycleMu.Unlock()
+			delete(s.execCountSubscribers, c)
+			delete(s.runningExecs, c)
+		}()
+
+		// wait for running processes to exit
+		for {
+			if runningExecs := <-events; runningExecs == 0 {
+				break
+			}
+		}
+
+		// all running processes have exited now, and no new
+		// ones can start, so we can publish the init exit
+		s.handleProcessExit(e, c, p)
+	}()
+}
+
+func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
 	p.SetExited(e.Status)
 	s.send(&eventstypes.TaskExit{
 		ContainerID: c.ID,
@@ -743,6 +751,14 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P
 		ExitStatus: uint32(e.Status),
 		ExitedAt:   protobuf.ToTimestamp(p.ExitedAt()),
 	})
+	if _, init := p.(*process.Init); !init {
+		s.lifecycleMu.Lock()
+		s.runningExecs[c]--
+		if ch, ok := s.execCountSubscribers[c]; ok {
+			ch <- s.runningExecs[c]
+		}
+		s.lifecycleMu.Unlock()
+	}
 }
 
 func (s *service) getContainerPids(ctx context.Context, container *runc.Container) ([]uint32, error) {
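
The user-visible effect of the change is the event ordering guarantee: for a given container, every exec's /tasks/exit event is published before the init process's exit event. Below is a hedged observer sketch using containerd's Go client; it assumes a local daemon at the default socket and the "default" namespace, and decoding the event payload (omitted here) is version-dependent, so this is an illustration rather than a definitive recipe.

package main

import (
	"context"
	"fmt"

	containerd "github.com/containerd/containerd"
	"github.com/containerd/containerd/namespaces"
)

func main() {
	// Connect to the default containerd socket (assumption: a local
	// daemon; adjust the address and namespace for your setup).
	client, err := containerd.New("/run/containerd/containerd.sock")
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := namespaces.WithNamespace(context.Background(), "default")

	// Subscribe to task exit events only.
	envelopes, errs := client.Subscribe(ctx, `topic=="/tasks/exit"`)
	for {
		select {
		case env := <-envelopes:
			// Each envelope carries a TaskExit payload (container ID,
			// pid, exit status). With this commit, a container's exec
			// exits are delivered before its init exit.
			fmt.Println("exit event:", env.Topic)
		case err := <-errs:
			fmt.Println("subscription error:", err)
			return
		}
	}
}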
