
Commit 18c6503

corhere and laurazard committed
runtime/v2/runc: handle early exits w/o big locks
eventSendMu is causing severe lock contention when multiple processes start and exit concurrently. Replace it with a different scheme for maintaining causality w.r.t. start and exit events for a process, one which does not rely on big locks for synchronization.

Keep track of all processes for which a Task(Exec)Start event has been published and which have not yet exited in a map, keyed by their PID. Processing exits then is as simple as looking up which process corresponds to the PID. If there are no started processes known with that PID, the PID must either belong to a process which was started by s.Start() and exited before the s.Start() call could add it to the map of running processes, or to a reparented process which we don't care about. Handle the former case by having each s.Start() call subscribe to exit events before starting the process. Each call checks whether the PID exited in the window between starting the process and publishing the TaskStart event, handling the exit if it has. Exit events for reparented processes received when no s.Start() calls are in flight are immediately discarded, and events received during an s.Start() call are discarded when the s.Start() call returns.

Co-authored-by: Laura Brehm <[email protected]>
Signed-off-by: Cory Snider <[email protected]>
(cherry picked from commit 5cd6210)
Signed-off-by: Cory Snider <[email protected]>
1 parent a766dc5 commit 18c6503
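
The scheme described in the commit message condenses into a small standalone sketch. Everything below (monitor, proc, exit, the printed messages) is illustrative only and is not the shim's API; the real implementation is the diff to runtime/v2/runc/task/service.go further down.

// Standalone sketch of the scheme described in the commit message. The names
// (monitor, proc, exit) are illustrative only.
package main

import (
	"fmt"
	"sync"
)

type exit struct{ pid, status int }

type proc struct{ id string }

type monitor struct {
	mu          sync.Mutex
	running     map[int][]*proc              // pid -> started, not-yet-exited processes
	subscribers map[*map[int][]exit]struct{} // one subscription per in-flight start
}

// preStart subscribes the caller to exit events so that an exit which lands
// between starting the process and publishing its start event is not lost.
func (m *monitor) preStart() (handleStarted func(pid int, p *proc), cleanup func()) {
	exits := make(map[int][]exit)

	m.mu.Lock()
	m.subscribers[&exits] = struct{}{}
	m.mu.Unlock()

	handleStarted = func(pid int, p *proc) {
		m.mu.Lock()
		ees, exited := exits[pid]
		delete(m.subscribers, &exits)
		exits = nil
		if exited {
			m.mu.Unlock()
			for _, ee := range ees {
				fmt.Printf("%s exited early with status %d\n", p.id, ee.status)
			}
			return
		}
		m.running[pid] = append(m.running[pid], p)
		m.mu.Unlock()
	}
	cleanup = func() {
		m.mu.Lock()
		defer m.mu.Unlock()
		if exits != nil {
			delete(m.subscribers, &exits)
		}
	}
	return handleStarted, cleanup
}

// processExit fans an exit out to in-flight starts and to started processes.
func (m *monitor) processExit(e exit) {
	m.mu.Lock()
	for sub := range m.subscribers {
		(*sub)[e.pid] = append((*sub)[e.pid], e)
	}
	ps := m.running[e.pid]
	delete(m.running, e.pid)
	m.mu.Unlock()

	for _, p := range ps {
		fmt.Printf("%s exited with status %d\n", p.id, e.status)
	}
}

func main() {
	m := &monitor{
		running:     make(map[int][]*proc),
		subscribers: make(map[*map[int][]exit]struct{}),
	}

	// Normal case: the exit arrives after handleStarted has recorded the process.
	handleStarted, cleanup := m.preStart()
	handleStarted(100, &proc{id: "task1"})
	cleanup()
	m.processExit(exit{pid: 100, status: 0})

	// Early-exit case: the process is reaped before its start event is published.
	handleStarted, cleanup = m.preStart()
	m.processExit(exit{pid: 101, status: 1})
	handleStarted(101, &proc{id: "task2"})
	cleanup()
}

The second half of main shows the early-exit case: the exit is reaped before the start event is handled, yet it is only reported once handleStarted runs, preserving start-before-exit ordering without a global lock.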

2 files changed (+152 lines, -50 lines):

runtime/v2/runc/container.go
runtime/v2/runc/task/service.go

runtime/v2/runc/container.go

Lines changed: 1 addition & 1 deletion
@@ -362,7 +362,7 @@ func (c *Container) Start(ctx context.Context, r *task.StartRequest) (process.Pr
 		return nil, err
 	}
 	if err := p.Start(ctx); err != nil {
-		return nil, err
+		return p, err
 	}
 	if c.Cgroup() == nil && p.Pid() > 0 {
 		var cg interface{}
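
Why Start now returns p together with the error: s.Start() in service.go (below) calls handleStarted(container, p) even on the error path, so it needs the partially-started process, in particular its PID, to match a possible exit event. A minimal standalone illustration of the pattern, with hypothetical names:

// Illustration only (not the shim's API): return the partially-started process
// together with the error so the caller can still track its PID.
package main

import (
	"errors"
	"fmt"
)

type process struct{ pid int }

// start forks a process (so it already has a PID) and then fails in a later
// setup step; returning p lets the caller handle the eventual exit anyway.
func start() (*process, error) {
	p := &process{pid: 12345}
	return p, errors.New("post-start setup failed")
}

func main() {
	p, err := start()
	if err != nil {
		if p != nil {
			fmt.Printf("start failed, but pid %d was created and its exit must still be handled\n", p.pid)
		}
		return
	}
	fmt.Printf("started pid %d\n", p.pid)
}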

runtime/v2/runc/task/service.go

Lines changed: 151 additions & 49 deletions
@@ -72,12 +72,14 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
 	}
 	go ep.Run(ctx)
 	s := &service{
-		context:    ctx,
-		events:     make(chan interface{}, 128),
-		ec:         reaper.Default.Subscribe(),
-		ep:         ep,
-		shutdown:   sd,
-		containers: make(map[string]*runc.Container),
+		context:         ctx,
+		events:          make(chan interface{}, 128),
+		ec:              reaper.Default.Subscribe(),
+		ep:              ep,
+		shutdown:        sd,
+		containers:      make(map[string]*runc.Container),
+		running:         make(map[int][]containerProcess),
+		exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
 	}
 	go s.processExits()
 	runcC.Monitor = reaper.Default
@@ -100,8 +102,7 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
 
 // service is the shim implementation of a remote shim over GRPC
 type service struct {
-	mu          sync.Mutex
-	eventSendMu sync.Mutex
+	mu sync.Mutex
 
 	context context.Context
 	events  chan interface{}
@@ -111,14 +112,103 @@ type service struct {
 
 	containers map[string]*runc.Container
 
+	lifecycleMu sync.Mutex
+	running     map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
+	// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
+	// dereferencing the subscription pointers must only be done while holding
+	// lifecycleMu.
+	exitSubscribers map[*map[int][]runcC.Exit]struct{}
+
 	shutdown shutdown.Service
 }
 
+type containerProcess struct {
+	Container *runc.Container
+	Process   process.Process
+}
+
+// preStart prepares for starting a container process and handling its exit.
+// The container being started should be passed in as c when starting the
+// container init process for an already-created container. c should be nil when
+// creating a container or when starting an exec.
+//
+// The returned handleStarted closure records that the process has started so
+// that its exit can be handled efficiently. If the process has already exited,
+// it handles the exit immediately. handleStarted should be called after the
+// event announcing the start of the process has been published.
+//
+// The returned cleanup closure releases resources used to handle early exits.
+// It must be called before the caller of preStart returns, otherwise severe
+// memory leaks will occur.
+func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Container, process.Process), cleanup func()) {
+	exits := make(map[int][]runcC.Exit)
+
+	s.lifecycleMu.Lock()
+	defer s.lifecycleMu.Unlock()
+	s.exitSubscribers[&exits] = struct{}{}
+
+	if c != nil {
+		// Remove container init process from s.running so it will once again be
+		// treated as an early exit if it exits before handleStarted is called.
+		pid := c.Pid()
+		var newRunning []containerProcess
+		for _, cp := range s.running[pid] {
+			if cp.Container != c {
+				newRunning = append(newRunning, cp)
+			}
+		}
+		if len(newRunning) > 0 {
+			s.running[pid] = newRunning
+		} else {
+			delete(s.running, pid)
+		}
+	}
+
+	handleStarted = func(c *runc.Container, p process.Process) {
+		var pid int
+		if p != nil {
+			pid = p.Pid()
+		}
+
+		s.lifecycleMu.Lock()
+		ees, exited := exits[pid]
+		delete(s.exitSubscribers, &exits)
+		exits = nil
+		if pid == 0 { // no-op
+			s.lifecycleMu.Unlock()
+		} else if exited {
+			s.lifecycleMu.Unlock()
+			for _, ee := range ees {
+				s.handleProcessExit(ee, c, p)
+			}
+		} else {
+			s.running[pid] = append(s.running[pid], containerProcess{
+				Container: c,
+				Process:   p,
+			})
+			s.lifecycleMu.Unlock()
+		}
+	}
+
+	cleanup = func() {
+		if exits != nil {
+			s.lifecycleMu.Lock()
+			defer s.lifecycleMu.Unlock()
+			delete(s.exitSubscribers, &exits)
+		}
+	}
+
+	return handleStarted, cleanup
+}
+
 // Create a new initial process and container with the underlying OCI runtime
 func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
+	handleStarted, cleanup := s.preStart(nil)
+	defer cleanup()
+
 	container, err := runc.NewContainer(ctx, s.platform, r)
 	if err != nil {
 		return nil, err
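
A note on the exitSubscribers type introduced above: the set is keyed by a pointer to each subscriber's private exits map, so the exit loop can append into every in-flight subscription in place while each preStart call can later delete exactly its own entry. A tiny standalone demonstration (illustrative only, not shim code):

// Standalone demonstration of a pointer-keyed subscription set: each
// subscriber owns a private map, and its address both identifies the
// subscription and lets the exit loop append to it in place.
package main

import "fmt"

func main() {
	subscribers := make(map[*map[int][]int]struct{})

	// Two concurrent "starts", each with its own exits map.
	a := make(map[int][]int)
	b := make(map[int][]int)
	subscribers[&a] = struct{}{}
	subscribers[&b] = struct{}{}

	// An exit for PID 42 with status 0 is fanned out to every live subscription.
	for sub := range subscribers {
		(*sub)[42] = append((*sub)[42], 0)
	}

	// One start finishes and drops exactly its own subscription.
	delete(subscribers, &a)

	fmt.Println(len(a[42]), len(b[42]), len(subscribers)) // 1 1 1
}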
@@ -140,6 +230,12 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
 		Pid: uint32(container.Pid()),
 	})
 
+	// The following line cannot return an error as the only state in which that
+	// could happen would also cause the container.Pid() call above to
+	// nil-deference panic.
+	proc, _ := container.Process("")
+	handleStarted(container, proc)
+
 	return &taskAPI.CreateTaskResponse{
 		Pid: uint32(container.Pid()),
 	}, nil
@@ -157,11 +253,15 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 		return nil, err
 	}
 
-	// hold the send lock so that the start events are sent before any exit events in the error case
-	s.eventSendMu.Lock()
+	var cinit *runc.Container
+	if r.ExecID == "" {
+		cinit = container
+	}
+	handleStarted, cleanup := s.preStart(cinit)
+	defer cleanup()
 	p, err := container.Start(ctx, r)
 	if err != nil {
-		s.eventSendMu.Unlock()
+		handleStarted(container, p)
 		return nil, errdefs.ToGRPC(err)
 	}
 
@@ -201,7 +301,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 			Pid: uint32(p.Pid()),
 		})
 	}
-	s.eventSendMu.Unlock()
+	handleStarted(container, p)
 	return &taskAPI.StartResponse{
 		Pid: uint32(p.Pid()),
 	}, nil
@@ -509,56 +609,58 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
 
 func (s *service) processExits() {
 	for e := range s.ec {
-		s.checkProcesses(e)
+		// While unlikely, it is not impossible for a container process to exit
+		// and have its PID be recycled for a new container process before we
+		// have a chance to process the first exit. As we have no way to tell
+		// for sure which of the processes the exit event corresponds to (until
+		// pidfd support is implemented) there is no way for us to handle the
+		// exit correctly in that case.
+
+		s.lifecycleMu.Lock()
+		// Inform any concurrent s.Start() calls so they can handle the exit
+		// if the PID belongs to them.
+		for subscriber := range s.exitSubscribers {
+			(*subscriber)[e.Pid] = append((*subscriber)[e.Pid], e)
+		}
+		// Handle the exit for a created/started process. If there's more than
+		// one, assume they've all exited. One of them will be the correct
+		// process.
+		cps := s.running[e.Pid]
+		delete(s.running, e.Pid)
+		s.lifecycleMu.Unlock()
+
+		for _, cp := range cps {
+			s.handleProcessExit(e, cp.Container, cp.Process)
+		}
 	}
 }
 
 func (s *service) send(evt interface{}) {
 	s.events <- evt
 }
 
-func (s *service) sendL(evt interface{}) {
-	s.eventSendMu.Lock()
-	s.events <- evt
-	s.eventSendMu.Unlock()
-}
-
-func (s *service) checkProcesses(e runcC.Exit) {
+func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	for _, container := range s.containers {
-		if !container.HasPid(e.Pid) {
-			continue
-		}
-
-		for _, p := range container.All() {
-			if p.Pid() != e.Pid {
-				continue
-			}
-
-			if ip, ok := p.(*process.Init); ok {
-				// Ensure all children are killed
-				if runc.ShouldKillAllOnExit(s.context, container.Bundle) {
-					if err := ip.KillAll(s.context); err != nil {
-						logrus.WithError(err).WithField("id", ip.ID()).
-							Error("failed to kill init's children")
-					}
-				}
+	if ip, ok := p.(*process.Init); ok {
+		// Ensure all children are killed
+		if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
+			if err := ip.KillAll(s.context); err != nil {
+				logrus.WithError(err).WithField("id", ip.ID()).
+					Error("failed to kill init's children")
 			}
-
-			p.SetExited(e.Status)
-			s.sendL(&eventstypes.TaskExit{
-				ContainerID: container.ID,
-				ID:          p.ID(),
-				Pid:         uint32(e.Pid),
-				ExitStatus:  uint32(e.Status),
-				ExitedAt:    protobuf.ToTimestamp(p.ExitedAt()),
-			})
-			return
 		}
-		return
 	}
+
+	p.SetExited(e.Status)
+	s.send(&eventstypes.TaskExit{
+		ContainerID: c.ID,
+		ID:          p.ID(),
+		Pid:         uint32(e.Pid),
+		ExitStatus:  uint32(e.Status),
+		ExitedAt:    protobuf.ToTimestamp(p.ExitedAt()),
+	})
 }
 
 func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {