@@ -50,24 +50,22 @@ const (
5050 // Add a timeout for each event handling, events that timeout will be requeued and
5151 // handled again in the future.
5252 handleEventTimeout = 10 * time .Second
53-
54- exitChannelSize = 1024
5553)
5654
5755// eventMonitor monitors containerd event and updates internal state correspondingly.
58- // TODO(random-liu): Handle event for each container in a separate goroutine.
5956type eventMonitor struct {
60- c * criService
61- ch <- chan * events.Envelope
62- // exitCh receives container/sandbox exit events from exit monitors.
63- exitCh chan * eventtypes.TaskExit
57+ c * criService
58+ ch <- chan * events.Envelope
6459 errCh <- chan error
6560 ctx context.Context
6661 cancel context.CancelFunc
6762 backOff * backOff
6863}
6964
7065type backOff struct {
66+ // queuePoolMu is mutex used to protect the queuePool map
67+ queuePoolMu sync.Mutex
68+
7169 queuePool map [string ]* backOffQueue
7270 // tickerMu is mutex used to protect the ticker.
7371 tickerMu sync.Mutex
@@ -93,7 +91,6 @@ func newEventMonitor(c *criService) *eventMonitor {
9391 c : c ,
9492 ctx : ctx ,
9593 cancel : cancel ,
96- exitCh : make (chan * eventtypes.TaskExit , exitChannelSize ),
9794 backOff : newBackOff (),
9895 }
9996}
@@ -109,26 +106,102 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
109106 em .ch , em .errCh = subscriber .Subscribe (em .ctx , filters ... )
110107}
111108
112- // startExitMonitor starts an exit monitor for a given container/ sandbox.
113- func (em * eventMonitor ) startExitMonitor (ctx context.Context , id string , pid uint32 , exitCh <- chan containerd.ExitStatus ) <- chan struct {} {
109+ // startSandboxExitMonitor starts an exit monitor for a given sandbox.
110+ func (em * eventMonitor ) startSandboxExitMonitor (ctx context.Context , id string , pid uint32 , exitCh <- chan containerd.ExitStatus ) <- chan struct {} {
114111 stopCh := make (chan struct {})
115112 go func () {
116113 defer close (stopCh )
117114 select {
118115 case exitRes := <- exitCh :
119116 exitStatus , exitedAt , err := exitRes .Result ()
120117 if err != nil {
121- logrus .WithError (err ).Errorf ("Failed to get task exit status for %q" , id )
118+ logrus .WithError (err ).Errorf ("failed to get task exit status for %q" , id )
122119 exitStatus = unknownExitCode
123120 exitedAt = time .Now ()
124121 }
125- em .exitCh <- & eventtypes.TaskExit {
122+
123+ e := & eventtypes.TaskExit {
126124 ContainerID : id ,
127125 ID : id ,
128126 Pid : pid ,
129127 ExitStatus : exitStatus ,
130128 ExitedAt : exitedAt ,
131129 }
130+
131+ logrus .Debugf ("received exit event %+v" , e )
132+
133+ err = func () error {
134+ dctx := ctrdutil .NamespacedContext ()
135+ dctx , dcancel := context .WithTimeout (dctx , handleEventTimeout )
136+ defer dcancel ()
137+
138+ sb , err := em .c .sandboxStore .Get (e .ID )
139+ if err == nil {
140+ if err := handleSandboxExit (dctx , e , sb ); err != nil {
141+ return err
142+ }
143+ return nil
144+ } else if err != store .ErrNotExist {
145+ return errors .Wrapf (err , "failed to get sandbox %s" , e .ID )
146+ }
147+ return nil
148+ }()
149+ if err != nil {
150+ logrus .WithError (err ).Errorf ("failed to handle sandbox TaskExit event %+v" , e )
151+ em .backOff .enBackOff (id , e )
152+ }
153+ return
154+ case <- ctx .Done ():
155+ }
156+ }()
157+ return stopCh
158+ }
159+
160+ // startContainerExitMonitor starts an exit monitor for a given container.
161+ func (em * eventMonitor ) startContainerExitMonitor (ctx context.Context , id string , pid uint32 , exitCh <- chan containerd.ExitStatus ) <- chan struct {} {
162+ stopCh := make (chan struct {})
163+ go func () {
164+ defer close (stopCh )
165+ select {
166+ case exitRes := <- exitCh :
167+ exitStatus , exitedAt , err := exitRes .Result ()
168+ if err != nil {
169+ logrus .WithError (err ).Errorf ("failed to get task exit status for %q" , id )
170+ exitStatus = unknownExitCode
171+ exitedAt = time .Now ()
172+ }
173+
174+ e := & eventtypes.TaskExit {
175+ ContainerID : id ,
176+ ID : id ,
177+ Pid : pid ,
178+ ExitStatus : exitStatus ,
179+ ExitedAt : exitedAt ,
180+ }
181+
182+ logrus .Debugf ("received exit event %+v" , e )
183+
184+ err = func () error {
185+ dctx := ctrdutil .NamespacedContext ()
186+ dctx , dcancel := context .WithTimeout (dctx , handleEventTimeout )
187+ defer dcancel ()
188+
189+ cntr , err := em .c .containerStore .Get (e .ID )
190+ if err == nil {
191+ if err := handleContainerExit (dctx , e , cntr ); err != nil {
192+ return err
193+ }
194+ return nil
195+ } else if err != store .ErrNotExist {
196+ return errors .Wrapf (err , "failed to get container %s" , e .ID )
197+ }
198+ return nil
199+ }()
200+ if err != nil {
201+ logrus .WithError (err ).Errorf ("failed to handle container TaskExit event %+v" , e )
202+ em .backOff .enBackOff (id , e )
203+ }
204+ return
132205 case <- ctx .Done ():
133206 }
134207 }()
@@ -157,9 +230,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
157230 return id , evt , nil
158231}
159232
160- // start starts the event monitor which monitors and handles all subscribed events. It returns
161- // an error channel for the caller to wait for stop errors from the event monitor.
162- // start must be called after subscribe.
233+ // start starts the event monitor which monitors and handles all subscribed events.
234+ // It returns an error channel for the caller to wait for stop errors from the
235+ // event monitor.
236+ //
237+ // NOTE:
238+ // 1. start must be called after subscribe.
239+ // 2. The task exit event has been handled in individual startSandboxExitMonitor
240+ // or startContainerExitMonitor goroutine at the first. If the goroutine fails,
241+ // it puts the event into backoff retry queue and event monitor will handle
242+ // it later.
163243func (em * eventMonitor ) start () <- chan error {
164244 errCh := make (chan error )
165245 if em .ch == nil || em .errCh == nil {
@@ -170,18 +250,6 @@ func (em *eventMonitor) start() <-chan error {
170250 defer close (errCh )
171251 for {
172252 select {
173- case e := <- em .exitCh :
174- logrus .Debugf ("Received exit event %+v" , e )
175- id := e .ID
176- if em .backOff .isInBackOff (id ) {
177- logrus .Infof ("Events for %q is in backoff, enqueue event %+v" , id , e )
178- em .backOff .enBackOff (id , e )
179- break
180- }
181- if err := em .handleEvent (e ); err != nil {
182- logrus .WithError (err ).Errorf ("Failed to handle exit event %+v for %s" , e , id )
183- em .backOff .enBackOff (id , e )
184- }
185253 case e := <- em .ch :
186254 logrus .Debugf ("Received containerd event timestamp - %v, namespace - %q, topic - %q" , e .Timestamp , e .Namespace , e .Topic )
187255 if e .Namespace != constants .K8sContainerdNamespace {
@@ -388,6 +456,9 @@ func newBackOff() *backOff {
388456}
389457
390458func (b * backOff ) getExpiredIDs () []string {
459+ b .queuePoolMu .Lock ()
460+ defer b .queuePoolMu .Unlock ()
461+
391462 var ids []string
392463 for id , q := range b .queuePool {
393464 if q .isExpire () {
@@ -398,6 +469,9 @@ func (b *backOff) getExpiredIDs() []string {
398469}
399470
400471func (b * backOff ) isInBackOff (key string ) bool {
472+ b .queuePoolMu .Lock ()
473+ defer b .queuePoolMu .Unlock ()
474+
401475 if _ , ok := b .queuePool [key ]; ok {
402476 return true
403477 }
@@ -406,6 +480,9 @@ func (b *backOff) isInBackOff(key string) bool {
406480
407481// enBackOff start to backOff and put event to the tail of queue
408482func (b * backOff ) enBackOff (key string , evt interface {}) {
483+ b .queuePoolMu .Lock ()
484+ defer b .queuePoolMu .Unlock ()
485+
409486 if queue , ok := b .queuePool [key ]; ok {
410487 queue .events = append (queue .events , evt )
411488 return
@@ -415,13 +492,19 @@ func (b *backOff) enBackOff(key string, evt interface{}) {
415492
416493// enBackOff get out the whole queue
417494func (b * backOff ) deBackOff (key string ) * backOffQueue {
495+ b .queuePoolMu .Lock ()
496+ defer b .queuePoolMu .Unlock ()
497+
418498 queue := b .queuePool [key ]
419499 delete (b .queuePool , key )
420500 return queue
421501}
422502
423503// enBackOff start to backOff again and put events to the queue
424504func (b * backOff ) reBackOff (key string , events []interface {}, oldDuration time.Duration ) {
505+ b .queuePoolMu .Lock ()
506+ defer b .queuePoolMu .Unlock ()
507+
425508 duration := 2 * oldDuration
426509 if duration > b .maxDuration {
427510 duration = b .maxDuration
0 commit comments