Skip to content

Commit 0649e38

Browse files
Merge pull request #2589 from crosbymichael/shim-robo
Shim locking improvements and context cancel
2 parents 7cb847b + 2205e8d commit 0649e38

File tree

2 files changed

+55
-42
lines changed

2 files changed

+55
-42
lines changed

runtime/v2/runc/epoll.go

+14 -8
Original file line number | Diff line number | Diff line change
@@ -62,15 +62,21 @@ func (e *epoller) Close() error {
6262
func (e *epoller) run(ctx context.Context) {
6363
var events [128]unix.EpollEvent
6464
for {
65-
n, err := unix.EpollWait(e.fd, events[:], -1)
66-
if err != nil {
67-
if err == unix.EINTR {
68-
continue
65+
select {
66+
case <-ctx.Done():
67+
e.Close()
68+
return
69+
default:
70+
n, err := unix.EpollWait(e.fd, events[:], -1)
71+
if err != nil {
72+
if err == unix.EINTR {
73+
continue
74+
}
75+
logrus.WithError(err).Error("cgroups: epoll wait")
76+
}
77+
for i := 0; i < n; i++ {
78+
e.process(ctx, uintptr(events[i].Fd))
6979
}
70-
logrus.WithError(err).Error("cgroups: epoll wait")
71-
}
72-
for i := 0; i < n; i++ {
73-
e.process(ctx, uintptr(events[i].Fd))
7480
}
7581
}
7682
}

runtime/v2/runc/service.go

+41 -34
Original file line number | Diff line number | Diff line change
@@ -68,6 +68,7 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
6868
if err != nil {
6969
return nil, err
7070
}
71+
ctx, cancel := context.WithCancel(ctx)
7172
go ep.run(ctx)
7273
s := &service{
7374
id: id,
@@ -76,10 +77,12 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
7677
events: make(chan interface{}, 128),
7778
ec: shim.Default.Subscribe(),
7879
ep: ep,
80+
cancel: cancel,
7981
}
8082
go s.processExits()
8183
runcC.Monitor = shim.Default
8284
if err := s.initPlatform(); err != nil {
85+
cancel()
8386
return nil, errors.Wrap(err, "failed to initialized platform behavior")
8487
}
8588
go s.forward(publisher)
@@ -101,6 +104,7 @@ type service struct {
101104
id string
102105
bundle string
103106
cg cgroups.Cgroup
107+
cancel func()
104108
}
105109

106110
func newCommand(ctx context.Context, containerdBinary, containerdAddress string) (*exec.Cmd, error) {
@@ -300,7 +304,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
300304
if err != nil {
301305
logrus.WithError(err).Errorf("loading cgroup for %d", pid)
302306
}
303-
s.setCgroup(cg)
307+
s.cg = cg
304308
}
305309
s.task = process
306310
return &taskAPI.CreateTaskResponse{
@@ -311,16 +315,15 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
311315

312316
// Start a process
313317
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
314-
s.mu.Lock()
315-
defer s.mu.Unlock()
316318
p, err := s.getProcess(r.ExecID)
317319
if err != nil {
318320
return nil, err
319321
}
320322
if err := p.Start(ctx); err != nil {
321323
return nil, err
322324
}
323-
if s.cg == nil && p.Pid() > 0 {
325+
// case for restore
326+
if s.getCgroup() == nil && p.Pid() > 0 {
324327
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
325328
if err != nil {
326329
logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
@@ -334,8 +337,6 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
334337

335338
// Delete the initial process and container
336339
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
337-
s.mu.Lock()
338-
defer s.mu.Unlock()
339340
p, err := s.getProcess(r.ExecID)
340341
if err != nil {
341342
return nil, err
@@ -348,7 +349,9 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
348349
}
349350
isTask := r.ExecID == ""
350351
if !isTask {
352+
s.mu.Lock()
351353
delete(s.processes, r.ExecID)
354+
s.mu.Unlock()
352355
}
353356
if isTask && s.platform != nil {
354357
s.platform.Close()
@@ -363,11 +366,12 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
363366
// Exec an additional process inside the container
364367
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
365368
s.mu.Lock()
366-
defer s.mu.Unlock()
367-
if p := s.processes[r.ExecID]; p != nil {
369+
p := s.processes[r.ExecID]
370+
s.mu.Unlock()
371+
if p != nil {
368372
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
369373
}
370-
p := s.task
374+
p = s.task
371375
if p == nil {
372376
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
373377
}
@@ -382,14 +386,14 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
382386
if err != nil {
383387
return nil, errdefs.ToGRPC(err)
384388
}
389+
s.mu.Lock()
385390
s.processes[r.ExecID] = process
391+
s.mu.Unlock()
386392
return empty, nil
387393
}
388394

389395
// ResizePty of a process
390396
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
391-
s.mu.Lock()
392-
defer s.mu.Unlock()
393397
p, err := s.getProcess(r.ExecID)
394398
if err != nil {
395399
return nil, err
@@ -406,8 +410,6 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
406410

407411
// State returns runtime state information for a process
408412
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
409-
s.mu.Lock()
410-
defer s.mu.Unlock()
411413
p, err := s.getProcess(r.ExecID)
412414
if err != nil {
413415
return nil, err
@@ -447,8 +449,8 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
447449
// Pause the container
448450
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
449451
s.mu.Lock()
450-
defer s.mu.Unlock()
451452
p := s.task
453+
s.mu.Unlock()
452454
if p == nil {
453455
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
454456
}
@@ -461,8 +463,8 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
461463
// Resume the container
462464
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
463465
s.mu.Lock()
464-
defer s.mu.Unlock()
465466
p := s.task
467+
s.mu.Unlock()
466468
if p == nil {
467469
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
468470
}
@@ -474,8 +476,6 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
474476

475477
// Kill a process with the provided signal
476478
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
477-
s.mu.Lock()
478-
defer s.mu.Unlock()
479479
p, err := s.getProcess(r.ExecID)
480480
if err != nil {
481481
return nil, err
@@ -522,8 +522,6 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
522522

523523
// CloseIO of a process
524524
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
525-
s.mu.Lock()
526-
defer s.mu.Unlock()
527525
p, err := s.getProcess(r.ExecID)
528526
if err != nil {
529527
return nil, err
@@ -539,8 +537,8 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
539537
// Checkpoint the container
540538
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
541539
s.mu.Lock()
542-
defer s.mu.Unlock()
543540
p := s.task
541+
s.mu.Unlock()
544542
if p == nil {
545543
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
546544
}
@@ -579,18 +577,17 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
579577
}
580578

581579
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
580+
s.cancel()
582581
os.Exit(0)
583582
return empty, nil
584583
}
585584

586585
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
587-
s.mu.Lock()
588-
defer s.mu.Unlock()
589-
590-
if s.cg == nil {
586+
cg := s.getCgroup()
587+
if cg == nil {
591588
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
592589
}
593-
stats, err := s.cg.Stat(cgroups.IgnoreNotExist)
590+
stats, err := cg.Stat(cgroups.IgnoreNotExist)
594591
if err != nil {
595592
return nil, err
596593
}
@@ -606,8 +603,8 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
606603
// Update a running container
607604
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
608605
s.mu.Lock()
609-
defer s.mu.Unlock()
610606
p := s.task
607+
s.mu.Unlock()
611608
if p == nil {
612609
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
613610
}
@@ -619,9 +616,7 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*pt
619616

620617
// Wait for a process to exit
621618
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
622-
s.mu.Lock()
623619
p, err := s.getProcess(r.ExecID)
624-
s.mu.Unlock()
625620
if err != nil {
626621
return nil, err
627622
}
@@ -643,9 +638,6 @@ func (s *service) processExits() {
643638
}
644639

645640
func (s *service) checkProcesses(e runcC.Exit) {
646-
s.mu.Lock()
647-
defer s.mu.Unlock()
648-
649641
for _, p := range s.allProcesses() {
650642
if p.Pid() == e.Pid {
651643
if ip, ok := p.(*proc.Init); ok {
@@ -669,6 +661,8 @@ func (s *service) checkProcesses(e runcC.Exit) {
669661
}
670662

671663
func (s *service) allProcesses() (o []rproc.Process) {
664+
s.mu.Lock()
665+
defer s.mu.Unlock()
672666
for _, p := range s.processes {
673667
o = append(o, p)
674668
}
@@ -680,8 +674,8 @@ func (s *service) allProcesses() (o []rproc.Process) {
680674

681675
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
682676
s.mu.Lock()
683-
defer s.mu.Unlock()
684677
p := s.task
678+
s.mu.Unlock()
685679
if p == nil {
686680
return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
687681
}
@@ -698,13 +692,18 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
698692

699693
func (s *service) forward(publisher events.Publisher) {
700694
for e := range s.events {
701-
if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil {
695+
ctx, cancel := context.WithTimeout(s.context, 5*time.Second)
696+
err := publisher.Publish(ctx, getTopic(e), e)
697+
cancel()
698+
if err != nil {
702699
logrus.WithError(err).Error("post event")
703700
}
704701
}
705702
}
706703

707704
func (s *service) getProcess(execID string) (rproc.Process, error) {
705+
s.mu.Lock()
706+
defer s.mu.Unlock()
708707
if execID == "" {
709708
return s.task, nil
710709
}
@@ -715,14 +714,22 @@ func (s *service) getProcess(execID string) (rproc.Process, error) {
715714
return p, nil
716715
}
717716

717+
func (s *service) getCgroup() cgroups.Cgroup {
718+
s.mu.Lock()
719+
defer s.mu.Unlock()
720+
return s.cg
721+
}
722+
718723
func (s *service) setCgroup(cg cgroups.Cgroup) {
724+
s.mu.Lock()
719725
s.cg = cg
726+
s.mu.Unlock()
720727
if err := s.ep.add(s.id, cg); err != nil {
721728
logrus.WithError(err).Error("add cg to OOM monitor")
722729
}
723730
}
724731

725-
func getTopic(ctx context.Context, e interface{}) string {
732+
func getTopic(e interface{}) string {
726733
switch e.(type) {
727734
case *eventstypes.TaskCreate:
728735
return runtime.TaskCreateEventTopic

0 commit comments

Comments
 (0)