Skip to content

Commit bccaf68

Browse files
committed
notify readiness when registered plugins are ready
Signed-off-by: Henry Wang <[email protected]> (cherry picked from commit 4bfcac8)
1 parent 6737097 commit bccaf68

5 files changed

Lines changed: 39 additions & 14 deletions

File tree

cmd/containerd/command/main.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -271,12 +271,21 @@ can be used and modified as necessary as a custom configuration.`
271271
}
272272
serve(ctx, l, server.ServeGRPC)
273273

274-
if err := notifyReady(ctx); err != nil {
275-
log.G(ctx).WithError(err).Warn("notify ready failed")
276-
}
274+
readyC := make(chan struct{})
275+
go func() {
276+
server.Wait()
277+
close(readyC)
278+
}()
277279

278-
log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
279-
<-done
280+
select {
281+
case <-readyC:
282+
if err := notifyReady(ctx); err != nil {
283+
log.G(ctx).WithError(err).Warn("notify ready failed")
284+
}
285+
log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
286+
<-done
287+
case <-done:
288+
}
280289
return nil
281290
}
282291
return app

pkg/cri/cri.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func init() {
6060
}
6161

6262
func initCRIService(ic *plugin.InitContext) (interface{}, error) {
63+
ready := ic.RegisterReadiness()
6364
ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()}
6465
ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion, "CRIVersionAlpha": constants.CRIVersionAlpha}
6566
ctx := ic.Context
@@ -103,7 +104,7 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
103104
}
104105

105106
go func() {
106-
if err := s.Run(); err != nil {
107+
if err := s.Run(ready); err != nil {
107108
log.G(ctx).WithError(err).Fatal("Failed to run CRI service")
108109
}
109110
// TODO(random-liu): Whether and how we can stop containerd.

pkg/cri/server/service.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ type grpcAlphaServices interface {
6666

6767
// CRIService is the interface implement CRI remote service server.
6868
type CRIService interface {
69-
Run() error
69+
Run(ready func()) error
7070
// io.Closer is used by containerd to gracefully stop cri service.
7171
io.Closer
7272
Register(*grpc.Server) error
@@ -200,7 +200,7 @@ func (c *criService) RegisterTCP(s *grpc.Server) error {
200200
}
201201

202202
// Run starts the CRI service.
203-
func (c *criService) Run() error {
203+
func (c *criService) Run(ready func()) error {
204204
logrus.Info("Start subscribing containerd event")
205205
c.eventMonitor.subscribe(c.client)
206206

@@ -251,6 +251,7 @@ func (c *criService) Run() error {
251251

252252
// Set the server as initialized. GRPC services could start serving traffic.
253253
c.initialized.Set()
254+
ready()
254255

255256
var eventMonitorErr, streamServerErr, cniNetConfMonitorErr error
256257
// Stop the whole CRI service if any of the critical service exits.

plugin/context.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ import (
2828

2929
// InitContext is used for plugin initialization
3030
type InitContext struct {
31-
Context context.Context
32-
Root string
33-
State string
34-
Config interface{}
35-
Address string
36-
TTRPCAddress string
31+
Context context.Context
32+
Root string
33+
State string
34+
Config interface{}
35+
Address string
36+
TTRPCAddress string
37+
RegisterReadiness func() func()
3738

3839
// deprecated: will be removed in 2.0, use plugin.EventType
3940
Events *exchange.Exchange

services/server/server.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
231231
initContext.Events = events
232232
initContext.Address = config.GRPC.Address
233233
initContext.TTRPCAddress = config.TTRPC.Address
234+
initContext.RegisterReadiness = s.RegisterReadiness
234235

235236
// load the plugin specific configuration if it is provided
236237
if p.Config != nil {
@@ -306,6 +307,7 @@ type Server struct {
306307
tcpServer *grpc.Server
307308
config *srvconfig.Config
308309
plugins []*plugin.Plugin
310+
ready sync.WaitGroup
309311
}
310312

311313
// ServeGRPC provides the containerd grpc APIs on the provided listener
@@ -383,6 +385,17 @@ func (s *Server) Stop() {
383385
}
384386
}
385387

388+
func (s *Server) RegisterReadiness() func() {
389+
s.ready.Add(1)
390+
return func() {
391+
s.ready.Done()
392+
}
393+
}
394+
395+
func (s *Server) Wait() {
396+
s.ready.Wait()
397+
}
398+
386399
// LoadPlugins loads all plugins into containerd and generates an ordered graph
387400
// of all plugins.
388401
func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Registration, error) {

0 commit comments

Comments
 (0)