@@ -34,6 +34,7 @@ import (
3434 ctrdutil "github.com/containerd/cri/pkg/containerd/util"
3535 "github.com/containerd/cri/pkg/store"
3636 containerstore "github.com/containerd/cri/pkg/store/container"
37+ imagestore "github.com/containerd/cri/pkg/store/image"
3738 sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
3839)
3940
@@ -49,6 +50,7 @@ const (
4950type eventMonitor struct {
5051 containerStore * containerstore.Store
5152 sandboxStore * sandboxstore.Store
53+ imageStore * imagestore.Store
5254 ch <- chan * events.Envelope
5355 errCh <- chan error
5456 ctx context.Context
@@ -76,12 +78,13 @@ type backOffQueue struct {
7678
7779// Create new event monitor. New event monitor will start subscribing containerd event. All events
7880// happen after it should be monitored.
79- func newEventMonitor (c * containerstore.Store , s * sandboxstore.Store ) * eventMonitor {
81+ func newEventMonitor (c * containerstore.Store , s * sandboxstore.Store , i * imagestore. Store ) * eventMonitor {
8082 // event subscribe doesn't need namespace.
8183 ctx , cancel := context .WithCancel (context .Background ())
8284 return & eventMonitor {
8385 containerStore : c ,
8486 sandboxStore : s ,
87+ imageStore : i ,
8588 ctx : ctx ,
8689 cancel : cancel ,
8790 backOff : newBackOff (),
@@ -93,29 +96,36 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
9396 filters := []string {
9497 `topic=="/tasks/exit"` ,
9598 `topic=="/tasks/oom"` ,
99+ `topic~="/images/"` ,
96100 }
97101 em .ch , em .errCh = subscriber .Subscribe (em .ctx , filters ... )
98102}
99103
100104func convertEvent (e * gogotypes.Any ) (string , interface {}, error ) {
101- containerID := ""
105+ id := ""
102106 evt , err := typeurl .UnmarshalAny (e )
103107 if err != nil {
104108 return "" , nil , errors .Wrap (err , "failed to unmarshalany" )
105109 }
106110
107111 switch evt .(type ) {
108112 case * eventtypes.TaskExit :
109- containerID = evt .(* eventtypes.TaskExit ).ContainerID
113+ id = evt .(* eventtypes.TaskExit ).ContainerID
110114 case * eventtypes.TaskOOM :
111- containerID = evt .(* eventtypes.TaskOOM ).ContainerID
115+ id = evt .(* eventtypes.TaskOOM ).ContainerID
116+ case * eventtypes.ImageCreate :
117+ id = evt .(* eventtypes.ImageCreate ).Name
118+ case * eventtypes.ImageUpdate :
119+ id = evt .(* eventtypes.ImageUpdate ).Name
120+ case * eventtypes.ImageDelete :
121+ id = evt .(* eventtypes.ImageDelete ).Name
112122 default :
113123 return "" , nil , errors .New ("unsupported event" )
114124 }
115- return containerID , evt , nil
125+ return id , evt , nil
116126}
117127
118- // start starts the event monitor which monitors and handles all container events. It returns
128+ // start starts the event monitor which monitors and handles all subscribed events. It returns
119129// an error channel for the caller to wait for stop errors from the event monitor.
120130// start must be called after subscribe.
121131func (em * eventMonitor ) start () <- chan error {
@@ -130,19 +140,19 @@ func (em *eventMonitor) start() <-chan error {
130140 select {
131141 case e := <- em .ch :
132142 logrus .Debugf ("Received containerd event timestamp - %v, namespace - %q, topic - %q" , e .Timestamp , e .Namespace , e .Topic )
133- cID , evt , err := convertEvent (e .Event )
143+ id , evt , err := convertEvent (e .Event )
134144 if err != nil {
135145 logrus .WithError (err ).Errorf ("Failed to convert event %+v" , e )
136146 break
137147 }
138- if em .backOff .isInBackOff (cID ) {
139- logrus .Infof ("Events for container %q is in backoff, enqueue event %+v" , cID , evt )
140- em .backOff .enBackOff (cID , evt )
148+ if em .backOff .isInBackOff (id ) {
149+ logrus .Infof ("Events for %q is in backoff, enqueue event %+v" , id , evt )
150+ em .backOff .enBackOff (id , evt )
141151 break
142152 }
143153 if err := em .handleEvent (evt ); err != nil {
144- logrus .WithError (err ).Errorf ("Failed to handle event %+v for container %s" , evt , cID )
145- em .backOff .enBackOff (cID , evt )
154+ logrus .WithError (err ).Errorf ("Failed to handle event %+v for %s" , evt , id )
155+ em .backOff .enBackOff (id , evt )
146156 }
147157 case err := <- em .errCh :
148158 // Close errCh in defer directly if there is no error.
@@ -152,13 +162,13 @@ func (em *eventMonitor) start() <-chan error {
152162 }
153163 return
154164 case <- backOffCheckCh :
155- cIDs := em .backOff .getExpiredContainers ()
156- for _ , cID := range cIDs {
157- queue := em .backOff .deBackOff (cID )
165+ ids := em .backOff .getExpiredIDs ()
166+ for _ , id := range ids {
167+ queue := em .backOff .deBackOff (id )
158168 for i , any := range queue .events {
159169 if err := em .handleEvent (any ); err != nil {
160- logrus .WithError (err ).Errorf ("Failed to handle backOff event %+v for container %s" , any , cID )
161- em .backOff .reBackOff (cID , queue .events [i :], queue .duration )
170+ logrus .WithError (err ).Errorf ("Failed to handle backOff event %+v for %s" , any , id )
171+ em .backOff .reBackOff (id , queue .events [i :], queue .duration )
162172 break
163173 }
164174 }
@@ -230,6 +240,18 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
230240 if err != nil {
231241 return errors .Wrap (err , "failed to update container status for TaskOOM event" )
232242 }
243+ case * eventtypes.ImageCreate :
244+ e := any .(* eventtypes.ImageCreate )
245+ logrus .Infof ("ImageCreate event %+v" , e )
246+ return em .imageStore .Update (ctx , e .Name )
247+ case * eventtypes.ImageUpdate :
248+ e := any .(* eventtypes.ImageUpdate )
249+ logrus .Infof ("ImageUpdate event %+v" , e )
250+ return em .imageStore .Update (ctx , e .Name )
251+ case * eventtypes.ImageDelete :
252+ e := any .(* eventtypes.ImageDelete )
253+ logrus .Infof ("ImageDelete event %+v" , e )
254+ return em .imageStore .Update (ctx , e .Name )
233255 }
234256
235257 return nil
@@ -331,14 +353,14 @@ func newBackOff() *backOff {
331353 }
332354}
333355
334- func (b * backOff ) getExpiredContainers () []string {
335- var containers []string
336- for c , q := range b .queuePool {
356+ func (b * backOff ) getExpiredIDs () []string {
357+ var ids []string
358+ for id , q := range b .queuePool {
337359 if q .isExpire () {
338- containers = append (containers , c )
360+ ids = append (ids , id )
339361 }
340362 }
341- return containers
363+ return ids
342364}
343365
344366func (b * backOff ) isInBackOff (key string ) bool {
0 commit comments