Skip to content

Commit 281b22f

Browse files
committed
Add container event support to containerd
Signed-off-by: ruiwen-zhao <[email protected]>
1 parent 0ac2322 commit 281b22f

16 files changed

Lines changed: 290 additions & 20 deletions
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package integration
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"io"
23+
"testing"
24+
"time"
25+
26+
"github.com/containerd/containerd/integration/images"
27+
"github.com/stretchr/testify/assert"
28+
"github.com/stretchr/testify/require"
29+
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
30+
)
31+
32+
func TestContainerEvents(t *testing.T) {
33+
ctx := context.Background()
34+
defer ctx.Done()
35+
36+
t.Log("Set up container events streaming client")
37+
containerEventsStreamingClient, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{})
38+
assert.NoError(t, err)
39+
containerEventsChan := make(chan *runtime.ContainerEventResponse)
40+
41+
go listenToEventChannel(ctx, t, containerEventsChan, containerEventsStreamingClient)
42+
// Sleep to make sure the event channel is set up, and then drain all events
43+
// emitted by previous tests.
44+
time.Sleep(2 * time.Second)
45+
drainContainerEventsChan(t, containerEventsChan)
46+
47+
t.Logf("Create a pod config and run sandbox container")
48+
sandboxName := "container_events_sandbox"
49+
sb, sbConfig := PodSandboxConfigWithCleanup(t, sandboxName, "container_events")
50+
51+
assert.NoError(t, checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, sandboxName, ""))
52+
assert.NoError(t, checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, sandboxName, ""))
53+
54+
pauseImage := images.Get(images.Pause)
55+
EnsureImageExists(t, pauseImage)
56+
t.Logf("Create a container config and run container in a pod")
57+
containerConfig := ContainerConfig(
58+
"container1",
59+
pauseImage,
60+
WithTestLabels(),
61+
WithTestAnnotations(),
62+
)
63+
cn, err := runtimeService.CreateContainer(sb, containerConfig, sbConfig)
64+
require.NoError(t, err)
65+
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, sandboxName, cn)
66+
defer func() {
67+
assert.NoError(t, runtimeService.RemoveContainer(cn))
68+
assert.NoError(t, checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, sandboxName, cn))
69+
}()
70+
71+
require.NoError(t, runtimeService.StartContainer(cn))
72+
assert.NoError(t, checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, sandboxName, cn))
73+
defer func() {
74+
assert.NoError(t, runtimeService.StopContainer(cn, 10))
75+
assert.NoError(t, checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, sandboxName, cn))
76+
}()
77+
}
78+
79+
func listenToEventChannel(ctx context.Context, t *testing.T, containerEventsChan chan *runtime.ContainerEventResponse, containerEventsStreamingClient runtime.RuntimeService_GetContainerEventsClient) {
80+
t.Helper()
81+
for {
82+
resp, err := containerEventsStreamingClient.Recv()
83+
if err == io.EOF {
84+
return
85+
}
86+
assert.NoError(t, err)
87+
if resp != nil {
88+
containerEventsChan <- resp
89+
}
90+
91+
select {
92+
case <-ctx.Done():
93+
return
94+
default:
95+
}
96+
}
97+
}
98+
99+
func drainContainerEventsChan(t *testing.T, containerEventsChan chan *runtime.ContainerEventResponse) {
100+
for {
101+
select {
102+
case <-containerEventsChan:
103+
case <-time.After(500 * time.Millisecond):
104+
return
105+
}
106+
}
107+
}
108+
109+
func checkContainerEventResponse(t *testing.T, containerEventsChan chan *runtime.ContainerEventResponse, expectedType runtime.ContainerEventType, expectedSandboxName string, expectedContainerID string) error {
110+
t.Helper()
111+
var resp *runtime.ContainerEventResponse
112+
select {
113+
case resp = <-containerEventsChan:
114+
case <-time.After(500 * time.Millisecond):
115+
return fmt.Errorf("assertContainerEventResponse: timeout waiting for events from channel")
116+
}
117+
118+
t.Logf("Container Event response received: %+v", *resp)
119+
120+
if resp.ContainerEventType != expectedType {
121+
return fmt.Errorf("assertContainerEventResponse: wrong event type. Expected %v, got %v", expectedType, resp.ContainerEventType)
122+
}
123+
124+
if resp.PodSandboxMetadata.Name != expectedSandboxName {
125+
return fmt.Errorf("assertContainerEventResponse: wrong sandbox name. Expected %v, got %v", expectedSandboxName, resp.PodSandboxMetadata.Name)
126+
}
127+
128+
if expectedContainerID != "" && resp.ContainerId != expectedContainerID {
129+
return fmt.Errorf("assertContainerEventResponse: wrong container ID. Expected %v, got %v", expectedContainerID, resp.ContainerId)
130+
}
131+
132+
return nil
133+
}

integration/cri-api/pkg/apis/services.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ limitations under the License.
3535
package cri
3636

3737
import (
38+
"context"
3839
"time"
3940

4041
"google.golang.org/grpc"
@@ -75,6 +76,7 @@ type ContainerManager interface {
7576
// for the container. If it returns error, new container log file MUST NOT
7677
// be created.
7778
ReopenContainerLog(ContainerID string, opts ...grpc.CallOption) error
79+
GetContainerEvents(ctx context.Context, request *runtimeapi.GetEventsRequest, opts ...grpc.CallOption) (runtimeapi.RuntimeService_GetContainerEventsClient, error)
7880
}
7981

8082
// PodSandboxManager contains methods for operating on PodSandboxes. The methods

integration/remote/remote_runtime.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,3 +594,15 @@ func (r *RuntimeService) ReopenContainerLog(containerID string, opts ...grpc.Cal
594594
klog.V(10).Infof("[RuntimeService] ReopenContainerLog Response (containerID=%v)", containerID)
595595
return nil
596596
}
597+
598+
// GetContainerEvents returns a GRPC client to stream container events
599+
func (r *RuntimeService) GetContainerEvents(ctx context.Context, request *runtimeapi.GetEventsRequest, opts ...grpc.CallOption) (runtimeapi.RuntimeService_GetContainerEventsClient, error) {
600+
klog.V(10).Infof("[RuntimeService] GetContainerEvents", r.timeout)
601+
602+
client, err := r.runtimeClient.GetContainerEvents(ctx, request, opts...)
603+
if err != nil {
604+
klog.Errorf("GetContainerEvents from runtime service failed: %v", err)
605+
return nil, err
606+
}
607+
return client, nil
608+
}

pkg/cri/sbserver/container_events.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717
package sbserver
1818

1919
import (
20-
"errors"
21-
2220
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
2321
)
2422

2523
func (c *criService) GetContainerEvents(r *runtime.GetEventsRequest, s runtime.RuntimeService_GetContainerEventsServer) error {
26-
return errors.New("not implemented")
24+
// TODO (https://github.com/containerd/containerd/issues/7318):
25+
// replace with a real implementation that broadcasts containerEventsChan to all subscribers.
26+
for event := range c.containerEventsChan {
27+
if err := s.Send(&event); err != nil {
28+
return err
29+
}
30+
}
31+
return nil
2732
}

pkg/cri/sbserver/service.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ type criService struct {
123123
// one in-flight fetch request or unpack handler for a given descriptor's
124124
// or chain ID.
125125
unpackDuplicationSuppressor kmutex.KeyedLocker
126+
// containerEventsChan is used to capture container events and send them
127+
// to the caller of GetContainerEvents.
128+
containerEventsChan chan runtime.ContainerEventResponse
126129
}
127130

128131
// NewCRIService returns a new instance of CRIService
@@ -144,6 +147,9 @@ func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIServi
144147
unpackDuplicationSuppressor: kmutex.New(),
145148
}
146149

150+
// TODO: figure out a proper channel size.
151+
c.containerEventsChan = make(chan runtime.ContainerEventResponse, 1000)
152+
147153
if client.SnapshotService(c.config.ContainerdConfig.Snapshotter) == nil {
148154
return nil, fmt.Errorf("failed to find snapshotter %q", c.config.ContainerdConfig.Snapshotter)
149155
}

pkg/cri/server/container_create.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,14 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
283283
return nil, fmt.Errorf("failed to add container %q into store: %w", id, err)
284284
}
285285

286+
event := runtime.ContainerEventResponse{
287+
ContainerId: id,
288+
ContainerEventType: runtime.ContainerEventType_CONTAINER_CREATED_EVENT,
289+
CreatedAt: time.Now().UnixNano(),
290+
PodSandboxMetadata: sandbox.Config.Metadata,
291+
}
292+
c.sendContainerEvent(event)
293+
286294
containerCreateTimer.WithValues(ociRuntime.Type).UpdateSince(start)
287295

288296
return &runtime.CreateContainerResponse{ContainerId: id}, nil

pkg/cri/server/container_events.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717
package server
1818

1919
import (
20-
"errors"
21-
2220
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
2321
)
2422

2523
func (c *criService) GetContainerEvents(r *runtime.GetEventsRequest, s runtime.RuntimeService_GetContainerEventsServer) error {
26-
return errors.New("not implemented")
24+
// TODO (https://github.com/containerd/containerd/issues/7318):
25+
// replace with a real implementation that broadcasts containerEventsChan to all subscribers.
26+
for event := range c.containerEventsChan {
27+
if err := s.Send(&event); err != nil {
28+
return err
29+
}
30+
}
31+
return nil
2732
}

pkg/cri/server/container_remove.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta
4848
return nil, fmt.Errorf("get container info: %w", err)
4949
}
5050

51+
sandboxID := container.SandboxID
52+
sandbox, err := c.sandboxStore.Get(sandboxID)
53+
if err != nil {
54+
return nil, fmt.Errorf("can't find sandbox for sandbox ID %s: %w", sandboxID, err)
55+
}
56+
5157
// Forcibly stop the containers if they are in running or unknown state
5258
state := container.Status.Get().State()
5359
if state == runtime.ContainerState_CONTAINER_RUNNING ||
@@ -106,6 +112,14 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta
106112

107113
c.containerNameIndex.ReleaseByKey(id)
108114

115+
event := runtime.ContainerEventResponse{
116+
ContainerId: id,
117+
ContainerEventType: runtime.ContainerEventType_CONTAINER_DELETED_EVENT,
118+
CreatedAt: time.Now().UnixNano(),
119+
PodSandboxMetadata: sandbox.Config.Metadata,
120+
}
121+
c.sendContainerEvent(event)
122+
109123
containerRemoveTimer.WithValues(i.Runtime.Name).UpdateSince(start)
110124

111125
return &runtime.RemoveContainerResponse{}, nil

pkg/cri/server/container_start.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,14 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
177177
// It handles the TaskExit event and update container state after this.
178178
c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)
179179

180+
event := runtime.ContainerEventResponse{
181+
ContainerId: id,
182+
ContainerEventType: runtime.ContainerEventType_CONTAINER_STARTED_EVENT,
183+
CreatedAt: time.Now().UnixNano(),
184+
PodSandboxMetadata: sandbox.Metadata.Config.Metadata,
185+
}
186+
c.sendContainerEvent(event)
187+
180188
containerStartTimer.WithValues(info.Runtime.Name).UpdateSince(start)
181189

182190
return &runtime.StartContainerResponse{}, nil

pkg/cri/server/container_stop.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/containerd/containerd/errdefs"
2828
"github.com/containerd/containerd/log"
2929
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
30+
"github.com/containerd/containerd/pkg/cri/store/sandbox"
3031
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
3132
"github.com/containerd/containerd/protobuf"
3233

@@ -61,6 +62,12 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer
6162
func (c *criService) stopContainer(ctx context.Context, container containerstore.Container, timeout time.Duration) error {
6263
id := container.ID
6364

65+
sandboxID := container.SandboxID
66+
sb, err := c.sandboxStore.Get(sandboxID)
67+
if err != nil {
68+
return fmt.Errorf("can't find sandbox for sandbox ID %s: %w", sandboxID, err)
69+
}
70+
6471
// Return without error if container is not running. This makes sure that
6572
// stop only takes real action after the container is started.
6673
state := container.Status.Get().State()
@@ -78,7 +85,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
7885
}
7986
// Don't return for unknown state, some cleanup needs to be done.
8087
if state == runtime.ContainerState_CONTAINER_UNKNOWN {
81-
return cleanupUnknownContainer(ctx, id, container)
88+
return cleanupUnknownContainer(ctx, id, container, sb, c)
8289
}
8390
return nil
8491
}
@@ -93,7 +100,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
93100
if !errdefs.IsNotFound(err) {
94101
return fmt.Errorf("failed to wait for task for %q: %w", id, err)
95102
}
96-
return cleanupUnknownContainer(ctx, id, container)
103+
return cleanupUnknownContainer(ctx, id, container, sb, c)
97104
}
98105

99106
exitCtx, exitCancel := context.WithCancel(context.Background())
@@ -196,13 +203,13 @@ func (c *criService) waitContainerStop(ctx context.Context, container containers
196203
}
197204

198205
// cleanupUnknownContainer cleanup stopped container in unknown state.
199-
func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container) error {
206+
func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container, sb sandbox.Sandbox, c *criService) error {
200207
// Reuse handleContainerExit to do the cleanup.
201208
return handleContainerExit(ctx, &eventtypes.TaskExit{
202209
ContainerID: id,
203210
ID: id,
204211
Pid: 0,
205212
ExitStatus: unknownExitCode,
206213
ExitedAt: protobuf.ToTimestamp(time.Now()),
207-
}, cntr)
214+
}, cntr, sb, c)
208215
}

0 commit comments

Comments
 (0)