@@ -26,19 +26,26 @@ import (
26
26
"os"
27
27
"path/filepath"
28
28
"strings"
29
+ "sync"
30
+ "time"
29
31
30
32
"github.com/boltdb/bolt"
33
+ csapi "github.com/containerd/containerd/api/services/content/v1"
34
+ ssapi "github.com/containerd/containerd/api/services/snapshots/v1"
31
35
"github.com/containerd/containerd/content"
32
36
"github.com/containerd/containerd/content/local"
37
+ csproxy "github.com/containerd/containerd/content/proxy"
38
+ "github.com/containerd/containerd/defaults"
33
39
"github.com/containerd/containerd/events/exchange"
34
40
"github.com/containerd/containerd/log"
35
41
"github.com/containerd/containerd/metadata"
42
+ "github.com/containerd/containerd/pkg/dialer"
36
43
"github.com/containerd/containerd/plugin"
37
44
"github.com/containerd/containerd/snapshots"
45
+ ssproxy "github.com/containerd/containerd/snapshots/proxy"
38
46
metrics "github.com/docker/go-metrics"
39
47
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
40
48
"github.com/pkg/errors"
41
-
42
49
"google.golang.org/grpc"
43
50
)
44
51
@@ -62,7 +69,7 @@ func New(ctx context.Context, config *Config) (*Server, error) {
62
69
if err := apply (ctx , config ); err != nil {
63
70
return nil , err
64
71
}
65
- plugins , err := LoadPlugins (config )
72
+ plugins , err := LoadPlugins (ctx , config )
66
73
if err != nil {
67
74
return nil , err
68
75
}
@@ -204,7 +211,7 @@ func (s *Server) Stop() {
204
211
205
212
// LoadPlugins loads all plugins into containerd and generates an ordered graph
206
213
// of all plugins.
207
- func LoadPlugins (config * Config ) ([]* plugin.Registration , error ) {
214
+ func LoadPlugins (ctx context. Context , config * Config ) ([]* plugin.Registration , error ) {
208
215
// load all plugins into containerd
209
216
if err := plugin .Load (filepath .Join (config .Root , "plugins" )); err != nil {
210
217
return nil , err
@@ -265,10 +272,85 @@ func LoadPlugins(config *Config) ([]*plugin.Registration, error) {
265
272
},
266
273
})
267
274
275
+ clients := & proxyClients {}
276
+ for name , pp := range config .ProxyPlugins {
277
+ var (
278
+ t plugin.Type
279
+ f func (* grpc.ClientConn ) interface {}
280
+
281
+ address = pp .Address
282
+ )
283
+
284
+ switch pp .Type {
285
+ case string (plugin .SnapshotPlugin ), "snapshot" :
286
+ t = plugin .SnapshotPlugin
287
+ ssname := name
288
+ f = func (conn * grpc.ClientConn ) interface {} {
289
+ return ssproxy .NewSnapshotter (ssapi .NewSnapshotsClient (conn ), ssname )
290
+ }
291
+
292
+ case string (plugin .ContentPlugin ), "content" :
293
+ t = plugin .ContentPlugin
294
+ f = func (conn * grpc.ClientConn ) interface {} {
295
+ return csproxy .NewContentStore (csapi .NewContentClient (conn ))
296
+ }
297
+ default :
298
+ log .G (ctx ).WithField ("type" , pp .Type ).Warn ("unknown proxy plugin type" )
299
+ }
300
+
301
+ plugin .Register (& plugin.Registration {
302
+ Type : t ,
303
+ ID : name ,
304
+ InitFn : func (ic * plugin.InitContext ) (interface {}, error ) {
305
+ ic .Meta .Exports ["address" ] = address
306
+ conn , err := clients .getClient (address )
307
+ if err != nil {
308
+ return nil , err
309
+ }
310
+ return f (conn ), nil
311
+ },
312
+ })
313
+
314
+ }
315
+
268
316
// return the ordered graph for plugins
269
317
return plugin .Graph (config .DisabledPlugins ), nil
270
318
}
271
319
320
+ type proxyClients struct {
321
+ m sync.Mutex
322
+ clients map [string ]* grpc.ClientConn
323
+ }
324
+
325
+ func (pc * proxyClients ) getClient (address string ) (* grpc.ClientConn , error ) {
326
+ pc .m .Lock ()
327
+ defer pc .m .Unlock ()
328
+ if pc .clients == nil {
329
+ pc .clients = map [string ]* grpc.ClientConn {}
330
+ } else if c , ok := pc .clients [address ]; ok {
331
+ return c , nil
332
+ }
333
+
334
+ gopts := []grpc.DialOption {
335
+ grpc .WithInsecure (),
336
+ grpc .WithBackoffMaxDelay (3 * time .Second ),
337
+ grpc .WithDialer (dialer .Dialer ),
338
+
339
+ // TODO(stevvooe): We may need to allow configuration of this on the client.
340
+ grpc .WithDefaultCallOptions (grpc .MaxCallRecvMsgSize (defaults .DefaultMaxRecvMsgSize )),
341
+ grpc .WithDefaultCallOptions (grpc .MaxCallSendMsgSize (defaults .DefaultMaxSendMsgSize )),
342
+ }
343
+
344
+ conn , err := grpc .Dial (dialer .DialAddress (address ), gopts ... )
345
+ if err != nil {
346
+ return nil , errors .Wrapf (err , "failed to dial %q" , address )
347
+ }
348
+
349
+ pc .clients [address ] = conn
350
+
351
+ return conn , nil
352
+ }
353
+
272
354
func trapClosedConnErr (err error ) error {
273
355
if err == nil {
274
356
return nil
0 commit comments