Skip to content

Commit 9f705f4

Browse files
authored
Merge pull request #2958 from crosbymichael/runcv2-events
Move task events to runc v2 shim
2 parents 9aac018 + 85aa8ad commit 9f705f4

1 file changed

Lines changed: 68 additions & 5 deletions

File tree

runtime/v2/runc/service.go

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
9494

9595
// service is the shim implementation of a remote shim over GRPC
9696
type service struct {
97-
mu sync.Mutex
97+
mu sync.Mutex
98+
eventSendMu sync.Mutex
9899

99100
context context.Context
100101
task rproc.Process
@@ -311,6 +312,21 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
311312
s.cg = cg
312313
}
313314
s.task = process
315+
316+
s.send(&eventstypes.TaskCreate{
317+
ContainerID: r.ID,
318+
Bundle: r.Bundle,
319+
Rootfs: r.Rootfs,
320+
IO: &eventstypes.TaskIO{
321+
Stdin: r.Stdin,
322+
Stdout: r.Stdout,
323+
Stderr: r.Stderr,
324+
Terminal: r.Terminal,
325+
},
326+
Checkpoint: r.Checkpoint,
327+
Pid: uint32(pid),
328+
})
329+
314330
return &taskAPI.CreateTaskResponse{
315331
Pid: uint32(pid),
316332
}, nil
@@ -323,9 +339,14 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
323339
if err != nil {
324340
return nil, err
325341
}
342+
343+
// hold the send lock so that the start events are sent before any exit events in the error case
344+
s.eventSendMu.Lock()
326345
if err := p.Start(ctx); err != nil {
346+
s.eventSendMu.Unlock()
327347
return nil, err
328348
}
349+
329350
// case for restore
330351
if s.getCgroup() == nil && p.Pid() > 0 {
331352
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
@@ -334,6 +355,19 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
334355
}
335356
s.setCgroup(cg)
336357
}
358+
if r.ExecID != "" {
359+
s.send(&eventstypes.TaskExecStarted{
360+
ContainerID: s.id,
361+
ExecID: r.ExecID,
362+
Pid: uint32(p.Pid()),
363+
})
364+
} else {
365+
s.send(&eventstypes.TaskStart{
366+
ContainerID: s.id,
367+
Pid: uint32(p.Pid()),
368+
})
369+
}
370+
s.eventSendMu.Unlock()
337371
return &taskAPI.StartResponse{
338372
Pid: uint32(p.Pid()),
339373
}, nil
@@ -357,8 +391,16 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
357391
delete(s.processes, r.ExecID)
358392
s.mu.Unlock()
359393
}
360-
if isTask && s.platform != nil {
361-
s.platform.Close()
394+
if isTask {
395+
if s.platform != nil {
396+
s.platform.Close()
397+
}
398+
s.send(&eventstypes.TaskDelete{
399+
ContainerID: s.id,
400+
Pid: uint32(p.Pid()),
401+
ExitStatus: uint32(p.ExitStatus()),
402+
ExitedAt: p.ExitedAt(),
403+
})
362404
}
363405
return &taskAPI.DeleteResponse{
364406
ExitStatus: uint32(p.ExitStatus()),
@@ -393,6 +435,11 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
393435
s.mu.Lock()
394436
s.processes[r.ExecID] = process
395437
s.mu.Unlock()
438+
439+
s.send(&eventstypes.TaskExecAdded{
440+
ContainerID: s.id,
441+
ExecID: process.ID(),
442+
})
396443
return empty, nil
397444
}
398445

@@ -461,6 +508,9 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
461508
if err := p.(*proc.Init).Pause(ctx); err != nil {
462509
return nil, err
463510
}
511+
s.send(&eventstypes.TaskPaused{
512+
p.ID(),
513+
})
464514
return empty, nil
465515
}
466516

@@ -475,6 +525,9 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
475525
if err := p.(*proc.Init).Resume(ctx); err != nil {
476526
return nil, err
477527
}
528+
s.send(&eventstypes.TaskResumed{
529+
p.ID(),
530+
})
478531
return empty, nil
479532
}
480533

@@ -642,6 +695,16 @@ func (s *service) processExits() {
642695
}
643696
}
644697

698+
func (s *service) send(evt interface{}) {
699+
s.events <- evt
700+
}
701+
702+
func (s *service) sendL(evt interface{}) {
703+
s.eventSendMu.Lock()
704+
s.events <- evt
705+
s.eventSendMu.Unlock()
706+
}
707+
645708
func (s *service) checkProcesses(e runcC.Exit) {
646709
shouldKillAll, err := shouldKillAllOnExit(s.bundle)
647710
if err != nil {
@@ -660,13 +723,13 @@ func (s *service) checkProcesses(e runcC.Exit) {
660723
}
661724
}
662725
p.SetExited(e.Status)
663-
s.events <- &eventstypes.TaskExit{
726+
s.sendL(&eventstypes.TaskExit{
664727
ContainerID: s.id,
665728
ID: p.ID(),
666729
Pid: uint32(e.Pid),
667730
ExitStatus: uint32(e.Status),
668731
ExitedAt: p.ExitedAt(),
669-
}
732+
})
670733
return
671734
}
672735
}

0 commit comments

Comments
 (0)