Skip to content

Commit 8d135d2

Browse files
committed
Add support for shim plugins
Refactor shim v2 to load and register plugins. Update init shim interface to not require task service implementation on returned service, but register as plugin if it is. Signed-off-by: Derek McGowan <[email protected]>
1 parent fda782a commit 8d135d2

4 files changed

Lines changed: 156 additions & 86 deletions

File tree

pkg/cri/server/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ type CRIService interface {
6565
Run() error
6666
// io.Closer is used by containerd to gracefully stop cri service.
6767
io.Closer
68-
plugin.Service
68+
Register(*grpc.Server) error
6969
grpcServices
7070
}
7171

plugin/plugin.go

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ import (
2020
"fmt"
2121
"sync"
2222

23-
"github.com/containerd/ttrpc"
2423
"github.com/pkg/errors"
25-
"google.golang.org/grpc"
2624
)
2725

2826
var (
@@ -63,6 +61,8 @@ const (
6361
ServicePlugin Type = "io.containerd.service.v1"
6462
// GRPCPlugin implements a grpc service
6563
GRPCPlugin Type = "io.containerd.grpc.v1"
64+
// TTRPCPlugin implements a ttrpc shim service
65+
TTRPCPlugin Type = "io.containerd.ttrpc.v1"
6666
// SnapshotPlugin implements a snapshotter
6767
SnapshotPlugin Type = "io.containerd.snapshotter.v1"
6868
// TaskMonitorPlugin implements a task monitor
@@ -124,21 +124,6 @@ func (r *Registration) URI() string {
124124
return fmt.Sprintf("%s.%s", r.Type, r.ID)
125125
}
126126

127-
// Service allows GRPC services to be registered with the underlying server
128-
type Service interface {
129-
Register(*grpc.Server) error
130-
}
131-
132-
// TTRPCService allows TTRPC services to be registered with the underlying server
133-
type TTRPCService interface {
134-
RegisterTTRPC(*ttrpc.Server) error
135-
}
136-
137-
// TCPService allows GRPC services to be registered with the underlying tcp server
138-
type TCPService interface {
139-
RegisterTCP(*grpc.Server) error
140-
}
141-
142127
var register = struct {
143128
sync.RWMutex
144129
r []*Registration

runtime/v2/shim/shim.go

Lines changed: 131 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/containerd/containerd/events"
3131
"github.com/containerd/containerd/log"
3232
"github.com/containerd/containerd/namespaces"
33+
"github.com/containerd/containerd/plugin"
3334
shimapi "github.com/containerd/containerd/runtime/v2/task"
3435
"github.com/containerd/containerd/version"
3536
"github.com/containerd/ttrpc"
@@ -38,13 +39,6 @@ import (
3839
"github.com/sirupsen/logrus"
3940
)
4041

41-
// Client for a shim server
42-
type Client struct {
43-
service shimapi.TaskService
44-
context context.Context
45-
signals chan os.Signal
46-
}
47-
4842
// Publisher for events
4943
type Publisher interface {
5044
events.Publisher
@@ -64,7 +58,6 @@ type Init func(context.Context, string, Publisher, func()) (Shim, error)
6458

6559
// Shim server interface
6660
type Shim interface {
67-
shimapi.TaskService
6861
Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error)
6962
StartShim(ctx context.Context, opts StartOpts) (string, error)
7063
}
@@ -91,6 +84,19 @@ type Config struct {
9184
NoSetupLogger bool
9285
}
9386

87+
type ttrpcService interface {
88+
RegisterTTRPC(*ttrpc.Server) error
89+
}
90+
91+
type taskService struct {
92+
local shimapi.TaskService
93+
}
94+
95+
func (t *taskService) RegisterTTRPC(server *ttrpc.Server) error {
96+
shimapi.RegisterTaskService(server, t.local)
97+
return nil
98+
}
99+
94100
var (
95101
debugFlag bool
96102
versionFlag bool
@@ -158,6 +164,7 @@ func Run(id string, initFunc Init, opts ...BinaryOpts) {
158164
for _, o := range opts {
159165
o(&config)
160166
}
167+
161168
if err := run(id, initFunc, config); err != nil {
162169
fmt.Fprintf(os.Stderr, "%s: %s\n", id, err)
163170
os.Exit(1)
@@ -208,6 +215,7 @@ func run(id string, initFunc Init, config Config) error {
208215
return err
209216
}
210217

218+
// Handle explicit actions
211219
switch action {
212220
case "delete":
213221
logger := logrus.WithFields(logrus.Fields{
@@ -234,6 +242,7 @@ func run(id string, initFunc Init, config Config) error {
234242
Address: addressFlag,
235243
TTRPCAddress: ttrpcAddress,
236244
}
245+
237246
address, err := service.StartShim(ctx, opts)
238247
if err != nil {
239248
return err
@@ -242,64 +251,141 @@ func run(id string, initFunc Init, config Config) error {
242251
return err
243252
}
244253
return nil
245-
default:
246-
if !config.NoSetupLogger {
247-
if err := setLogger(ctx, idFlag); err != nil {
248-
return err
249-
}
254+
}
255+
256+
if !config.NoSetupLogger {
257+
if err := setLogger(ctx, idFlag); err != nil {
258+
return err
259+
}
260+
}
261+
262+
// Register event plugin
263+
plugin.Register(&plugin.Registration{
264+
Type: plugin.EventPlugin,
265+
ID: "publisher",
266+
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
267+
return publisher, nil
268+
},
269+
})
270+
271+
// If service is an implementation of the task service, register it as a plugin
272+
if ts, ok := service.(shimapi.TaskService); ok {
273+
plugin.Register(&plugin.Registration{
274+
Type: plugin.TTRPCPlugin,
275+
ID: "task",
276+
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
277+
return &taskService{ts}, nil
278+
},
279+
})
280+
}
281+
282+
var (
283+
initialized = plugin.NewPluginSet()
284+
ttrpcServices = []ttrpcService{}
285+
)
286+
plugins := plugin.Graph(func(*plugin.Registration) bool { return false })
287+
for _, p := range plugins {
288+
id := p.URI()
289+
log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id)
290+
291+
initContext := plugin.NewContext(
292+
ctx,
293+
p,
294+
initialized,
295+
// NOTE: Root is empty since the shim does not support persistent storage,
296+
// shim plugins should make use state directory for writing files to disk.
297+
// The state directory will be destroyed when the shim if cleaned up or
298+
// on reboot
299+
"",
300+
bundlePath,
301+
)
302+
initContext.Address = addressFlag
303+
initContext.TTRPCAddress = ttrpcAddress
304+
305+
// load the plugin specific configuration if it is provided
306+
//TODO: Read configuration passed into shim, or from state directory?
307+
//if p.Config != nil {
308+
// pc, err := config.Decode(p)
309+
// if err != nil {
310+
// return nil, err
311+
// }
312+
// initContext.Config = pc
313+
//}
314+
315+
result := p.Init(initContext)
316+
if err := initialized.Add(result); err != nil {
317+
return errors.Wrapf(err, "could not add plugin result to plugin set")
250318
}
251-
client := NewShimClient(ctx, service, signals)
252-
if err := client.Serve(); err != nil {
253-
if err != context.Canceled {
254-
return err
319+
320+
instance, err := result.Instance()
321+
if err != nil {
322+
if plugin.IsSkipPlugin(err) {
323+
log.G(ctx).WithError(err).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
324+
} else {
325+
log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id)
255326
}
327+
continue
256328
}
257329

258-
// NOTE: If the shim server is down(like oom killer), the address
259-
// socket might be leaking.
260-
if address, err := ReadAddress("address"); err == nil {
261-
_ = RemoveSocket(address)
330+
if src, ok := instance.(ttrpcService); ok {
331+
logrus.WithField("id", id).Debug("registering ttrpc service")
332+
ttrpcServices = append(ttrpcServices, src)
262333
}
334+
}
335+
336+
server, err := newServer()
337+
if err != nil {
338+
return errors.Wrap(err, "failed creating server")
339+
}
263340

264-
select {
265-
case <-publisher.Done():
266-
return nil
267-
case <-time.After(5 * time.Second):
268-
return errors.New("publisher not closed")
341+
for _, srv := range ttrpcServices {
342+
if err := srv.RegisterTTRPC(server); err != nil {
343+
return errors.Wrap(err, "failed to register service")
269344
}
270345
}
271-
}
272346

273-
// NewShimClient creates a new shim server client
274-
func NewShimClient(ctx context.Context, svc shimapi.TaskService, signals chan os.Signal) *Client {
275-
s := &Client{
276-
service: svc,
277-
context: ctx,
278-
signals: signals,
347+
if err := serve(ctx, server, signals); err != nil {
348+
if err != context.Canceled {
349+
return err
350+
}
351+
}
352+
353+
// NOTE: If the shim server is down(like oom killer), the address
354+
// socket might be leaking.
355+
if address, err := ReadAddress("address"); err == nil {
356+
_ = RemoveSocket(address)
357+
}
358+
359+
select {
360+
case <-publisher.Done():
361+
return nil
362+
case <-time.After(5 * time.Second):
363+
return errors.New("publisher not closed")
279364
}
280-
return s
281365
}
282366

283-
// Serve the shim server
284-
func (s *Client) Serve() error {
367+
// serve serves the ttrpc API over a unix socket in the current working directory
368+
// and blocks until the context is canceled
369+
func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal) error {
285370
dump := make(chan os.Signal, 32)
286371
setupDumpStacks(dump)
287372

288373
path, err := os.Getwd()
289374
if err != nil {
290375
return err
291376
}
292-
server, err := newServer()
293-
if err != nil {
294-
return errors.Wrap(err, "failed creating server")
295-
}
296377

297-
logrus.Debug("registering ttrpc server")
298-
shimapi.RegisterTaskService(server, s.service)
299-
300-
if err := serve(s.context, server, socketFlag); err != nil {
378+
l, err := serveListener(socketFlag)
379+
if err != nil {
301380
return err
302381
}
382+
go func() {
383+
defer l.Close()
384+
if err := server.Serve(ctx, l); err != nil &&
385+
!strings.Contains(err.Error(), "use of closed network connection") {
386+
logrus.WithError(err).Fatal("containerd-shim: ttrpc server failure")
387+
}
388+
}()
303389
logger := logrus.WithFields(logrus.Fields{
304390
"pid": os.Getpid(),
305391
"path": path,
@@ -310,24 +396,7 @@ func (s *Client) Serve() error {
310396
dumpStacks(logger)
311397
}
312398
}()
313-
return handleSignals(s.context, logger, s.signals)
314-
}
315-
316-
// serve serves the ttrpc API over a unix socket at the provided path
317-
// this function does not block
318-
func serve(ctx context.Context, server *ttrpc.Server, path string) error {
319-
l, err := serveListener(path)
320-
if err != nil {
321-
return err
322-
}
323-
go func() {
324-
defer l.Close()
325-
if err := server.Serve(ctx, l); err != nil &&
326-
!strings.Contains(err.Error(), "use of closed network connection") {
327-
logrus.WithError(err).Fatal("containerd-shim: ttrpc server failure")
328-
}
329-
}()
330-
return nil
399+
return handleSignals(ctx, logger, signals)
331400
}
332401

333402
func dumpStacks(logger *logrus.Entry) {

services/server/server.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,29 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
142142

143143
tcpServerOpts = append(tcpServerOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
144144
}
145+
146+
// grpcService allows GRPC services to be registered with the underlying server
147+
type grpcService interface {
148+
Register(*grpc.Server) error
149+
}
150+
151+
// tcpService allows GRPC services to be registered with the underlying tcp server
152+
type tcpService interface {
153+
RegisterTCP(*grpc.Server) error
154+
}
155+
156+
// ttrpcService allows TTRPC services to be registered with the underlying server
157+
type ttrpcService interface {
158+
RegisterTTRPC(*ttrpc.Server) error
159+
}
160+
145161
var (
146162
grpcServer = grpc.NewServer(serverOpts...)
147163
tcpServer = grpc.NewServer(tcpServerOpts...)
148164

149-
grpcServices []plugin.Service
150-
tcpServices []plugin.TCPService
151-
ttrpcServices []plugin.TTRPCService
165+
grpcServices []grpcService
166+
tcpServices []tcpService
167+
ttrpcServices []ttrpcService
152168

153169
s = &Server{
154170
grpcServer: grpcServer,
@@ -211,13 +227,13 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
211227

212228
delete(required, reqID)
213229
// check for grpc services that should be registered with the server
214-
if src, ok := instance.(plugin.Service); ok {
230+
if src, ok := instance.(grpcService); ok {
215231
grpcServices = append(grpcServices, src)
216232
}
217-
if src, ok := instance.(plugin.TTRPCService); ok {
233+
if src, ok := instance.(ttrpcService); ok {
218234
ttrpcServices = append(ttrpcServices, src)
219235
}
220-
if service, ok := instance.(plugin.TCPService); ok {
236+
if service, ok := instance.(tcpService); ok {
221237
tcpServices = append(tcpServices, service)
222238
}
223239

0 commit comments

Comments
 (0)