Skip to content

Commit f615c58

Browse files
authored
Merge pull request #4682 from fuweid/cri-handle-exit-event-separate
cri: handle sandbox/container exit event in parallel
2 parents 3299c55 + e56de63 commit f615c58

6 files changed

Lines changed: 117 additions & 36 deletions

File tree

pkg/cri/server/container_start.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
148148
return nil, errors.Wrapf(err, "failed to update container %q state", id)
149149
}
150150

151-
// start the monitor after updating container state, this ensures that
152-
// event monitor receives the TaskExit event and update container state
153-
// after this.
154-
c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh)
151+
// It handles the TaskExit event and update container state after this.
152+
c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)
155153

156154
return &runtime.StartContainerResponse{}, nil
157155
}

pkg/cri/server/container_stop.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
8888
}
8989

9090
exitCtx, exitCancel := context.WithCancel(context.Background())
91-
stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh)
91+
stopCh := c.eventMonitor.startContainerExitMonitor(exitCtx, id, task.Pid(), exitCh)
9292
defer func() {
9393
exitCancel()
9494
// This ensures that exit monitor is stopped before

pkg/cri/server/events.go

Lines changed: 110 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -50,24 +50,22 @@ const (
5050
// Add a timeout for each event handling, events that timeout will be requeued and
5151
// handled again in the future.
5252
handleEventTimeout = 10 * time.Second
53-
54-
exitChannelSize = 1024
5553
)
5654

5755
// eventMonitor monitors containerd event and updates internal state correspondingly.
58-
// TODO(random-liu): Handle event for each container in a separate goroutine.
5956
type eventMonitor struct {
60-
c *criService
61-
ch <-chan *events.Envelope
62-
// exitCh receives container/sandbox exit events from exit monitors.
63-
exitCh chan *eventtypes.TaskExit
57+
c *criService
58+
ch <-chan *events.Envelope
6459
errCh <-chan error
6560
ctx context.Context
6661
cancel context.CancelFunc
6762
backOff *backOff
6863
}
6964

7065
type backOff struct {
66+
// queuePoolMu is mutex used to protect the queuePool map
67+
queuePoolMu sync.Mutex
68+
7169
queuePool map[string]*backOffQueue
7270
// tickerMu is mutex used to protect the ticker.
7371
tickerMu sync.Mutex
@@ -93,7 +91,6 @@ func newEventMonitor(c *criService) *eventMonitor {
9391
c: c,
9492
ctx: ctx,
9593
cancel: cancel,
96-
exitCh: make(chan *eventtypes.TaskExit, exitChannelSize),
9794
backOff: newBackOff(),
9895
}
9996
}
@@ -109,26 +106,102 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
109106
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
110107
}
111108

112-
// startExitMonitor starts an exit monitor for a given container/sandbox.
113-
func (em *eventMonitor) startExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
109+
// startSandboxExitMonitor starts an exit monitor for a given sandbox.
110+
func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
114111
stopCh := make(chan struct{})
115112
go func() {
116113
defer close(stopCh)
117114
select {
118115
case exitRes := <-exitCh:
119116
exitStatus, exitedAt, err := exitRes.Result()
120117
if err != nil {
121-
logrus.WithError(err).Errorf("Failed to get task exit status for %q", id)
118+
logrus.WithError(err).Errorf("failed to get task exit status for %q", id)
122119
exitStatus = unknownExitCode
123120
exitedAt = time.Now()
124121
}
125-
em.exitCh <- &eventtypes.TaskExit{
122+
123+
e := &eventtypes.TaskExit{
126124
ContainerID: id,
127125
ID: id,
128126
Pid: pid,
129127
ExitStatus: exitStatus,
130128
ExitedAt: exitedAt,
131129
}
130+
131+
logrus.Debugf("received exit event %+v", e)
132+
133+
err = func() error {
134+
dctx := ctrdutil.NamespacedContext()
135+
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
136+
defer dcancel()
137+
138+
sb, err := em.c.sandboxStore.Get(e.ID)
139+
if err == nil {
140+
if err := handleSandboxExit(dctx, e, sb); err != nil {
141+
return err
142+
}
143+
return nil
144+
} else if err != store.ErrNotExist {
145+
return errors.Wrapf(err, "failed to get sandbox %s", e.ID)
146+
}
147+
return nil
148+
}()
149+
if err != nil {
150+
logrus.WithError(err).Errorf("failed to handle sandbox TaskExit event %+v", e)
151+
em.backOff.enBackOff(id, e)
152+
}
153+
return
154+
case <-ctx.Done():
155+
}
156+
}()
157+
return stopCh
158+
}
159+
160+
// startContainerExitMonitor starts an exit monitor for a given container.
161+
func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
162+
stopCh := make(chan struct{})
163+
go func() {
164+
defer close(stopCh)
165+
select {
166+
case exitRes := <-exitCh:
167+
exitStatus, exitedAt, err := exitRes.Result()
168+
if err != nil {
169+
logrus.WithError(err).Errorf("failed to get task exit status for %q", id)
170+
exitStatus = unknownExitCode
171+
exitedAt = time.Now()
172+
}
173+
174+
e := &eventtypes.TaskExit{
175+
ContainerID: id,
176+
ID: id,
177+
Pid: pid,
178+
ExitStatus: exitStatus,
179+
ExitedAt: exitedAt,
180+
}
181+
182+
logrus.Debugf("received exit event %+v", e)
183+
184+
err = func() error {
185+
dctx := ctrdutil.NamespacedContext()
186+
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
187+
defer dcancel()
188+
189+
cntr, err := em.c.containerStore.Get(e.ID)
190+
if err == nil {
191+
if err := handleContainerExit(dctx, e, cntr); err != nil {
192+
return err
193+
}
194+
return nil
195+
} else if err != store.ErrNotExist {
196+
return errors.Wrapf(err, "failed to get container %s", e.ID)
197+
}
198+
return nil
199+
}()
200+
if err != nil {
201+
logrus.WithError(err).Errorf("failed to handle container TaskExit event %+v", e)
202+
em.backOff.enBackOff(id, e)
203+
}
204+
return
132205
case <-ctx.Done():
133206
}
134207
}()
@@ -157,9 +230,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
157230
return id, evt, nil
158231
}
159232

160-
// start starts the event monitor which monitors and handles all subscribed events. It returns
161-
// an error channel for the caller to wait for stop errors from the event monitor.
162-
// start must be called after subscribe.
233+
// start starts the event monitor which monitors and handles all subscribed events.
234+
// It returns an error channel for the caller to wait for stop errors from the
235+
// event monitor.
236+
//
237+
// NOTE:
238+
// 1. start must be called after subscribe.
239+
// 2. The task exit event has been handled in individual startSandboxExitMonitor
240+
// or startContainerExitMonitor goroutine at the first. If the goroutine fails,
241+
// it puts the event into backoff retry queue and event monitor will handle
242+
// it later.
163243
func (em *eventMonitor) start() <-chan error {
164244
errCh := make(chan error)
165245
if em.ch == nil || em.errCh == nil {
@@ -170,18 +250,6 @@ func (em *eventMonitor) start() <-chan error {
170250
defer close(errCh)
171251
for {
172252
select {
173-
case e := <-em.exitCh:
174-
logrus.Debugf("Received exit event %+v", e)
175-
id := e.ID
176-
if em.backOff.isInBackOff(id) {
177-
logrus.Infof("Events for %q is in backoff, enqueue event %+v", id, e)
178-
em.backOff.enBackOff(id, e)
179-
break
180-
}
181-
if err := em.handleEvent(e); err != nil {
182-
logrus.WithError(err).Errorf("Failed to handle exit event %+v for %s", e, id)
183-
em.backOff.enBackOff(id, e)
184-
}
185253
case e := <-em.ch:
186254
logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
187255
if e.Namespace != constants.K8sContainerdNamespace {
@@ -388,6 +456,9 @@ func newBackOff() *backOff {
388456
}
389457

390458
func (b *backOff) getExpiredIDs() []string {
459+
b.queuePoolMu.Lock()
460+
defer b.queuePoolMu.Unlock()
461+
391462
var ids []string
392463
for id, q := range b.queuePool {
393464
if q.isExpire() {
@@ -398,6 +469,9 @@ func (b *backOff) getExpiredIDs() []string {
398469
}
399470

400471
func (b *backOff) isInBackOff(key string) bool {
472+
b.queuePoolMu.Lock()
473+
defer b.queuePoolMu.Unlock()
474+
401475
if _, ok := b.queuePool[key]; ok {
402476
return true
403477
}
@@ -406,6 +480,9 @@ func (b *backOff) isInBackOff(key string) bool {
406480

407481
// enBackOff starts the backOff and puts the event at the tail of the queue
408482
func (b *backOff) enBackOff(key string, evt interface{}) {
483+
b.queuePoolMu.Lock()
484+
defer b.queuePoolMu.Unlock()
485+
409486
if queue, ok := b.queuePool[key]; ok {
410487
queue.events = append(queue.events, evt)
411488
return
@@ -415,13 +492,19 @@ func (b *backOff) enBackOff(key string, evt interface{}) {
415492

416493
// deBackOff takes out the whole queue
417494
func (b *backOff) deBackOff(key string) *backOffQueue {
495+
b.queuePoolMu.Lock()
496+
defer b.queuePoolMu.Unlock()
497+
418498
queue := b.queuePool[key]
419499
delete(b.queuePool, key)
420500
return queue
421501
}
422502

423503
// enBackOff start to backOff again and put events to the queue
424504
func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) {
505+
b.queuePoolMu.Lock()
506+
defer b.queuePoolMu.Unlock()
507+
425508
duration := 2 * oldDuration
426509
if duration > b.maxDuration {
427510
duration = b.maxDuration

pkg/cri/server/restart.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
290290
status.Reason = unknownExitReason
291291
} else {
292292
// Start exit monitor.
293-
c.eventMonitor.startExitMonitor(context.Background(), id, status.Pid, exitCh)
293+
c.eventMonitor.startContainerExitMonitor(context.Background(), id, status.Pid, exitCh)
294294
}
295295
case containerd.Stopped:
296296
// Task is stopped. Update status and delete the task.
@@ -389,7 +389,7 @@ func (c *criService) loadSandbox(ctx context.Context, cntr containerd.Container)
389389
// Task is running, set sandbox state as READY.
390390
status.State = sandboxstore.StateReady
391391
status.Pid = t.Pid()
392-
c.eventMonitor.startExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
392+
c.eventMonitor.startSandboxExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
393393
}
394394
} else {
395395
// Task is not running. Delete the task and set sandbox state as NOTREADY.

pkg/cri/server/sandbox_run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
331331
//
332332
// TaskOOM from containerd may come before sandbox is added to store,
333333
// but we don't care about sandbox TaskOOM right now, so it is fine.
334-
c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh)
334+
c.eventMonitor.startSandboxExitMonitor(context.Background(), id, task.Pid(), exitCh)
335335

336336
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
337337
}

pkg/cri/server/sandbox_stop.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxst
134134
}
135135

136136
exitCtx, exitCancel := context.WithCancel(context.Background())
137-
stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh)
137+
stopCh := c.eventMonitor.startSandboxExitMonitor(exitCtx, id, task.Pid(), exitCh)
138138
defer func() {
139139
exitCancel()
140140
// This ensures that exit monitor is stopped before

0 commit comments

Comments
 (0)