@@ -30,6 +30,7 @@ import (
3030 "github.com/containerd/containerd/events"
3131 "github.com/containerd/containerd/log"
3232 "github.com/containerd/containerd/namespaces"
33+ "github.com/containerd/containerd/plugin"
3334 shimapi "github.com/containerd/containerd/runtime/v2/task"
3435 "github.com/containerd/containerd/version"
3536 "github.com/containerd/ttrpc"
@@ -38,13 +39,6 @@ import (
3839 "github.com/sirupsen/logrus"
3940)
4041
41- // Client for a shim server
42- type Client struct {
43- service shimapi.TaskService
44- context context.Context
45- signals chan os.Signal
46- }
47-
4842// Publisher for events
4943type Publisher interface {
5044 events.Publisher
@@ -64,7 +58,6 @@ type Init func(context.Context, string, Publisher, func()) (Shim, error)
6458
6559// Shim server interface
6660type Shim interface {
67- shimapi.TaskService
6861 Cleanup (ctx context.Context ) (* shimapi.DeleteResponse , error )
6962 StartShim (ctx context.Context , opts StartOpts ) (string , error )
7063}
@@ -91,6 +84,19 @@ type Config struct {
9184 NoSetupLogger bool
9285}
9386
87+ type ttrpcService interface {
88+ RegisterTTRPC (* ttrpc.Server ) error
89+ }
90+
91+ type taskService struct {
92+ local shimapi.TaskService
93+ }
94+
95+ func (t * taskService ) RegisterTTRPC (server * ttrpc.Server ) error {
96+ shimapi .RegisterTaskService (server , t .local )
97+ return nil
98+ }
99+
94100var (
95101 debugFlag bool
96102 versionFlag bool
@@ -158,6 +164,7 @@ func Run(id string, initFunc Init, opts ...BinaryOpts) {
158164 for _ , o := range opts {
159165 o (& config )
160166 }
167+
161168 if err := run (id , initFunc , config ); err != nil {
162169 fmt .Fprintf (os .Stderr , "%s: %s\n " , id , err )
163170 os .Exit (1 )
@@ -208,6 +215,7 @@ func run(id string, initFunc Init, config Config) error {
208215 return err
209216 }
210217
218+ // Handle explicit actions
211219 switch action {
212220 case "delete" :
213221 logger := logrus .WithFields (logrus.Fields {
@@ -234,6 +242,7 @@ func run(id string, initFunc Init, config Config) error {
234242 Address : addressFlag ,
235243 TTRPCAddress : ttrpcAddress ,
236244 }
245+
237246 address , err := service .StartShim (ctx , opts )
238247 if err != nil {
239248 return err
@@ -242,64 +251,141 @@ func run(id string, initFunc Init, config Config) error {
242251 return err
243252 }
244253 return nil
245- default :
246- if ! config .NoSetupLogger {
247- if err := setLogger (ctx , idFlag ); err != nil {
248- return err
249- }
254+ }
255+
256+ if ! config .NoSetupLogger {
257+ if err := setLogger (ctx , idFlag ); err != nil {
258+ return err
259+ }
260+ }
261+
262+ // Register event plugin
263+ plugin .Register (& plugin.Registration {
264+ Type : plugin .EventPlugin ,
265+ ID : "publisher" ,
266+ InitFn : func (ic * plugin.InitContext ) (interface {}, error ) {
267+ return publisher , nil
268+ },
269+ })
270+
271+ // If service is an implementation of the task service, register it as a plugin
272+ if ts , ok := service .(shimapi.TaskService ); ok {
273+ plugin .Register (& plugin.Registration {
274+ Type : plugin .TTRPCPlugin ,
275+ ID : "task" ,
276+ InitFn : func (ic * plugin.InitContext ) (interface {}, error ) {
277+ return & taskService {ts }, nil
278+ },
279+ })
280+ }
281+
282+ var (
283+ initialized = plugin .NewPluginSet ()
284+ ttrpcServices = []ttrpcService {}
285+ )
286+ plugins := plugin .Graph (func (* plugin.Registration ) bool { return false })
287+ for _ , p := range plugins {
288+ id := p .URI ()
289+ log .G (ctx ).WithField ("type" , p .Type ).Infof ("loading plugin %q..." , id )
290+
291+ initContext := plugin .NewContext (
292+ ctx ,
293+ p ,
294+ initialized ,
295+ // NOTE: Root is empty since the shim does not support persistent storage,
296+ // shim plugins should make use state directory for writing files to disk.
297+ // The state directory will be destroyed when the shim if cleaned up or
298+ // on reboot
299+ "" ,
300+ bundlePath ,
301+ )
302+ initContext .Address = addressFlag
303+ initContext .TTRPCAddress = ttrpcAddress
304+
305+ // load the plugin specific configuration if it is provided
306+ //TODO: Read configuration passed into shim, or from state directory?
307+ //if p.Config != nil {
308+ // pc, err := config.Decode(p)
309+ // if err != nil {
310+ // return nil, err
311+ // }
312+ // initContext.Config = pc
313+ //}
314+
315+ result := p .Init (initContext )
316+ if err := initialized .Add (result ); err != nil {
317+ return errors .Wrapf (err , "could not add plugin result to plugin set" )
250318 }
251- client := NewShimClient (ctx , service , signals )
252- if err := client .Serve (); err != nil {
253- if err != context .Canceled {
254- return err
319+
320+ instance , err := result .Instance ()
321+ if err != nil {
322+ if plugin .IsSkipPlugin (err ) {
323+ log .G (ctx ).WithError (err ).WithField ("type" , p .Type ).Infof ("skip loading plugin %q..." , id )
324+ } else {
325+ log .G (ctx ).WithError (err ).Warnf ("failed to load plugin %s" , id )
255326 }
327+ continue
256328 }
257329
258- // NOTE: If the shim server is down(like oom killer), the address
259- // socket might be leaking.
260- if address , err := ReadAddress ("address" ); err == nil {
261- _ = RemoveSocket (address )
330+ if src , ok := instance .(ttrpcService ); ok {
331+ logrus .WithField ("id" , id ).Debug ("registering ttrpc service" )
332+ ttrpcServices = append (ttrpcServices , src )
262333 }
334+ }
335+
336+ server , err := newServer ()
337+ if err != nil {
338+ return errors .Wrap (err , "failed creating server" )
339+ }
263340
264- select {
265- case <- publisher .Done ():
266- return nil
267- case <- time .After (5 * time .Second ):
268- return errors .New ("publisher not closed" )
341+ for _ , srv := range ttrpcServices {
342+ if err := srv .RegisterTTRPC (server ); err != nil {
343+ return errors .Wrap (err , "failed to register service" )
269344 }
270345 }
271- }
272346
273- // NewShimClient creates a new shim server client
274- func NewShimClient (ctx context.Context , svc shimapi.TaskService , signals chan os.Signal ) * Client {
275- s := & Client {
276- service : svc ,
277- context : ctx ,
278- signals : signals ,
347+ if err := serve (ctx , server , signals ); err != nil {
348+ if err != context .Canceled {
349+ return err
350+ }
351+ }
352+
353+ // NOTE: If the shim server is down(like oom killer), the address
354+ // socket might be leaking.
355+ if address , err := ReadAddress ("address" ); err == nil {
356+ _ = RemoveSocket (address )
357+ }
358+
359+ select {
360+ case <- publisher .Done ():
361+ return nil
362+ case <- time .After (5 * time .Second ):
363+ return errors .New ("publisher not closed" )
279364 }
280- return s
281365}
282366
283- // Serve the shim server
284- func (s * Client ) Serve () error {
367+ // serve serves the ttrpc API over a unix socket in the current working directory
368+ // and blocks until the context is canceled
369+ func serve (ctx context.Context , server * ttrpc.Server , signals chan os.Signal ) error {
285370 dump := make (chan os.Signal , 32 )
286371 setupDumpStacks (dump )
287372
288373 path , err := os .Getwd ()
289374 if err != nil {
290375 return err
291376 }
292- server , err := newServer ()
293- if err != nil {
294- return errors .Wrap (err , "failed creating server" )
295- }
296377
297- logrus .Debug ("registering ttrpc server" )
298- shimapi .RegisterTaskService (server , s .service )
299-
300- if err := serve (s .context , server , socketFlag ); err != nil {
378+ l , err := serveListener (socketFlag )
379+ if err != nil {
301380 return err
302381 }
382+ go func () {
383+ defer l .Close ()
384+ if err := server .Serve (ctx , l ); err != nil &&
385+ ! strings .Contains (err .Error (), "use of closed network connection" ) {
386+ logrus .WithError (err ).Fatal ("containerd-shim: ttrpc server failure" )
387+ }
388+ }()
303389 logger := logrus .WithFields (logrus.Fields {
304390 "pid" : os .Getpid (),
305391 "path" : path ,
@@ -310,24 +396,7 @@ func (s *Client) Serve() error {
310396 dumpStacks (logger )
311397 }
312398 }()
313- return handleSignals (s .context , logger , s .signals )
314- }
315-
316- // serve serves the ttrpc API over a unix socket at the provided path
317- // this function does not block
318- func serve (ctx context.Context , server * ttrpc.Server , path string ) error {
319- l , err := serveListener (path )
320- if err != nil {
321- return err
322- }
323- go func () {
324- defer l .Close ()
325- if err := server .Serve (ctx , l ); err != nil &&
326- ! strings .Contains (err .Error (), "use of closed network connection" ) {
327- logrus .WithError (err ).Fatal ("containerd-shim: ttrpc server failure" )
328- }
329- }()
330- return nil
399+ return handleSignals (ctx , logger , signals )
331400}
332401
333402func dumpStacks (logger * logrus.Entry ) {
0 commit comments