@@ -24,18 +24,21 @@ import (
2424 "path/filepath"
2525 "time"
2626
27+ eventstypes "github.com/containerd/containerd/api/events"
2728 "github.com/containerd/containerd/api/types"
2829 tasktypes "github.com/containerd/containerd/api/types/task"
2930 "github.com/containerd/containerd/errdefs"
3031 "github.com/containerd/containerd/events/exchange"
3132 "github.com/containerd/containerd/identifiers"
3233 "github.com/containerd/containerd/log"
34+ "github.com/containerd/containerd/namespaces"
3335 "github.com/containerd/containerd/runtime"
3436 client "github.com/containerd/containerd/runtime/v2/shim"
3537 "github.com/containerd/containerd/runtime/v2/task"
3638 "github.com/containerd/ttrpc"
3739 ptypes "github.com/gogo/protobuf/types"
3840 "github.com/pkg/errors"
41+ "github.com/sirupsen/logrus"
3942)
4043
4144func loadAddress (path string ) (string , error ) {
@@ -46,7 +49,7 @@ func loadAddress(path string) (string, error) {
4649 return string (data ), nil
4750}
4851
49- func loadShim (ctx context.Context , bundle * Bundle , events * exchange.Exchange , rt * runtime.TaskList ) (_ * shim , err error ) {
52+ func loadShim (ctx context.Context , bundle * Bundle , events * exchange.Exchange , rt * runtime.TaskList , onClose func () ) (_ * shim , err error ) {
5053 address , err := loadAddress (filepath .Join (bundle .Path , "address" ))
5154 if err != nil {
5255 return nil , err
@@ -55,6 +58,11 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
5558 if err != nil {
5659 return nil , err
5760 }
61+ defer func () {
62+ if err != nil {
63+ conn .Close ()
64+ }
65+ }()
5866 f , err := openShimLog (ctx , bundle )
5967 if err != nil {
6068 return nil , errors .Wrap (err , "open shim log pipe" )
@@ -74,7 +82,12 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
7482 }
7583 }()
7684
77- client := ttrpc .NewClient (conn , ttrpc .WithOnClose (func () { _ = conn .Close () }))
85+ client := ttrpc .NewClient (conn , ttrpc .WithOnClose (onClose ))
86+ defer func () {
87+ if err != nil {
88+ client .Close ()
89+ }
90+ }()
7891 s := & shim {
7992 client : client ,
8093 task : task .NewTaskClient (client ),
@@ -88,6 +101,52 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
88101 return s , nil
89102}
90103
104+ func cleanupAfterDeadShim (ctx context.Context , id , ns string , events * exchange.Exchange , binaryCall * binary ) {
105+ ctx = namespaces .WithNamespace (ctx , ns )
106+ ctx , cancel := context .WithTimeout (ctx , 5 * time .Second )
107+ defer cancel ()
108+
109+ log .G (ctx ).WithFields (logrus.Fields {
110+ "id" : id ,
111+ "namespace" : ns ,
112+ }).Warn ("cleaning up after shim disconnected" )
113+ response , err := binaryCall .Delete (ctx )
114+ if err != nil {
115+ log .G (ctx ).WithError (err ).WithFields (logrus.Fields {
116+ "id" : id ,
117+ "namespace" : ns ,
118+ }).Warn ("failed to clean up after shim disconnected" )
119+ }
120+
121+ var (
122+ pid uint32
123+ exitStatus uint32
124+ exitedAt time.Time
125+ )
126+ if response != nil {
127+ pid = response .Pid
128+ exitStatus = response .Status
129+ exitedAt = response .Timestamp
130+ } else {
131+ exitStatus = 255
132+ exitedAt = time .Now ()
133+ }
134+ events .Publish (ctx , runtime .TaskExitEventTopic , & eventstypes.TaskExit {
135+ ContainerID : id ,
136+ ID : id ,
137+ Pid : pid ,
138+ ExitStatus : exitStatus ,
139+ ExitedAt : exitedAt ,
140+ })
141+
142+ events .Publish (ctx , runtime .TaskDeleteEventTopic , & eventstypes.TaskDelete {
143+ ContainerID : id ,
144+ Pid : pid ,
145+ ExitStatus : exitStatus ,
146+ ExitedAt : exitedAt ,
147+ })
148+ }
149+
91150type shim struct {
92151 bundle * Bundle
93152 client * ttrpc.Client
@@ -119,19 +178,9 @@ func (s *shim) Shutdown(ctx context.Context) error {
119178}
120179
121180func (s * shim ) waitShutdown (ctx context.Context ) error {
122- dead := make (chan struct {})
123- go func () {
124- if err := s .Shutdown (ctx ); err != nil {
125- log .G (ctx ).WithError (err ).Error ("shim shutdown error" )
126- }
127- close (dead )
128- }()
129- select {
130- case <- time .After (3 * time .Second ):
131- return errors .New ("failed to shutdown shim in time" )
132- case <- dead :
133- return nil
134- }
181+ ctx , cancel := context .WithTimeout (ctx , 3 * time .Second )
182+ defer cancel ()
183+ return s .Shutdown (ctx )
135184}
136185
137186// ID of the shim/task
@@ -154,15 +203,15 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
154203 if err != nil {
155204 return nil , errdefs .FromGRPC (err )
156205 }
206+ // remove self from the runtime task list
207+ // this seems dirty but it cleans up the API across runtimes, tasks, and the service
208+ s .rtTasks .Delete (ctx , s .ID ())
157209 if err := s .waitShutdown (ctx ); err != nil {
158- return nil , err
210+ log . G ( ctx ). WithError ( err ). Error ( "failed to shutdown shim" )
159211 }
160212 if err := s .bundle .Delete (); err != nil {
161- return nil , err
213+ log . G ( ctx ). WithError ( err ). Error ( "failed to delete bundle" )
162214 }
163- // remove self from the runtime task list
164- // this seems dirty but it cleans up the API across runtimes, tasks, and the service
165- s .rtTasks .Delete (ctx , s .ID ())
166215 return & runtime.Exit {
167216 Status : response .ExitStatus ,
168217 Timestamp : response .ExitedAt ,
0 commit comments