
Commit 953d67d

Create image reference cache.

Signed-off-by: Lantao Liu <[email protected]>

Parent: cfdf872

22 files changed
Lines changed: 638 additions & 454 deletions

pkg/server/container_create.go

Lines changed: 2 additions & 6 deletions
@@ -115,13 +115,9 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
 
 	// Prepare container image snapshot. For container, the image should have
 	// been pulled before creating the container, so do not ensure the image.
-	imageRef := config.GetImage().GetImage()
-	image, err := c.localResolve(ctx, imageRef)
+	image, err := c.localResolve(config.GetImage().GetImage())
 	if err != nil {
-		return nil, errors.Wrapf(err, "failed to resolve image %q", imageRef)
-	}
-	if image == nil {
-		return nil, errors.Errorf("image %q not found", imageRef)
+		return nil, errors.Wrapf(err, "failed to resolve image %q", config.GetImage().GetImage())
	}
 
 	// Run container using the same runtime with sandbox.

pkg/server/container_status.go

Lines changed: 5 additions & 4 deletions
@@ -46,14 +46,15 @@ func (c *criService) ContainerStatus(ctx context.Context, r *runtime.ContainerSt
 	if err != nil {
 		return nil, errors.Wrapf(err, "failed to get image %q", imageRef)
 	}
-	if len(image.RepoTags) > 0 {
+	repoTags, repoDigests := parseImageReferences(image.References)
+	if len(repoTags) > 0 {
 		// Based on current behavior of dockershim, this field should be
 		// image tag.
-		spec = &runtime.ImageSpec{Image: image.RepoTags[0]}
+		spec = &runtime.ImageSpec{Image: repoTags[0]}
 	}
-	if len(image.RepoDigests) > 0 {
+	if len(repoDigests) > 0 {
 		// Based on the CRI definition, this field will be consumed by user.
-		imageRef = image.RepoDigests[0]
+		imageRef = repoDigests[0]
 	}
 	status := toCRIContainerStatus(container, spec, imageRef)
 	info, err := toCRIContainerInfo(ctx, container, r.GetVerbose())
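Note: ContainerStatus now derives the tag and digest from a single image.References list via parseImageReferences, whose definition is not part of this hunk. Below is a minimal, self-contained sketch of how such a split could look, assuming a reference containing "@" carries a digest and anything else carries a tag; the helper name splitReferences and the heuristic are illustrative only, not the repository's actual implementation (which likely uses proper image-reference parsing).

package main

import (
	"fmt"
	"strings"
)

// splitReferences separates digest references ("name@sha256:...") from tag
// references ("name:tag"). It is an illustrative stand-in for the
// parseImageReferences helper used above, not its actual implementation.
func splitReferences(refs []string) (tags, digests []string) {
	for _, ref := range refs {
		if strings.Contains(ref, "@") {
			digests = append(digests, ref)
		} else {
			tags = append(tags, ref)
		}
	}
	return tags, digests
}

func main() {
	tags, digests := splitReferences([]string{
		"gcr.io/library/busybox:latest",
		"gcr.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
	})
	fmt.Println("tags:", tags)
	fmt.Println("digests:", digests)
}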

pkg/server/container_status_test.go

Lines changed: 12 additions & 9 deletions
@@ -63,18 +63,20 @@ func getContainerStatusTestData() (*containerstore.Metadata, *containerstore.Sta
 		StartedAt: startedAt,
 	}
 	image := &imagestore.Image{
-		ID:          imageID,
-		RepoTags:    []string{"test-image-repo-tag"},
-		RepoDigests: []string{"test-image-repo-digest"},
+		ID:         imageID,
+		References: []string{
+			"gcr.io/library/busybox:latest",
+			"gcr.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
+		},
 	}
 	expected := &runtime.ContainerStatus{
 		Id:          testID,
 		Metadata:    config.GetMetadata(),
 		State:       runtime.ContainerState_CONTAINER_RUNNING,
 		CreatedAt:   createdAt,
 		StartedAt:   startedAt,
-		Image:       &runtime.ImageSpec{Image: "test-image-repo-tag"},
-		ImageRef:    "test-image-repo-digest",
+		Image:       &runtime.ImageSpec{Image: "gcr.io/library/busybox:latest"},
+		ImageRef:    "gcr.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
 		Reason:      completeExitReason,
 		Labels:      config.GetLabels(),
 		Annotations: config.GetAnnotations(),
@@ -120,7 +122,7 @@ func TestToCRIContainerStatus(t *testing.T) {
 			expectedReason: errorExitReason,
 		},
 	} {
-		metadata, status, image, expected := getContainerStatusTestData()
+		metadata, status, _, expected := getContainerStatusTestData()
 		// Update status with test case.
 		status.FinishedAt = test.finishedAt
 		status.ExitCode = test.exitCode
@@ -138,8 +140,8 @@ func TestToCRIContainerStatus(t *testing.T) {
 		expected.ExitCode = test.exitCode
 		expected.Message = test.message
 		containerStatus := toCRIContainerStatus(container,
-			&runtime.ImageSpec{Image: image.RepoTags[0]},
-			image.RepoDigests[0])
+			expected.Image,
+			expected.ImageRef)
 		assert.Equal(t, expected, containerStatus, desc)
 	}
 }
@@ -207,7 +209,8 @@ func TestContainerStatus(t *testing.T) {
 		assert.NoError(t, c.containerStore.Add(container))
 	}
 	if test.imageExist {
-		c.imageStore.Add(*image)
+		c.imageStore, err = imagestore.NewFakeStore([]imagestore.Image{*image})
+		assert.NoError(t, err)
 	}
 	resp, err := c.ContainerStatus(context.Background(), &runtime.ContainerStatusRequest{ContainerId: container.ID})
 	if test.expectErr {
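Note: the test now seeds the image store through imagestore.NewFakeStore, which this diff references but does not show. The sketch below is a self-contained illustration of what such a test-only constructor might do, assuming a map keyed by image ID; the fakeImage and fakeStore types are stand-ins, not the repository's actual imagestore types.

package main

import (
	"errors"
	"fmt"
)

// fakeImage mirrors the fields the test cares about (ID plus references);
// it is a stand-in for imagestore.Image, used only for illustration.
type fakeImage struct {
	ID         string
	References []string
}

// fakeStore is a map-based stand-in for the image store used in tests.
type fakeStore struct {
	images map[string]fakeImage
}

// newFakeStore pre-populates a store with the given images and fails fast on
// an obviously invalid fixture, which is presumably why the real NewFakeStore
// returns an error for the test to assert on.
func newFakeStore(images []fakeImage) (*fakeStore, error) {
	s := &fakeStore{images: make(map[string]fakeImage)}
	for _, img := range images {
		if img.ID == "" {
			return nil, errors.New("test image must have an ID")
		}
		s.images[img.ID] = img
	}
	return s, nil
}

func main() {
	s, err := newFakeStore([]fakeImage{{
		ID:         "sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
		References: []string{"gcr.io/library/busybox:latest"},
	}})
	if err != nil {
		fmt.Println("failed to build fake store:", err)
		return
	}
	fmt.Printf("%d image(s) cached\n", len(s.images))
}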

pkg/server/events.go

Lines changed: 44 additions & 22 deletions
@@ -34,6 +34,7 @@ import (
 	ctrdutil "github.com/containerd/cri/pkg/containerd/util"
 	"github.com/containerd/cri/pkg/store"
 	containerstore "github.com/containerd/cri/pkg/store/container"
+	imagestore "github.com/containerd/cri/pkg/store/image"
 	sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
 )
 
@@ -49,6 +50,7 @@ const (
 type eventMonitor struct {
 	containerStore *containerstore.Store
 	sandboxStore   *sandboxstore.Store
+	imageStore     *imagestore.Store
 	ch             <-chan *events.Envelope
 	errCh          <-chan error
 	ctx            context.Context
@@ -76,12 +78,13 @@ type backOffQueue struct {
 
 // Create new event monitor. New event monitor will start subscribing containerd event. All events
 // happen after it should be monitored.
-func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonitor {
+func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store, i *imagestore.Store) *eventMonitor {
 	// event subscribe doesn't need namespace.
 	ctx, cancel := context.WithCancel(context.Background())
 	return &eventMonitor{
 		containerStore: c,
 		sandboxStore:   s,
+		imageStore:     i,
 		ctx:            ctx,
 		cancel:         cancel,
 		backOff:        newBackOff(),
@@ -93,29 +96,36 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
 	filters := []string{
 		`topic=="/tasks/exit"`,
 		`topic=="/tasks/oom"`,
+		`topic~="/images/"`,
 	}
 	em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
 }
 
 func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
-	containerID := ""
+	id := ""
 	evt, err := typeurl.UnmarshalAny(e)
 	if err != nil {
 		return "", nil, errors.Wrap(err, "failed to unmarshalany")
 	}
 
 	switch evt.(type) {
 	case *eventtypes.TaskExit:
-		containerID = evt.(*eventtypes.TaskExit).ContainerID
+		id = evt.(*eventtypes.TaskExit).ContainerID
 	case *eventtypes.TaskOOM:
-		containerID = evt.(*eventtypes.TaskOOM).ContainerID
+		id = evt.(*eventtypes.TaskOOM).ContainerID
+	case *eventtypes.ImageCreate:
+		id = evt.(*eventtypes.ImageCreate).Name
+	case *eventtypes.ImageUpdate:
+		id = evt.(*eventtypes.ImageUpdate).Name
+	case *eventtypes.ImageDelete:
+		id = evt.(*eventtypes.ImageDelete).Name
 	default:
 		return "", nil, errors.New("unsupported event")
 	}
-	return containerID, evt, nil
+	return id, evt, nil
 }
 
-// start starts the event monitor which monitors and handles all container events. It returns
+// start starts the event monitor which monitors and handles all subscribed events. It returns
 // an error channel for the caller to wait for stop errors from the event monitor.
 // start must be called after subscribe.
 func (em *eventMonitor) start() <-chan error {
@@ -130,19 +140,19 @@ func (em *eventMonitor) start() <-chan error {
 		select {
 		case e := <-em.ch:
 			logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
-			cID, evt, err := convertEvent(e.Event)
+			id, evt, err := convertEvent(e.Event)
 			if err != nil {
 				logrus.WithError(err).Errorf("Failed to convert event %+v", e)
 				break
 			}
-			if em.backOff.isInBackOff(cID) {
-				logrus.Infof("Events for container %q is in backoff, enqueue event %+v", cID, evt)
-				em.backOff.enBackOff(cID, evt)
+			if em.backOff.isInBackOff(id) {
+				logrus.Infof("Events for %q is in backoff, enqueue event %+v", id, evt)
+				em.backOff.enBackOff(id, evt)
 				break
 			}
 			if err := em.handleEvent(evt); err != nil {
-				logrus.WithError(err).Errorf("Failed to handle event %+v for container %s", evt, cID)
-				em.backOff.enBackOff(cID, evt)
+				logrus.WithError(err).Errorf("Failed to handle event %+v for %s", evt, id)
+				em.backOff.enBackOff(id, evt)
 			}
 		case err := <-em.errCh:
 			// Close errCh in defer directly if there is no error.
@@ -152,13 +162,13 @@ func (em *eventMonitor) start() <-chan error {
 			}
 			return
 		case <-backOffCheckCh:
-			cIDs := em.backOff.getExpiredContainers()
-			for _, cID := range cIDs {
-				queue := em.backOff.deBackOff(cID)
+			ids := em.backOff.getExpiredIDs()
+			for _, id := range ids {
+				queue := em.backOff.deBackOff(id)
 				for i, any := range queue.events {
 					if err := em.handleEvent(any); err != nil {
-						logrus.WithError(err).Errorf("Failed to handle backOff event %+v for container %s", any, cID)
-						em.backOff.reBackOff(cID, queue.events[i:], queue.duration)
+						logrus.WithError(err).Errorf("Failed to handle backOff event %+v for %s", any, id)
+						em.backOff.reBackOff(id, queue.events[i:], queue.duration)
 						break
 					}
 				}
@@ -230,6 +240,18 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
 		if err != nil {
 			return errors.Wrap(err, "failed to update container status for TaskOOM event")
 		}
+	case *eventtypes.ImageCreate:
+		e := any.(*eventtypes.ImageCreate)
+		logrus.Infof("ImageCreate event %+v", e)
+		return em.imageStore.Update(ctx, e.Name)
+	case *eventtypes.ImageUpdate:
+		e := any.(*eventtypes.ImageUpdate)
+		logrus.Infof("ImageUpdate event %+v", e)
+		return em.imageStore.Update(ctx, e.Name)
+	case *eventtypes.ImageDelete:
+		e := any.(*eventtypes.ImageDelete)
+		logrus.Infof("ImageDelete event %+v", e)
+		return em.imageStore.Update(ctx, e.Name)
 	}
 
 	return nil
@@ -331,14 +353,14 @@ func newBackOff() *backOff {
 	}
 }
 
-func (b *backOff) getExpiredContainers() []string {
-	var containers []string
-	for c, q := range b.queuePool {
+func (b *backOff) getExpiredIDs() []string {
+	var ids []string
+	for id, q := range b.queuePool {
 		if q.isExpire() {
-			containers = append(containers, c)
+			ids = append(ids, id)
 		}
 	}
-	return containers
+	return ids
}
 
 func (b *backOff) isInBackOff(key string) bool {
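Note: all three image event types above funnel into a single imageStore.Update(ctx, name) call, which is the heart of the image reference cache: on any image event the entry for that name is re-resolved and either refreshed or evicted. The snippet below is a minimal, self-contained sketch of that refresh-or-evict pattern under assumed types; the refCache type and its getImage callback are illustrative and do not mirror the actual imagestore.Store API.

package main

import (
	"fmt"
	"sync"
)

// cachedImage is an illustrative cache entry: an image ID plus its references.
type cachedImage struct {
	ID         string
	References []string
}

// refCache is a stand-in for the image reference cache; getImage is an assumed
// callback that resolves a name against the backing client (nil means "not found").
type refCache struct {
	mu       sync.Mutex
	images   map[string]cachedImage
	getImage func(name string) (*cachedImage, error)
}

// update refreshes the entry for name, or evicts it if the image is gone.
func (c *refCache) update(name string) error {
	img, err := c.getImage(name)
	if err != nil {
		return err
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if img == nil {
		// ImageDelete (or lookup miss): drop the stale entry.
		delete(c.images, name)
		return nil
	}
	// ImageCreate/ImageUpdate: cache the latest references.
	c.images[name] = *img
	return nil
}

func main() {
	cache := &refCache{
		images: map[string]cachedImage{},
		getImage: func(name string) (*cachedImage, error) {
			return &cachedImage{ID: "sha256:abc", References: []string{name}}, nil
		},
	}
	if err := cache.update("gcr.io/library/busybox:latest"); err != nil {
		fmt.Println("update failed:", err)
		return
	}
	fmt.Printf("%+v\n", cache.images)
}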

pkg/server/events_test.go

Lines changed: 2 additions & 2 deletions
@@ -94,11 +94,11 @@ func TestBackOff(t *testing.T) {
 	assert.Equal(t, actual.isInBackOff(notExistKey), false)
 
 	t.Logf("No containers should be expired")
-	assert.Empty(t, actual.getExpiredContainers())
+	assert.Empty(t, actual.getExpiredIDs())
 
 	t.Logf("Should be able to get all keys which are expired for backOff")
 	testClock.Sleep(backOffInitDuration)
-	actKeyList := actual.getExpiredContainers()
+	actKeyList := actual.getExpiredIDs()
 	assert.Equal(t, len(inputQueues), len(actKeyList))
 	for k := range inputQueues {
 		assert.Contains(t, actKeyList, k)
