Skip to content

Commit 4bfcac8

Browse files
committed
notify readiness when registered plugins are ready
Signed-off-by: Henry Wang <[email protected]>
1 parent ed7c0eb commit 4bfcac8

File tree

6 files changed

+42
-16
lines changed

6 files changed

+42
-16
lines changed

cmd/containerd/command/main.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -270,12 +270,21 @@ can be used and modified as necessary as a custom configuration.`
270270
}
271271
serve(ctx, l, server.ServeGRPC)
272272

273-
if err := notifyReady(ctx); err != nil {
274-
log.G(ctx).WithError(err).Warn("notify ready failed")
275-
}
273+
readyC := make(chan struct{})
274+
go func() {
275+
server.Wait()
276+
close(readyC)
277+
}()
276278

277-
log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
278-
<-done
279+
select {
280+
case <-readyC:
281+
if err := notifyReady(ctx); err != nil {
282+
log.G(ctx).WithError(err).Warn("notify ready failed")
283+
}
284+
log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
285+
<-done
286+
case <-done:
287+
}
279288
return nil
280289
}
281290
return app

pkg/cri/cri.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func init() {
5454
}
5555

5656
func initCRIService(ic *plugin.InitContext) (interface{}, error) {
57+
ready := ic.RegisterReadiness()
5758
ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()}
5859
ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion}
5960
ctx := ic.Context
@@ -99,7 +100,7 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
99100
}
100101

101102
go func() {
102-
if err := s.Run(); err != nil {
103+
if err := s.Run(ready); err != nil {
103104
log.G(ctx).WithError(err).Fatal("Failed to run CRI service")
104105
}
105106
// TODO(random-liu): Whether and how we can stop containerd.

pkg/cri/sbserver/service.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type CRIService interface {
6363
// Closer is used by containerd to gracefully stop cri service.
6464
io.Closer
6565

66-
Run() error
66+
Run(ready func()) error
6767

6868
Register(*grpc.Server) error
6969
}
@@ -237,7 +237,7 @@ func (c *criService) RegisterTCP(s *grpc.Server) error {
237237
}
238238

239239
// Run starts the CRI service.
240-
func (c *criService) Run() error {
240+
func (c *criService) Run(ready func()) error {
241241
log.L.Info("Start subscribing containerd event")
242242
c.eventMonitor.subscribe(c.client)
243243

@@ -291,6 +291,7 @@ func (c *criService) Run() error {
291291

292292
// Set the server as initialized. GRPC services could start serving traffic.
293293
c.initialized.Store(true)
294+
ready()
294295

295296
var eventMonitorErr, streamServerErr, cniNetConfMonitorErr error
296297
// Stop the whole CRI service if any of the critical service exits.

pkg/cri/server/service.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ type CRIService interface {
6262
// Closer is used by containerd to gracefully stop cri service.
6363
io.Closer
6464

65-
Run() error
65+
Run(ready func()) error
6666

6767
Register(*grpc.Server) error
6868
}
@@ -203,7 +203,7 @@ func (c *criService) RegisterTCP(s *grpc.Server) error {
203203
}
204204

205205
// Run starts the CRI service.
206-
func (c *criService) Run() error {
206+
func (c *criService) Run(ready func()) error {
207207
log.L.Info("Start subscribing containerd event")
208208
c.eventMonitor.subscribe(c.client)
209209

@@ -266,6 +266,7 @@ func (c *criService) Run() error {
266266

267267
// Set the server as initialized. GRPC services could start serving traffic.
268268
c.initialized.Store(true)
269+
ready()
269270

270271
var eventMonitorErr, streamServerErr, cniNetConfMonitorErr error
271272
// Stop the whole CRI service if any of the critical service exits.

plugin/context.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ import (
2828

2929
// InitContext is used for plugin initialization
3030
type InitContext struct {
31-
Context context.Context
32-
Root string
33-
State string
34-
Config interface{}
35-
Address string
36-
TTRPCAddress string
31+
Context context.Context
32+
Root string
33+
State string
34+
Config interface{}
35+
Address string
36+
TTRPCAddress string
37+
RegisterReadiness func() func()
3738

3839
// deprecated: will be removed in 2.0, use plugin.EventType
3940
Events *exchange.Exchange

services/server/server.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
218218
initContext.Events = events
219219
initContext.Address = config.GRPC.Address
220220
initContext.TTRPCAddress = config.TTRPC.Address
221+
initContext.RegisterReadiness = s.RegisterReadiness
221222

222223
// load the plugin specific configuration if it is provided
223224
if p.Config != nil {
@@ -293,6 +294,7 @@ type Server struct {
293294
tcpServer *grpc.Server
294295
config *srvconfig.Config
295296
plugins []*plugin.Plugin
297+
ready sync.WaitGroup
296298
}
297299

298300
// ServeGRPC provides the containerd grpc APIs on the provided listener
@@ -370,6 +372,17 @@ func (s *Server) Stop() {
370372
}
371373
}
372374

375+
func (s *Server) RegisterReadiness() func() {
376+
s.ready.Add(1)
377+
return func() {
378+
s.ready.Done()
379+
}
380+
}
381+
382+
func (s *Server) Wait() {
383+
s.ready.Wait()
384+
}
385+
373386
// LoadPlugins loads all plugins into containerd and generates an ordered graph
374387
// of all plugins.
375388
func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Registration, error) {

0 commit comments

Comments
 (0)