Skip to content

Commit e56de63

Browse files
committed
cri: handle sandbox/container exit event separately
The event monitor handles exit events one by one. If something goes wrong while deleting a task, it slows down the terminating Pods. To reduce the impact, each exit event watcher should handle its exit event separately. If handling fails, the watcher puts the event into the backoff queue and retries it. Signed-off-by: Wei Fu <[email protected]>
1 parent 643bb9b commit e56de63

6 files changed

Lines changed: 117 additions & 36 deletions

File tree

pkg/cri/server/container_start.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
148148
return nil, errors.Wrapf(err, "failed to update container %q state", id)
149149
}
150150

151-
// start the monitor after updating container state, this ensures that
152-
// event monitor receives the TaskExit event and update container state
153-
// after this.
154-
c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh)
151+
// It handles the TaskExit event and updates the container state afterwards.
152+
c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)
155153

156154
return &runtime.StartContainerResponse{}, nil
157155
}

pkg/cri/server/container_stop.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
8888
}
8989

9090
exitCtx, exitCancel := context.WithCancel(context.Background())
91-
stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh)
91+
stopCh := c.eventMonitor.startContainerExitMonitor(exitCtx, id, task.Pid(), exitCh)
9292
defer func() {
9393
exitCancel()
9494
// This ensures that exit monitor is stopped before

pkg/cri/server/events.go

Lines changed: 110 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -50,24 +50,22 @@ const (
5050
// Add a timeout for each event handling, events that timeout will be requeued and
5151
// handled again in the future.
5252
handleEventTimeout = 10 * time.Second
53-
54-
exitChannelSize = 1024
5553
)
5654

5755
// eventMonitor monitors containerd event and updates internal state correspondingly.
58-
// TODO(random-liu): Handle event for each container in a separate goroutine.
5956
type eventMonitor struct {
60-
c *criService
61-
ch <-chan *events.Envelope
62-
// exitCh receives container/sandbox exit events from exit monitors.
63-
exitCh chan *eventtypes.TaskExit
57+
c *criService
58+
ch <-chan *events.Envelope
6459
errCh <-chan error
6560
ctx context.Context
6661
cancel context.CancelFunc
6762
backOff *backOff
6863
}
6964

7065
type backOff struct {
66+
// queuePoolMu is mutex used to protect the queuePool map
67+
queuePoolMu sync.Mutex
68+
7169
queuePool map[string]*backOffQueue
7270
// tickerMu is mutex used to protect the ticker.
7371
tickerMu sync.Mutex
@@ -93,7 +91,6 @@ func newEventMonitor(c *criService) *eventMonitor {
9391
c: c,
9492
ctx: ctx,
9593
cancel: cancel,
96-
exitCh: make(chan *eventtypes.TaskExit, exitChannelSize),
9794
backOff: newBackOff(),
9895
}
9996
}
@@ -109,26 +106,102 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
109106
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
110107
}
111108

112-
// startExitMonitor starts an exit monitor for a given container/sandbox.
113-
func (em *eventMonitor) startExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
109+
// startSandboxExitMonitor starts an exit monitor for a given sandbox.
110+
func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
114111
stopCh := make(chan struct{})
115112
go func() {
116113
defer close(stopCh)
117114
select {
118115
case exitRes := <-exitCh:
119116
exitStatus, exitedAt, err := exitRes.Result()
120117
if err != nil {
121-
logrus.WithError(err).Errorf("Failed to get task exit status for %q", id)
118+
logrus.WithError(err).Errorf("failed to get task exit status for %q", id)
122119
exitStatus = unknownExitCode
123120
exitedAt = time.Now()
124121
}
125-
em.exitCh <- &eventtypes.TaskExit{
122+
123+
e := &eventtypes.TaskExit{
126124
ContainerID: id,
127125
ID: id,
128126
Pid: pid,
129127
ExitStatus: exitStatus,
130128
ExitedAt: exitedAt,
131129
}
130+
131+
logrus.Debugf("received exit event %+v", e)
132+
133+
err = func() error {
134+
dctx := ctrdutil.NamespacedContext()
135+
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
136+
defer dcancel()
137+
138+
sb, err := em.c.sandboxStore.Get(e.ID)
139+
if err == nil {
140+
if err := handleSandboxExit(dctx, e, sb); err != nil {
141+
return err
142+
}
143+
return nil
144+
} else if err != store.ErrNotExist {
145+
return errors.Wrapf(err, "failed to get sandbox %s", e.ID)
146+
}
147+
return nil
148+
}()
149+
if err != nil {
150+
logrus.WithError(err).Errorf("failed to handle sandbox TaskExit event %+v", e)
151+
em.backOff.enBackOff(id, e)
152+
}
153+
return
154+
case <-ctx.Done():
155+
}
156+
}()
157+
return stopCh
158+
}
159+
160+
// startContainerExitMonitor starts an exit monitor for a given container.
161+
func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
162+
stopCh := make(chan struct{})
163+
go func() {
164+
defer close(stopCh)
165+
select {
166+
case exitRes := <-exitCh:
167+
exitStatus, exitedAt, err := exitRes.Result()
168+
if err != nil {
169+
logrus.WithError(err).Errorf("failed to get task exit status for %q", id)
170+
exitStatus = unknownExitCode
171+
exitedAt = time.Now()
172+
}
173+
174+
e := &eventtypes.TaskExit{
175+
ContainerID: id,
176+
ID: id,
177+
Pid: pid,
178+
ExitStatus: exitStatus,
179+
ExitedAt: exitedAt,
180+
}
181+
182+
logrus.Debugf("received exit event %+v", e)
183+
184+
err = func() error {
185+
dctx := ctrdutil.NamespacedContext()
186+
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
187+
defer dcancel()
188+
189+
cntr, err := em.c.containerStore.Get(e.ID)
190+
if err == nil {
191+
if err := handleContainerExit(dctx, e, cntr); err != nil {
192+
return err
193+
}
194+
return nil
195+
} else if err != store.ErrNotExist {
196+
return errors.Wrapf(err, "failed to get container %s", e.ID)
197+
}
198+
return nil
199+
}()
200+
if err != nil {
201+
logrus.WithError(err).Errorf("failed to handle container TaskExit event %+v", e)
202+
em.backOff.enBackOff(id, e)
203+
}
204+
return
132205
case <-ctx.Done():
133206
}
134207
}()
@@ -157,9 +230,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
157230
return id, evt, nil
158231
}
159232

160-
// start starts the event monitor which monitors and handles all subscribed events. It returns
161-
// an error channel for the caller to wait for stop errors from the event monitor.
162-
// start must be called after subscribe.
233+
// start starts the event monitor which monitors and handles all subscribed events.
234+
// It returns an error channel for the caller to wait for stop errors from the
235+
// event monitor.
236+
//
237+
// NOTE:
238+
// 1. start must be called after subscribe.
239+
// 2. The task exit event has been handled in individual startSandboxExitMonitor
240+
// or startContainerExitMonitor goroutine first. If the goroutine fails,
241+
// it puts the event into backoff retry queue and event monitor will handle
242+
// it later.
163243
func (em *eventMonitor) start() <-chan error {
164244
errCh := make(chan error)
165245
if em.ch == nil || em.errCh == nil {
@@ -170,18 +250,6 @@ func (em *eventMonitor) start() <-chan error {
170250
defer close(errCh)
171251
for {
172252
select {
173-
case e := <-em.exitCh:
174-
logrus.Debugf("Received exit event %+v", e)
175-
id := e.ID
176-
if em.backOff.isInBackOff(id) {
177-
logrus.Infof("Events for %q is in backoff, enqueue event %+v", id, e)
178-
em.backOff.enBackOff(id, e)
179-
break
180-
}
181-
if err := em.handleEvent(e); err != nil {
182-
logrus.WithError(err).Errorf("Failed to handle exit event %+v for %s", e, id)
183-
em.backOff.enBackOff(id, e)
184-
}
185253
case e := <-em.ch:
186254
logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
187255
if e.Namespace != constants.K8sContainerdNamespace {
@@ -388,6 +456,9 @@ func newBackOff() *backOff {
388456
}
389457

390458
func (b *backOff) getExpiredIDs() []string {
459+
b.queuePoolMu.Lock()
460+
defer b.queuePoolMu.Unlock()
461+
391462
var ids []string
392463
for id, q := range b.queuePool {
393464
if q.isExpire() {
@@ -398,6 +469,9 @@ func (b *backOff) getExpiredIDs() []string {
398469
}
399470

400471
func (b *backOff) isInBackOff(key string) bool {
472+
b.queuePoolMu.Lock()
473+
defer b.queuePoolMu.Unlock()
474+
401475
if _, ok := b.queuePool[key]; ok {
402476
return true
403477
}
@@ -406,6 +480,9 @@ func (b *backOff) isInBackOff(key string) bool {
406480

407481
// enBackOff start to backOff and put event to the tail of queue
408482
func (b *backOff) enBackOff(key string, evt interface{}) {
483+
b.queuePoolMu.Lock()
484+
defer b.queuePoolMu.Unlock()
485+
409486
if queue, ok := b.queuePool[key]; ok {
410487
queue.events = append(queue.events, evt)
411488
return
@@ -415,13 +492,19 @@ func (b *backOff) enBackOff(key string, evt interface{}) {
415492

416493
// enBackOff get out the whole queue
417494
func (b *backOff) deBackOff(key string) *backOffQueue {
495+
b.queuePoolMu.Lock()
496+
defer b.queuePoolMu.Unlock()
497+
418498
queue := b.queuePool[key]
419499
delete(b.queuePool, key)
420500
return queue
421501
}
422502

423503
// enBackOff start to backOff again and put events to the queue
424504
func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) {
505+
b.queuePoolMu.Lock()
506+
defer b.queuePoolMu.Unlock()
507+
425508
duration := 2 * oldDuration
426509
if duration > b.maxDuration {
427510
duration = b.maxDuration

pkg/cri/server/restart.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
290290
status.Reason = unknownExitReason
291291
} else {
292292
// Start exit monitor.
293-
c.eventMonitor.startExitMonitor(context.Background(), id, status.Pid, exitCh)
293+
c.eventMonitor.startContainerExitMonitor(context.Background(), id, status.Pid, exitCh)
294294
}
295295
case containerd.Stopped:
296296
// Task is stopped. Updata status and delete the task.
@@ -389,7 +389,7 @@ func (c *criService) loadSandbox(ctx context.Context, cntr containerd.Container)
389389
// Task is running, set sandbox state as READY.
390390
status.State = sandboxstore.StateReady
391391
status.Pid = t.Pid()
392-
c.eventMonitor.startExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
392+
c.eventMonitor.startSandboxExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
393393
}
394394
} else {
395395
// Task is not running. Delete the task and set sandbox state as NOTREADY.

pkg/cri/server/sandbox_run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
331331
//
332332
// TaskOOM from containerd may come before sandbox is added to store,
333333
// but we don't care about sandbox TaskOOM right now, so it is fine.
334-
c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh)
334+
c.eventMonitor.startSandboxExitMonitor(context.Background(), id, task.Pid(), exitCh)
335335

336336
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
337337
}

pkg/cri/server/sandbox_stop.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxst
134134
}
135135

136136
exitCtx, exitCancel := context.WithCancel(context.Background())
137-
stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh)
137+
stopCh := c.eventMonitor.startSandboxExitMonitor(exitCtx, id, task.Pid(), exitCh)
138138
defer func() {
139139
exitCancel()
140140
// This ensures that exit monitor is stopped before

0 commit comments

Comments
 (0)