@@ -68,6 +68,7 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
6868 if err != nil {
6969 return nil , err
7070 }
71+ ctx , cancel := context .WithCancel (ctx )
7172 go ep .run (ctx )
7273 s := & service {
7374 id : id ,
@@ -76,10 +77,12 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
7677 events : make (chan interface {}, 128 ),
7778 ec : shim .Default .Subscribe (),
7879 ep : ep ,
80+ cancel : cancel ,
7981 }
8082 go s .processExits ()
8183 runcC .Monitor = shim .Default
8284 if err := s .initPlatform (); err != nil {
85+ cancel ()
8386 return nil , errors .Wrap (err , "failed to initialized platform behavior" )
8487 }
8588 go s .forward (publisher )
@@ -101,6 +104,7 @@ type service struct {
101104 id string
102105 bundle string
103106 cg cgroups.Cgroup
107+ cancel func ()
104108}
105109
106110func newCommand (ctx context.Context , containerdBinary , containerdAddress string ) (* exec.Cmd , error ) {
@@ -300,7 +304,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
300304 if err != nil {
301305 logrus .WithError (err ).Errorf ("loading cgroup for %d" , pid )
302306 }
303- s .setCgroup ( cg )
307+ s .cg = cg
304308 }
305309 s .task = process
306310 return & taskAPI.CreateTaskResponse {
@@ -311,16 +315,15 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
311315
312316// Start a process
313317func (s * service ) Start (ctx context.Context , r * taskAPI.StartRequest ) (* taskAPI.StartResponse , error ) {
314- s .mu .Lock ()
315- defer s .mu .Unlock ()
316318 p , err := s .getProcess (r .ExecID )
317319 if err != nil {
318320 return nil , err
319321 }
320322 if err := p .Start (ctx ); err != nil {
321323 return nil , err
322324 }
323- if s .cg == nil && p .Pid () > 0 {
325+ // case for restore
326+ if s .getCgroup () == nil && p .Pid () > 0 {
324327 cg , err := cgroups .Load (cgroups .V1 , cgroups .PidPath (p .Pid ()))
325328 if err != nil {
326329 logrus .WithError (err ).Errorf ("loading cgroup for %d" , p .Pid ())
@@ -334,8 +337,6 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
334337
335338// Delete the initial process and container
336339func (s * service ) Delete (ctx context.Context , r * taskAPI.DeleteRequest ) (* taskAPI.DeleteResponse , error ) {
337- s .mu .Lock ()
338- defer s .mu .Unlock ()
339340 p , err := s .getProcess (r .ExecID )
340341 if err != nil {
341342 return nil , err
@@ -348,7 +349,9 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
348349 }
349350 isTask := r .ExecID == ""
350351 if ! isTask {
352+ s .mu .Lock ()
351353 delete (s .processes , r .ExecID )
354+ s .mu .Unlock ()
352355 }
353356 if isTask && s .platform != nil {
354357 s .platform .Close ()
@@ -363,11 +366,12 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
363366// Exec an additional process inside the container
364367func (s * service ) Exec (ctx context.Context , r * taskAPI.ExecProcessRequest ) (* ptypes.Empty , error ) {
365368 s .mu .Lock ()
366- defer s .mu .Unlock ()
367- if p := s .processes [r .ExecID ]; p != nil {
369+ p := s .processes [r .ExecID ]
370+ s .mu .Unlock ()
371+ if p != nil {
368372 return nil , errdefs .ToGRPCf (errdefs .ErrAlreadyExists , "id %s" , r .ExecID )
369373 }
370- p : = s .task
374+ p = s .task
371375 if p == nil {
372376 return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container must be created" )
373377 }
@@ -382,14 +386,14 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
382386 if err != nil {
383387 return nil , errdefs .ToGRPC (err )
384388 }
389+ s .mu .Lock ()
385390 s .processes [r .ExecID ] = process
391+ s .mu .Unlock ()
386392 return empty , nil
387393}
388394
389395// ResizePty of a process
390396func (s * service ) ResizePty (ctx context.Context , r * taskAPI.ResizePtyRequest ) (* ptypes.Empty , error ) {
391- s .mu .Lock ()
392- defer s .mu .Unlock ()
393397 p , err := s .getProcess (r .ExecID )
394398 if err != nil {
395399 return nil , err
@@ -406,8 +410,6 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
406410
407411// State returns runtime state information for a process
408412func (s * service ) State (ctx context.Context , r * taskAPI.StateRequest ) (* taskAPI.StateResponse , error ) {
409- s .mu .Lock ()
410- defer s .mu .Unlock ()
411413 p , err := s .getProcess (r .ExecID )
412414 if err != nil {
413415 return nil , err
@@ -447,8 +449,8 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
447449// Pause the container
448450func (s * service ) Pause (ctx context.Context , r * taskAPI.PauseRequest ) (* ptypes.Empty , error ) {
449451 s .mu .Lock ()
450- defer s .mu .Unlock ()
451452 p := s .task
453+ s .mu .Unlock ()
452454 if p == nil {
453455 return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container must be created" )
454456 }
@@ -461,8 +463,8 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
461463// Resume the container
462464func (s * service ) Resume (ctx context.Context , r * taskAPI.ResumeRequest ) (* ptypes.Empty , error ) {
463465 s .mu .Lock ()
464- defer s .mu .Unlock ()
465466 p := s .task
467+ s .mu .Unlock ()
466468 if p == nil {
467469 return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container must be created" )
468470 }
@@ -474,8 +476,6 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
474476
475477// Kill a process with the provided signal
476478func (s * service ) Kill (ctx context.Context , r * taskAPI.KillRequest ) (* ptypes.Empty , error ) {
477- s .mu .Lock ()
478- defer s .mu .Unlock ()
479479 p , err := s .getProcess (r .ExecID )
480480 if err != nil {
481481 return nil , err
@@ -522,8 +522,6 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
522522
523523// CloseIO of a process
524524func (s * service ) CloseIO (ctx context.Context , r * taskAPI.CloseIORequest ) (* ptypes.Empty , error ) {
525- s .mu .Lock ()
526- defer s .mu .Unlock ()
527525 p , err := s .getProcess (r .ExecID )
528526 if err != nil {
529527 return nil , err
@@ -539,8 +537,8 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
539537// Checkpoint the container
540538func (s * service ) Checkpoint (ctx context.Context , r * taskAPI.CheckpointTaskRequest ) (* ptypes.Empty , error ) {
541539 s .mu .Lock ()
542- defer s .mu .Unlock ()
543540 p := s .task
541+ s .mu .Unlock ()
544542 if p == nil {
545543 return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container must be created" )
546544 }
@@ -579,18 +577,17 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
579577}
580578
// Shutdown invokes s.cancel (the cancel function stored on the service when it
// was constructed) and then terminates the process with os.Exit(0). Because
// os.Exit does not return, the trailing return statement is never reached.
581579func (s * service ) Shutdown (ctx context.Context , r * taskAPI.ShutdownRequest ) (* ptypes.Empty , error ) {
580+ s .cancel ()
582581 os .Exit (0 )
583582 return empty , nil
584583}
585584
586585func (s * service ) Stats (ctx context.Context , r * taskAPI.StatsRequest ) (* taskAPI.StatsResponse , error ) {
587- s .mu .Lock ()
588- defer s .mu .Unlock ()
589-
590- if s .cg == nil {
586+ cg := s .getCgroup ()
587+ if cg == nil {
591588 return nil , errdefs .ToGRPCf (errdefs .ErrNotFound , "cgroup does not exist" )
592589 }
593- stats , err := s . cg .Stat (cgroups .IgnoreNotExist )
590+ stats , err := cg .Stat (cgroups .IgnoreNotExist )
594591 if err != nil {
595592 return nil , err
596593 }
@@ -606,8 +603,8 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
606603// Update a running container
607604func (s * service ) Update (ctx context.Context , r * taskAPI.UpdateTaskRequest ) (* ptypes.Empty , error ) {
608605 s .mu .Lock ()
609- defer s .mu .Unlock ()
610606 p := s .task
607+ s .mu .Unlock ()
611608 if p == nil {
612609 return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container must be created" )
613610 }
@@ -619,9 +616,7 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*pt
619616
620617// Wait for a process to exit
621618func (s * service ) Wait (ctx context.Context , r * taskAPI.WaitRequest ) (* taskAPI.WaitResponse , error ) {
622- s .mu .Lock ()
623619 p , err := s .getProcess (r .ExecID )
624- s .mu .Unlock ()
625620 if err != nil {
626621 return nil , err
627622 }
@@ -643,9 +638,6 @@ func (s *service) processExits() {
643638}
644639
645640func (s * service ) checkProcesses (e runcC.Exit ) {
646- s .mu .Lock ()
647- defer s .mu .Unlock ()
648-
649641 for _ , p := range s .allProcesses () {
650642 if p .Pid () == e .Pid {
651643 if ip , ok := p .(* proc.Init ); ok {
@@ -669,6 +661,8 @@ func (s *service) checkProcesses(e runcC.Exit) {
669661}
670662
671663func (s * service ) allProcesses () (o []rproc.Process ) {
664+ s .mu .Lock ()
665+ defer s .mu .Unlock ()
672666 for _ , p := range s .processes {
673667 o = append (o , p )
674668 }
@@ -680,8 +674,8 @@ func (s *service) allProcesses() (o []rproc.Process) {
680674
681675func (s * service ) getContainerPids (ctx context.Context , id string ) ([]uint32 , error ) {
682676 s .mu .Lock ()
683- defer s .mu .Unlock ()
684677 p := s .task
678+ s .mu .Unlock ()
685679 if p == nil {
686680 return nil , errors .Wrapf (errdefs .ErrFailedPrecondition , "container must be created" )
687681 }
@@ -698,13 +692,18 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
698692
// forward receives events from s.events until that channel is closed and
// publishes each one through publisher under the topic chosen by getTopic.
// Every publish runs with its own context derived from s.context with a
// 5 second timeout. A failed publish is logged ("post event") and the loop
// continues with the next event.
699693func (s * service ) forward (publisher events.Publisher ) {
700694 for e := range s .events {
701- if err := publisher .Publish (s .context , getTopic (s .context , e ), e ); err != nil {
695+ ctx , cancel := context .WithTimeout (s .context , 5 * time .Second )
696+ err := publisher .Publish (ctx , getTopic (e ), e )
// cancel is called right after Publish rather than deferred, so the timeout
// context is released on every iteration of the loop.
697+ cancel ()
698+ if err != nil {
702699 logrus .WithError (err ).Error ("post event" )
703700 }
704701 }
705702}
706703
707704func (s * service ) getProcess (execID string ) (rproc.Process , error ) {
705+ s .mu .Lock ()
706+ defer s .mu .Unlock ()
708707 if execID == "" {
709708 return s .task , nil
710709 }
@@ -715,14 +714,22 @@ func (s *service) getProcess(execID string) (rproc.Process, error) {
715714 return p , nil
716715}
717716
// getCgroup returns the value of s.cg, reading it while holding s.mu.
// The result may be nil; callers in this file compare it against nil.
717+ func (s * service ) getCgroup () cgroups.Cgroup {
718+ s .mu .Lock ()
719+ defer s .mu .Unlock ()
720+ return s .cg
721+ }
722+
// setCgroup stores cg in s.cg while holding s.mu, then passes s.id and cg to
// s.ep.add. The lock is released before add is called. An error from add is
// logged and not returned; going by the log message, add registers the cgroup
// with an OOM monitor (NOTE(review): confirm against the ep implementation).
718723func (s * service ) setCgroup (cg cgroups.Cgroup ) {
724+ s .mu .Lock ()
719725 s .cg = cg
726+ s .mu .Unlock ()
720727 if err := s .ep .add (s .id , cg ); err != nil {
721728 logrus .WithError (err ).Error ("add cg to OOM monitor" )
722729 }
723730}
724731
725- func getTopic (ctx context. Context , e interface {}) string {
732+ func getTopic (e interface {}) string {
726733 switch e .(type ) {
727734 case * eventstypes.TaskCreate :
728735 return runtime .TaskCreateEventTopic
0 commit comments