2 changes: 1 addition & 1 deletion runtime/v2/runc/container.go
@@ -362,7 +362,7 @@ func (c *Container) Start(ctx context.Context, r *task.StartRequest) (process.Pr
return nil, err
}
if err := p.Start(ctx); err != nil {
return nil, err
return p, err
}
if c.Cgroup() == nil && p.Pid() > 0 {
var cg interface{}
200 changes: 151 additions & 49 deletions runtime/v2/runc/task/service.go
@@ -72,12 +72,14 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
}
go ep.Run(ctx)
s := &service{
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
running: make(map[int][]containerProcess),
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
}
go s.processExits()
runcC.Monitor = reaper.Default
@@ -100,8 +102,7 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S

// service is the shim implementation of a remote shim over GRPC
type service struct {
mu sync.Mutex
eventSendMu sync.Mutex
mu sync.Mutex

context context.Context
events chan interface{}
@@ -111,14 +112,103 @@ type service struct {

containers map[string]*runc.Container

lifecycleMu sync.Mutex
running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
// dereferencing the subscription pointers must only be done while holding
// lifecycleMu.
exitSubscribers map[*map[int][]runcC.Exit]struct{}

shutdown shutdown.Service
}

type containerProcess struct {
Container *runc.Container
Process process.Process
}

// preStart prepares for starting a container process and handling its exit.
// The container being started should be passed in as c when starting the
// container init process for an already-created container. c should be nil when
// creating a container or when starting an exec.
//
// The returned handleStarted closure records that the process has started so
// that its exit can be handled efficiently. If the process has already exited,
// it handles the exit immediately. handleStarted should be called after the
// event announcing the start of the process has been published.
//
// The returned cleanup closure releases resources used to handle early exits.
// It must be called before the caller of preStart returns, otherwise severe
// memory leaks will occur.
func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Container, process.Process), cleanup func()) {
exits := make(map[int][]runcC.Exit)

s.lifecycleMu.Lock()
defer s.lifecycleMu.Unlock()
s.exitSubscribers[&exits] = struct{}{}

if c != nil {
// Remove container init process from s.running so it will once again be
// treated as an early exit if it exits before handleStarted is called.
pid := c.Pid()
var newRunning []containerProcess
for _, cp := range s.running[pid] {
if cp.Container != c {
newRunning = append(newRunning, cp)
}
}
if len(newRunning) > 0 {
s.running[pid] = newRunning
} else {
delete(s.running, pid)
}
}

handleStarted = func(c *runc.Container, p process.Process) {
var pid int
if p != nil {
pid = p.Pid()
}

s.lifecycleMu.Lock()
ees, exited := exits[pid]
delete(s.exitSubscribers, &exits)
exits = nil
Comment on lines +168 to +176

Member:
Regarding the pid == 0 case, do we need to have the entry in the exits map?

Member (author):
No. Why would it? The exits map only has entries for PIDs which exited while subscribed, so there might not be an entry for a pid != 0 either. The only thing special about the pid == 0 case is that no exit event will ever be seen for that PID, so it would be a memory leak if it wasn't special-cased and s.running[0] was appended to.

Member:
Then how about returning early?

if p == nil {
  return
}

Member (author):
My implementation defends against a memory leak if a (buggy?) non-nil process implementation evaluates p.Pid() == 0. I can simplify it if you think I'm being too paranoid.

Member:
I see. I'm fine with what you have right now :)

if pid == 0 { // no-op
s.lifecycleMu.Unlock()
} else if exited {
s.lifecycleMu.Unlock()
for _, ee := range ees {
s.handleProcessExit(ee, c, p)
}
} else {
s.running[pid] = append(s.running[pid], containerProcess{
Container: c,
Process: p,
})
s.lifecycleMu.Unlock()
}
}

cleanup = func() {
if exits != nil {
s.lifecycleMu.Lock()
defer s.lifecycleMu.Unlock()
delete(s.exitSubscribers, &exits)
}
}

return handleStarted, cleanup
}
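
Note (an illustrative sketch, not part of the diff): taken together with the Create and Start changes below, the preStart contract described in the doc comment above amounts to the following call pattern for starting a container's init process. startAndTrack is a hypothetical helper invented purely for illustration; it reuses identifiers that already exist in service.go (runc.Container, taskAPI.StartRequest, process.Process, eventstypes.TaskStart, s.send). For Create and for execs the argument to preStart would be nil instead of c.

// Hypothetical helper illustrating the preStart contract; not part of this PR.
func (s *service) startAndTrack(ctx context.Context, c *runc.Container, r *taskAPI.StartRequest) (process.Process, error) {
	// Subscribe to exit events before the process can possibly start, so an
	// exit that races ahead of us is recorded rather than lost.
	handleStarted, cleanup := s.preStart(c)
	// cleanup must run before this function returns, or the exit subscription
	// (and everything recorded in it) leaks.
	defer cleanup()

	p, err := c.Start(ctx, r)
	if err != nil {
		// Even on error, hand whatever process we have to handleStarted so a
		// very early exit is still handled (container.Start now returns p
		// alongside the error for exactly this reason).
		handleStarted(c, p)
		return nil, err
	}

	// Publish the start event first so an exit is never announced before the
	// corresponding start, then record the running process (or replay an exit
	// that already arrived while we were only subscribed).
	s.send(&eventstypes.TaskStart{
		ContainerID: c.ID,
		Pid:         uint32(p.Pid()),
	})
	handleStarted(c, p)
	return p, nil
}
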

// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
s.mu.Lock()
defer s.mu.Unlock()

handleStarted, cleanup := s.preStart(nil)
defer cleanup()

container, err := runc.NewContainer(ctx, s.platform, r)
if err != nil {
return nil, err
@@ -140,6 +230,12 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
Pid: uint32(container.Pid()),
})

// The following line cannot return an error, as the only state in which that
// could happen would also cause the container.Pid() call above to panic with
// a nil dereference.
proc, _ := container.Process("")
handleStarted(container, proc)

return &taskAPI.CreateTaskResponse{
Pid: uint32(container.Pid()),
}, nil
@@ -157,11 +253,15 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
return nil, err
}

// hold the send lock so that the start events are sent before any exit events in the error case
s.eventSendMu.Lock()
var cinit *runc.Container
if r.ExecID == "" {
cinit = container
}
handleStarted, cleanup := s.preStart(cinit)
defer cleanup()
p, err := container.Start(ctx, r)
if err != nil {
s.eventSendMu.Unlock()
handleStarted(container, p)
return nil, errdefs.ToGRPC(err)
}

@@ -201,7 +301,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
Pid: uint32(p.Pid()),
})
}
s.eventSendMu.Unlock()
handleStarted(container, p)
return &taskAPI.StartResponse{
Pid: uint32(p.Pid()),
}, nil
@@ -509,56 +609,58 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.

func (s *service) processExits() {
for e := range s.ec {
s.checkProcesses(e)
// While unlikely, it is not impossible for a container process to exit
// and have its PID be recycled for a new container process before we
// have a chance to process the first exit. As we have no way to tell
// for sure which of the processes the exit event corresponds to (until
// pidfd support is implemented) there is no way for us to handle the
// exit correctly in that case.

s.lifecycleMu.Lock()
// Inform any concurrent s.Start() calls so they can handle the exit
// if the PID belongs to them.
for subscriber := range s.exitSubscribers {
(*subscriber)[e.Pid] = append((*subscriber)[e.Pid], e)
}
// Handle the exit for a created/started process. If there's more than
// one, assume they've all exited. One of them will be the correct
// process.
cps := s.running[e.Pid]
delete(s.running, e.Pid)
s.lifecycleMu.Unlock()

for _, cp := range cps {
s.handleProcessExit(e, cp.Container, cp.Process)
}
}
}

func (s *service) send(evt interface{}) {
s.events <- evt
}

func (s *service) sendL(evt interface{}) {
s.eventSendMu.Lock()
s.events <- evt
s.eventSendMu.Unlock()
}

func (s *service) checkProcesses(e runcC.Exit) {
func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
s.mu.Lock()
defer s.mu.Unlock()

for _, container := range s.containers {
if !container.HasPid(e.Pid) {
continue
}

for _, p := range container.All() {
if p.Pid() != e.Pid {
continue
}

if ip, ok := p.(*process.Init); ok {
// Ensure all children are killed
if runc.ShouldKillAllOnExit(s.context, container.Bundle) {
if err := ip.KillAll(s.context); err != nil {
log.L.WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
}
if ip, ok := p.(*process.Init); ok {
// Ensure all children are killed
if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
if err := ip.KillAll(s.context); err != nil {
log.G(s.context).WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}

p.SetExited(e.Status)
s.sendL(&eventstypes.TaskExit{
ContainerID: container.ID,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
return
}
return
}

p.SetExited(e.Status)
s.send(&eventstypes.TaskExit{
ContainerID: c.ID,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
}

func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {