@@ -24,18 +24,21 @@ import (
2424 "path/filepath"
2525 "time"
2626
27+ eventstypes "github.com/containerd/containerd/api/events"
2728 "github.com/containerd/containerd/api/types"
2829 tasktypes "github.com/containerd/containerd/api/types/task"
2930 "github.com/containerd/containerd/errdefs"
3031 "github.com/containerd/containerd/events/exchange"
3132 "github.com/containerd/containerd/identifiers"
3233 "github.com/containerd/containerd/log"
34+ "github.com/containerd/containerd/namespaces"
3335 "github.com/containerd/containerd/runtime"
3436 client "github.com/containerd/containerd/runtime/v2/shim"
3537 "github.com/containerd/containerd/runtime/v2/task"
3638 "github.com/containerd/ttrpc"
3739 ptypes "github.com/gogo/protobuf/types"
3840 "github.com/pkg/errors"
41+ "github.com/sirupsen/logrus"
3942)
4043
4144func loadAddress (path string ) (string , error ) {
@@ -46,7 +49,7 @@ func loadAddress(path string) (string, error) {
4649 return string (data ), nil
4750}
4851
49- func loadShim (ctx context.Context , bundle * Bundle , events * exchange.Exchange , rt * runtime.TaskList ) (_ * shim , err error ) {
52+ func loadShim (ctx context.Context , bundle * Bundle , events * exchange.Exchange , rt * runtime.TaskList , onClose func () ) (_ * shim , err error ) {
5053 address , err := loadAddress (filepath .Join (bundle .Path , "address" ))
5154 if err != nil {
5255 return nil , err
@@ -55,6 +58,11 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
5558 if err != nil {
5659 return nil , err
5760 }
61+ defer func () {
62+ if err != nil {
63+ conn .Close ()
64+ }
65+ }()
5866 f , err := openShimLog (ctx , bundle )
5967 if err != nil {
6068 return nil , errors .Wrap (err , "open shim log pipe" )
@@ -79,7 +87,12 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
7987 }
8088 }()
8189
82- client := ttrpc .NewClient (conn , ttrpc .WithOnClose (func () { _ = conn .Close () }))
90+ client := ttrpc .NewClient (conn , ttrpc .WithOnClose (onClose ))
91+ defer func () {
92+ if err != nil {
93+ client .Close ()
94+ }
95+ }()
8396 s := & shim {
8497 client : client ,
8598 task : task .NewTaskClient (client ),
@@ -93,6 +106,52 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
93106 return s , nil
94107}
95108
109+ func cleanupAfterDeadShim (ctx context.Context , id , ns string , events * exchange.Exchange , binaryCall * binary ) {
110+ ctx = namespaces .WithNamespace (ctx , ns )
111+ ctx , cancel := context .WithTimeout (ctx , 5 * time .Second )
112+ defer cancel ()
113+
114+ log .G (ctx ).WithFields (logrus.Fields {
115+ "id" : id ,
116+ "namespace" : ns ,
117+ }).Warn ("cleaning up after shim disconnected" )
118+ response , err := binaryCall .Delete (ctx )
119+ if err != nil {
120+ log .G (ctx ).WithError (err ).WithFields (logrus.Fields {
121+ "id" : id ,
122+ "namespace" : ns ,
123+ }).Warn ("failed to clean up after shim disconnected" )
124+ }
125+
126+ var (
127+ pid uint32
128+ exitStatus uint32
129+ exitedAt time.Time
130+ )
131+ if response != nil {
132+ pid = response .Pid
133+ exitStatus = response .Status
134+ exitedAt = response .Timestamp
135+ } else {
136+ exitStatus = 255
137+ exitedAt = time .Now ()
138+ }
139+ events .Publish (ctx , runtime .TaskExitEventTopic , & eventstypes.TaskExit {
140+ ContainerID : id ,
141+ ID : id ,
142+ Pid : pid ,
143+ ExitStatus : exitStatus ,
144+ ExitedAt : exitedAt ,
145+ })
146+
147+ events .Publish (ctx , runtime .TaskDeleteEventTopic , & eventstypes.TaskDelete {
148+ ContainerID : id ,
149+ Pid : pid ,
150+ ExitStatus : exitStatus ,
151+ ExitedAt : exitedAt ,
152+ })
153+ }
154+
96155type shim struct {
97156 bundle * Bundle
98157 client * ttrpc.Client
@@ -124,19 +183,9 @@ func (s *shim) Shutdown(ctx context.Context) error {
124183}
125184
126185func (s * shim ) waitShutdown (ctx context.Context ) error {
127- dead := make (chan struct {})
128- go func () {
129- if err := s .Shutdown (ctx ); err != nil {
130- log .G (ctx ).WithError (err ).Error ("shim shutdown error" )
131- }
132- close (dead )
133- }()
134- select {
135- case <- time .After (3 * time .Second ):
136- return errors .New ("failed to shutdown shim in time" )
137- case <- dead :
138- return nil
139- }
186+ ctx , cancel := context .WithTimeout (ctx , 3 * time .Second )
187+ defer cancel ()
188+ return s .Shutdown (ctx )
140189}
141190
142191// ID of the shim/task
@@ -159,15 +208,15 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
159208 if err != nil && ! errdefs .IsNotFound (err ) {
160209 return nil , errdefs .FromGRPC (err )
161210 }
211+ // remove self from the runtime task list
212+ // this seems dirty but it cleans up the API across runtimes, tasks, and the service
213+ s .rtTasks .Delete (ctx , s .ID ())
162214 if err := s .waitShutdown (ctx ); err != nil {
163- return nil , err
215+ log . G ( ctx ). WithError ( err ). Error ( "failed to shutdown shim" )
164216 }
165217 if err := s .bundle .Delete (); err != nil {
166- return nil , err
218+ log . G ( ctx ). WithError ( err ). Error ( "failed to delete bundle" )
167219 }
168- // remove self from the runtime task list
169- // this seems dirty but it cleans up the API across runtimes, tasks, and the service
170- s .rtTasks .Delete (ctx , s .ID ())
171220 return & runtime.Exit {
172221 Status : response .ExitStatus ,
173222 Timestamp : response .ExitedAt ,
0 commit comments