Skip to content

Commit 2c38cad

Browse files
committed
notify readiness when registered plugins are ready
Signed-off-by: Henry Wang <[email protected]> (cherry picked from commit 4bfcac8)
1 parent 835383b commit 2c38cad

6 files changed

Lines changed: 42 additions & 16 deletions

File tree

cmd/containerd/command/main.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -271,12 +271,21 @@ can be used and modified as necessary as a custom configuration.`
271271
}
272272
serve(ctx, l, server.ServeGRPC)
273273

274-
if err := notifyReady(ctx); err != nil {
275-
log.G(ctx).WithError(err).Warn("notify ready failed")
276-
}
274+
readyC := make(chan struct{})
275+
go func() {
276+
server.Wait()
277+
close(readyC)
278+
}()
277279

278-
log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
279-
<-done
280+
select {
281+
case <-readyC:
282+
if err := notifyReady(ctx); err != nil {
283+
log.G(ctx).WithError(err).Warn("notify ready failed")
284+
}
285+
log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
286+
<-done
287+
case <-done:
288+
}
280289
return nil
281290
}
282291
return app

pkg/cri/cri.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func init() {
5555
}
5656

5757
func initCRIService(ic *plugin.InitContext) (interface{}, error) {
58+
ready := ic.RegisterReadiness()
5859
ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()}
5960
ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion, "CRIVersionAlpha": constants.CRIVersionAlpha}
6061
ctx := ic.Context
@@ -100,7 +101,7 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
100101
}
101102

102103
go func() {
103-
if err := s.Run(); err != nil {
104+
if err := s.Run(ready); err != nil {
104105
log.G(ctx).WithError(err).Fatal("Failed to run CRI service")
105106
}
106107
// TODO(random-liu): Whether and how we can stop containerd.

pkg/cri/sbserver/service.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ type CRIService interface {
6565
// Closer is used by containerd to gracefully stop cri service.
6666
io.Closer
6767

68-
Run() error
68+
Run(ready func()) error
6969

7070
Register(*grpc.Server) error
7171
}
@@ -227,7 +227,7 @@ func (c *criService) RegisterTCP(s *grpc.Server) error {
227227
}
228228

229229
// Run starts the CRI service.
230-
func (c *criService) Run() error {
230+
func (c *criService) Run(ready func()) error {
231231
logrus.Info("Start subscribing containerd event")
232232
c.eventMonitor.subscribe(c.client)
233233

@@ -290,6 +290,7 @@ func (c *criService) Run() error {
290290

291291
// Set the server as initialized. GRPC services could start serving traffic.
292292
c.initialized.Set()
293+
ready()
293294

294295
var eventMonitorErr, streamServerErr, cniNetConfMonitorErr error
295296
// Stop the whole CRI service if any of the critical service exits.

pkg/cri/server/service.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ type CRIService interface {
6262
// Closer is used by containerd to gracefully stop cri service.
6363
io.Closer
6464

65-
Run() error
65+
Run(ready func()) error
6666

6767
Register(*grpc.Server) error
6868
}
@@ -204,7 +204,7 @@ func (c *criService) RegisterTCP(s *grpc.Server) error {
204204
}
205205

206206
// Run starts the CRI service.
207-
func (c *criService) Run() error {
207+
func (c *criService) Run(ready func()) error {
208208
logrus.Info("Start subscribing containerd event")
209209
c.eventMonitor.subscribe(c.client)
210210

@@ -267,6 +267,7 @@ func (c *criService) Run() error {
267267

268268
// Set the server as initialized. GRPC services could start serving traffic.
269269
c.initialized.Set()
270+
ready()
270271

271272
var eventMonitorErr, streamServerErr, cniNetConfMonitorErr error
272273
// Stop the whole CRI service if any of the critical service exits.

plugin/context.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ import (
2828

2929
// InitContext is used for plugin initialization
3030
type InitContext struct {
31-
Context context.Context
32-
Root string
33-
State string
34-
Config interface{}
35-
Address string
36-
TTRPCAddress string
31+
Context context.Context
32+
Root string
33+
State string
34+
Config interface{}
35+
Address string
36+
TTRPCAddress string
37+
RegisterReadiness func() func()
3738

3839
// deprecated: will be removed in 2.0, use plugin.EventType
3940
Events *exchange.Exchange

services/server/server.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
218218
initContext.Events = events
219219
initContext.Address = config.GRPC.Address
220220
initContext.TTRPCAddress = config.TTRPC.Address
221+
initContext.RegisterReadiness = s.RegisterReadiness
221222

222223
// load the plugin specific configuration if it is provided
223224
if p.Config != nil {
@@ -293,6 +294,7 @@ type Server struct {
293294
tcpServer *grpc.Server
294295
config *srvconfig.Config
295296
plugins []*plugin.Plugin
297+
ready sync.WaitGroup
296298
}
297299

298300
// ServeGRPC provides the containerd grpc APIs on the provided listener
@@ -370,6 +372,17 @@ func (s *Server) Stop() {
370372
}
371373
}
372374

375+
func (s *Server) RegisterReadiness() func() {
376+
s.ready.Add(1)
377+
return func() {
378+
s.ready.Done()
379+
}
380+
}
381+
382+
func (s *Server) Wait() {
383+
s.ready.Wait()
384+
}
385+
373386
// LoadPlugins loads all plugins into containerd and generates an ordered graph
374387
// of all plugins.
375388
func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Registration, error) {

0 commit comments

Comments
 (0)