@@ -68,6 +68,7 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
68
68
if err != nil {
69
69
return nil , err
70
70
}
71
+ ctx , cancel := context .WithCancel (ctx )
71
72
go ep .run (ctx )
72
73
s := & service {
73
74
id : id ,
@@ -76,10 +77,12 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
76
77
events : make (chan interface {}, 128 ),
77
78
ec : shim .Default .Subscribe (),
78
79
ep : ep ,
80
+ cancel : cancel ,
79
81
}
80
82
go s .processExits ()
81
83
runcC .Monitor = shim .Default
82
84
if err := s .initPlatform (); err != nil {
85
+ cancel ()
83
86
return nil , errors .Wrap (err , "failed to initialized platform behavior" )
84
87
}
85
88
go s .forward (publisher )
@@ -101,6 +104,7 @@ type service struct {
101
104
id string
102
105
bundle string
103
106
cg cgroups.Cgroup
107
+ cancel func ()
104
108
}
105
109
106
110
func newCommand (ctx context.Context , containerdBinary , containerdAddress string ) (* exec.Cmd , error ) {
@@ -300,7 +304,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
300
304
if err != nil {
301
305
logrus .WithError (err ).Errorf ("loading cgroup for %d" , pid )
302
306
}
303
- s .setCgroup ( cg )
307
+ s .cg = cg
304
308
}
305
309
s .task = process
306
310
return & taskAPI.CreateTaskResponse {
@@ -311,16 +315,15 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
311
315
312
316
// Start a process
313
317
func (s * service ) Start (ctx context.Context , r * taskAPI.StartRequest ) (* taskAPI.StartResponse , error ) {
314
- s .mu .Lock ()
315
- defer s .mu .Unlock ()
316
318
p , err := s .getProcess (r .ExecID )
317
319
if err != nil {
318
320
return nil , err
319
321
}
320
322
if err := p .Start (ctx ); err != nil {
321
323
return nil , err
322
324
}
323
- if s .cg == nil && p .Pid () > 0 {
325
+ // case for restore
326
+ if s .getCgroup () == nil && p .Pid () > 0 {
324
327
cg , err := cgroups .Load (cgroups .V1 , cgroups .PidPath (p .Pid ()))
325
328
if err != nil {
326
329
logrus .WithError (err ).Errorf ("loading cgroup for %d" , p .Pid ())
@@ -334,8 +337,6 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
334
337
335
338
// Delete the initial process and container
336
339
func (s * service ) Delete (ctx context.Context , r * taskAPI.DeleteRequest ) (* taskAPI.DeleteResponse , error ) {
337
- s .mu .Lock ()
338
- defer s .mu .Unlock ()
339
340
p , err := s .getProcess (r .ExecID )
340
341
if err != nil {
341
342
return nil , err
@@ -348,7 +349,9 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
348
349
}
349
350
isTask := r .ExecID == ""
350
351
if ! isTask {
352
+ s .mu .Lock ()
351
353
delete (s .processes , r .ExecID )
354
+ s .mu .Unlock ()
352
355
}
353
356
if isTask && s .platform != nil {
354
357
s .platform .Close ()
@@ -363,11 +366,12 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
363
366
// Exec an additional process inside the container
364
367
func (s * service ) Exec (ctx context.Context , r * taskAPI.ExecProcessRequest ) (* ptypes.Empty , error ) {
365
368
s .mu .Lock ()
366
- defer s .mu .Unlock ()
367
- if p := s .processes [r .ExecID ]; p != nil {
369
+ p := s .processes [r .ExecID ]
370
+ s .mu .Unlock ()
371
+ if p != nil {
368
372
return nil , errdefs .ToGRPCf (errdefs .ErrAlreadyExists , "id %s" , r .ExecID )
369
373
}
370
- p : = s .task
374
+ p = s .task
371
375
if p == nil {
372
376
return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container must be created" )
373
377
}
@@ -382,14 +386,14 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
382
386
if err != nil {
383
387
return nil , errdefs .ToGRPC (err )
384
388
}
389
+ s .mu .Lock ()
385
390
s .processes [r .ExecID ] = process
391
+ s .mu .Unlock ()
386
392
return empty , nil
387
393
}
388
394
389
395
// ResizePty of a process
390
396
func (s * service ) ResizePty (ctx context.Context , r * taskAPI.ResizePtyRequest ) (* ptypes.Empty , error ) {
391
- s .mu .Lock ()
392
- defer s .mu .Unlock ()
393
397
p , err := s .getProcess (r .ExecID )
394
398
if err != nil {
395
399
return nil , err
@@ -406,8 +410,6 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
406
410
407
411
// State returns runtime state information for a process
408
412
func (s * service ) State (ctx context.Context , r * taskAPI.StateRequest ) (* taskAPI.StateResponse , error ) {
409
- s .mu .Lock ()
410
- defer s .mu .Unlock ()
411
413
p , err := s .getProcess (r .ExecID )
412
414
if err != nil {
413
415
return nil , err
@@ -447,8 +449,8 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
447
449
// Pause the container
448
450
func (s * service ) Pause (ctx context.Context , r * taskAPI.PauseRequest ) (* ptypes.Empty , error ) {
449
451
s .mu .Lock ()
450
- defer s .mu .Unlock ()
451
452
p := s .task
453
+ s .mu .Unlock ()
452
454
if p == nil {
453
455
return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container must be created" )
454
456
}
@@ -461,8 +463,8 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
461
463
// Resume the container
462
464
func (s * service ) Resume (ctx context.Context , r * taskAPI.ResumeRequest ) (* ptypes.Empty , error ) {
463
465
s .mu .Lock ()
464
- defer s .mu .Unlock ()
465
466
p := s .task
467
+ s .mu .Unlock ()
466
468
if p == nil {
467
469
return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container must be created" )
468
470
}
@@ -474,8 +476,6 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
474
476
475
477
// Kill a process with the provided signal
476
478
func (s * service ) Kill (ctx context.Context , r * taskAPI.KillRequest ) (* ptypes.Empty , error ) {
477
- s .mu .Lock ()
478
- defer s .mu .Unlock ()
479
479
p , err := s .getProcess (r .ExecID )
480
480
if err != nil {
481
481
return nil , err
@@ -522,8 +522,6 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
522
522
523
523
// CloseIO of a process
524
524
func (s * service ) CloseIO (ctx context.Context , r * taskAPI.CloseIORequest ) (* ptypes.Empty , error ) {
525
- s .mu .Lock ()
526
- defer s .mu .Unlock ()
527
525
p , err := s .getProcess (r .ExecID )
528
526
if err != nil {
529
527
return nil , err
@@ -539,8 +537,8 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
539
537
// Checkpoint the container
540
538
func (s * service ) Checkpoint (ctx context.Context , r * taskAPI.CheckpointTaskRequest ) (* ptypes.Empty , error ) {
541
539
s .mu .Lock ()
542
- defer s .mu .Unlock ()
543
540
p := s .task
541
+ s .mu .Unlock ()
544
542
if p == nil {
545
543
return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container must be created" )
546
544
}
@@ -579,18 +577,17 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
579
577
}
580
578
581
579
func (s * service ) Shutdown (ctx context.Context , r * taskAPI.ShutdownRequest ) (* ptypes.Empty , error ) {
580
+ s .cancel ()
582
581
os .Exit (0 )
583
582
return empty , nil
584
583
}
585
584
586
585
func (s * service ) Stats (ctx context.Context , r * taskAPI.StatsRequest ) (* taskAPI.StatsResponse , error ) {
587
- s .mu .Lock ()
588
- defer s .mu .Unlock ()
589
-
590
- if s .cg == nil {
586
+ cg := s .getCgroup ()
587
+ if cg == nil {
591
588
return nil , errdefs .ToGRPCf (errdefs .ErrNotFound , "cgroup does not exist" )
592
589
}
593
- stats , err := s . cg .Stat (cgroups .IgnoreNotExist )
590
+ stats , err := cg .Stat (cgroups .IgnoreNotExist )
594
591
if err != nil {
595
592
return nil , err
596
593
}
@@ -606,8 +603,8 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
606
603
// Update a running container
607
604
func (s * service ) Update (ctx context.Context , r * taskAPI.UpdateTaskRequest ) (* ptypes.Empty , error ) {
608
605
s .mu .Lock ()
609
- defer s .mu .Unlock ()
610
606
p := s .task
607
+ s .mu .Unlock ()
611
608
if p == nil {
612
609
return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container must be created" )
613
610
}
@@ -619,9 +616,7 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*pt
619
616
620
617
// Wait for a process to exit
621
618
func (s * service ) Wait (ctx context.Context , r * taskAPI.WaitRequest ) (* taskAPI.WaitResponse , error ) {
622
- s .mu .Lock ()
623
619
p , err := s .getProcess (r .ExecID )
624
- s .mu .Unlock ()
625
620
if err != nil {
626
621
return nil , err
627
622
}
@@ -643,9 +638,6 @@ func (s *service) processExits() {
643
638
}
644
639
645
640
func (s * service ) checkProcesses (e runcC.Exit ) {
646
- s .mu .Lock ()
647
- defer s .mu .Unlock ()
648
-
649
641
for _ , p := range s .allProcesses () {
650
642
if p .Pid () == e .Pid {
651
643
if ip , ok := p .(* proc.Init ); ok {
@@ -669,6 +661,8 @@ func (s *service) checkProcesses(e runcC.Exit) {
669
661
}
670
662
671
663
func (s * service ) allProcesses () (o []rproc.Process ) {
664
+ s .mu .Lock ()
665
+ defer s .mu .Unlock ()
672
666
for _ , p := range s .processes {
673
667
o = append (o , p )
674
668
}
@@ -680,8 +674,8 @@ func (s *service) allProcesses() (o []rproc.Process) {
680
674
681
675
func (s * service ) getContainerPids (ctx context.Context , id string ) ([]uint32 , error ) {
682
676
s .mu .Lock ()
683
- defer s .mu .Unlock ()
684
677
p := s .task
678
+ s .mu .Unlock ()
685
679
if p == nil {
686
680
return nil , errors .Wrapf (errdefs .ErrFailedPrecondition , "container must be created" )
687
681
}
@@ -698,13 +692,18 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
698
692
699
693
func (s * service ) forward (publisher events.Publisher ) {
700
694
for e := range s .events {
701
- if err := publisher .Publish (s .context , getTopic (s .context , e ), e ); err != nil {
695
+ ctx , cancel := context .WithTimeout (s .context , 5 * time .Second )
696
+ err := publisher .Publish (ctx , getTopic (e ), e )
697
+ cancel ()
698
+ if err != nil {
702
699
logrus .WithError (err ).Error ("post event" )
703
700
}
704
701
}
705
702
}
706
703
707
704
func (s * service ) getProcess (execID string ) (rproc.Process , error ) {
705
+ s .mu .Lock ()
706
+ defer s .mu .Unlock ()
708
707
if execID == "" {
709
708
return s .task , nil
710
709
}
@@ -715,14 +714,22 @@ func (s *service) getProcess(execID string) (rproc.Process, error) {
715
714
return p , nil
716
715
}
717
716
717
+ func (s * service ) getCgroup () cgroups.Cgroup {
718
+ s .mu .Lock ()
719
+ defer s .mu .Unlock ()
720
+ return s .cg
721
+ }
722
+
718
723
func (s * service ) setCgroup (cg cgroups.Cgroup ) {
724
+ s .mu .Lock ()
719
725
s .cg = cg
726
+ s .mu .Unlock ()
720
727
if err := s .ep .add (s .id , cg ); err != nil {
721
728
logrus .WithError (err ).Error ("add cg to OOM monitor" )
722
729
}
723
730
}
724
731
725
- func getTopic (ctx context. Context , e interface {}) string {
732
+ func getTopic (e interface {}) string {
726
733
switch e .(type ) {
727
734
case * eventstypes.TaskCreate :
728
735
return runtime .TaskCreateEventTopic
0 commit comments