@@ -19,6 +19,7 @@ package metadata
1919import (
2020 "context"
2121 "strings"
22+ "sync/atomic"
2223 "time"
2324
2425 "github.com/containerd/containerd/containers"
@@ -35,13 +36,13 @@ import (
3536)
3637
3738type containerStore struct {
38- tx * bolt. Tx
39+ db * DB
3940}
4041
4142// NewContainerStore returns a Store backed by an underlying bolt DB
42- func NewContainerStore (tx * bolt. Tx ) containers.Store {
43+ func NewContainerStore (db * DB ) containers.Store {
4344 return & containerStore {
44- tx : tx ,
45+ db : db ,
4546 }
4647}
4748
@@ -51,14 +52,21 @@ func (s *containerStore) Get(ctx context.Context, id string) (containers.Contain
5152 return containers.Container {}, err
5253 }
5354
54- bkt := getContainerBucket (s .tx , namespace , id )
55- if bkt == nil {
56- return containers.Container {}, errors .Wrapf (errdefs .ErrNotFound , "container %q in namespace %q" , id , namespace )
57- }
58-
5955 container := containers.Container {ID : id }
60- if err := readContainer (& container , bkt ); err != nil {
61- return containers.Container {}, errors .Wrapf (err , "failed to read container %q" , id )
56+
57+ if err := view (ctx , s .db , func (tx * bolt.Tx ) error {
58+ bkt := getContainerBucket (tx , namespace , id )
59+ if bkt == nil {
60+ return errors .Wrapf (errdefs .ErrNotFound , "container %q in namespace %q" , id , namespace )
61+ }
62+
63+ if err := readContainer (& container , bkt ); err != nil {
64+ return errors .Wrapf (err , "failed to read container %q" , id )
65+ }
66+
67+ return nil
68+ }); err != nil {
69+ return containers.Container {}, err
6270 }
6371
6472 return container , nil
@@ -75,27 +83,30 @@ func (s *containerStore) List(ctx context.Context, fs ...string) ([]containers.C
7583 return nil , errors .Wrap (errdefs .ErrInvalidArgument , err .Error ())
7684 }
7785
78- bkt := getContainersBucket (s .tx , namespace )
79- if bkt == nil {
80- return nil , nil // empty store
81- }
82-
8386 var m []containers.Container
84- if err := bkt .ForEach (func (k , v []byte ) error {
85- cbkt := bkt .Bucket (k )
86- if cbkt == nil {
87- return nil
88- }
89- container := containers.Container {ID : string (k )}
9087
91- if err := readContainer (& container , cbkt ); err != nil {
92- return errors .Wrapf (err , "failed to read container %q" , string (k ))
88+ if err := view (ctx , s .db , func (tx * bolt.Tx ) error {
89+ bkt := getContainersBucket (tx , namespace )
90+ if bkt == nil {
91+ return nil // empty store
9392 }
9493
95- if filter .Match (adaptContainer (container )) {
96- m = append (m , container )
97- }
98- return nil
94+ return bkt .ForEach (func (k , v []byte ) error {
95+ cbkt := bkt .Bucket (k )
96+ if cbkt == nil {
97+ return nil
98+ }
99+ container := containers.Container {ID : string (k )}
100+
101+ if err := readContainer (& container , cbkt ); err != nil {
102+ return errors .Wrapf (err , "failed to read container %q" , string (k ))
103+ }
104+
105+ if filter .Match (adaptContainer (container )) {
106+ m = append (m , container )
107+ }
108+ return nil
109+ })
99110 }); err != nil {
100111 return nil , err
101112 }
@@ -113,23 +124,29 @@ func (s *containerStore) Create(ctx context.Context, container containers.Contai
113124 return containers.Container {}, errors .Wrap (err , "create container failed validation" )
114125 }
115126
116- bkt , err := createContainersBucket (s .tx , namespace )
117- if err != nil {
118- return containers.Container {}, err
119- }
127+ if err := update (ctx , s .db , func (tx * bolt.Tx ) error {
128+ bkt , err := createContainersBucket (tx , namespace )
129+ if err != nil {
130+ return err
131+ }
120132
121- cbkt , err := bkt .CreateBucket ([]byte (container .ID ))
122- if err != nil {
123- if err == bolt .ErrBucketExists {
124- err = errors .Wrapf (errdefs .ErrAlreadyExists , "container %q" , container .ID )
133+ cbkt , err := bkt .CreateBucket ([]byte (container .ID ))
134+ if err != nil {
135+ if err == bolt .ErrBucketExists {
136+ err = errors .Wrapf (errdefs .ErrAlreadyExists , "container %q" , container .ID )
137+ }
138+ return err
125139 }
126- return containers.Container {}, err
127- }
128140
129- container .CreatedAt = time .Now ().UTC ()
130- container .UpdatedAt = container .CreatedAt
131- if err := writeContainer (cbkt , & container ); err != nil {
132- return containers.Container {}, errors .Wrapf (err , "failed to write container %q" , container .ID )
141+ container .CreatedAt = time .Now ().UTC ()
142+ container .UpdatedAt = container .CreatedAt
143+ if err := writeContainer (cbkt , & container ); err != nil {
144+ return errors .Wrapf (err , "failed to write container %q" , container .ID )
145+ }
146+
147+ return nil
148+ }); err != nil {
149+ return containers.Container {}, err
133150 }
134151
135152 return container , nil
@@ -145,85 +162,91 @@ func (s *containerStore) Update(ctx context.Context, container containers.Contai
145162 return containers.Container {}, errors .Wrapf (errdefs .ErrInvalidArgument , "must specify a container id" )
146163 }
147164
148- bkt := getContainersBucket (s .tx , namespace )
149- if bkt == nil {
150- return containers.Container {}, errors .Wrapf (errdefs .ErrNotFound , "cannot update container %q in namespace %q" , container .ID , namespace )
151- }
152-
153- cbkt := bkt .Bucket ([]byte (container .ID ))
154- if cbkt == nil {
155- return containers.Container {}, errors .Wrapf (errdefs .ErrNotFound , "container %q" , container .ID )
156- }
157-
158165 var updated containers.Container
159- if err := readContainer (& updated , cbkt ); err != nil {
160- return updated , errors .Wrapf (err , "failed to read container %q" , container .ID )
161- }
162- createdat := updated .CreatedAt
163- updated .ID = container .ID
164-
165- if len (fieldpaths ) == 0 {
166- // only allow updates to these field on full replace.
167- fieldpaths = []string {"labels" , "spec" , "extensions" , "image" , "snapshotkey" }
168-
169- // Fields that are immutable must cause an error when no field paths
170- // are provided. This allows these fields to become mutable in the
171- // future.
172- if updated .Snapshotter != container .Snapshotter {
173- return containers.Container {}, errors .Wrapf (errdefs .ErrInvalidArgument , "container.Snapshotter field is immutable" )
166+ if err := update (ctx , s .db , func (tx * bolt.Tx ) error {
167+ bkt := getContainersBucket (tx , namespace )
168+ if bkt == nil {
169+ return errors .Wrapf (errdefs .ErrNotFound , "cannot update container %q in namespace %q" , container .ID , namespace )
174170 }
175171
176- if updated .Runtime .Name != container .Runtime .Name {
177- return containers.Container {}, errors .Wrapf (errdefs .ErrInvalidArgument , "container.Runtime.Name field is immutable" )
172+ cbkt := bkt .Bucket ([]byte (container .ID ))
173+ if cbkt == nil {
174+ return errors .Wrapf (errdefs .ErrNotFound , "container %q" , container .ID )
178175 }
179- }
180176
181- // apply the field mask. If you update this code, you better follow the
182- // field mask rules in field_mask.proto. If you don't know what this
183- // is, do not update this code.
184- for _ , path := range fieldpaths {
185- if strings .HasPrefix (path , "labels." ) {
186- if updated .Labels == nil {
187- updated .Labels = map [string ]string {}
177+ if err := readContainer (& updated , cbkt ); err != nil {
178+ return errors .Wrapf (err , "failed to read container %q" , container .ID )
179+ }
180+ createdat := updated .CreatedAt
181+ updated .ID = container .ID
182+
183+ if len (fieldpaths ) == 0 {
184+ // only allow updates to these field on full replace.
185+ fieldpaths = []string {"labels" , "spec" , "extensions" , "image" , "snapshotkey" }
186+
187+ // Fields that are immutable must cause an error when no field paths
188+ // are provided. This allows these fields to become mutable in the
189+ // future.
190+ if updated .Snapshotter != container .Snapshotter {
191+ return errors .Wrapf (errdefs .ErrInvalidArgument , "container.Snapshotter field is immutable" )
192+ }
193+
194+ if updated .Runtime .Name != container .Runtime .Name {
195+ return errors .Wrapf (errdefs .ErrInvalidArgument , "container.Runtime.Name field is immutable" )
188196 }
189- key := strings .TrimPrefix (path , "labels." )
190- updated .Labels [key ] = container .Labels [key ]
191- continue
192197 }
193198
194- if strings .HasPrefix (path , "extensions." ) {
195- if updated .Extensions == nil {
196- updated .Extensions = map [string ]types.Any {}
199+ // apply the field mask. If you update this code, you better follow the
200+ // field mask rules in field_mask.proto. If you don't know what this
201+ // is, do not update this code.
202+ for _ , path := range fieldpaths {
203+ if strings .HasPrefix (path , "labels." ) {
204+ if updated .Labels == nil {
205+ updated .Labels = map [string ]string {}
206+ }
207+ key := strings .TrimPrefix (path , "labels." )
208+ updated .Labels [key ] = container .Labels [key ]
209+ continue
210+ }
211+
212+ if strings .HasPrefix (path , "extensions." ) {
213+ if updated .Extensions == nil {
214+ updated .Extensions = map [string ]types.Any {}
215+ }
216+ key := strings .TrimPrefix (path , "extensions." )
217+ updated .Extensions [key ] = container .Extensions [key ]
218+ continue
219+ }
220+
221+ switch path {
222+ case "labels" :
223+ updated .Labels = container .Labels
224+ case "spec" :
225+ updated .Spec = container .Spec
226+ case "extensions" :
227+ updated .Extensions = container .Extensions
228+ case "image" :
229+ updated .Image = container .Image
230+ case "snapshotkey" :
231+ updated .SnapshotKey = container .SnapshotKey
232+ default :
233+ return errors .Wrapf (errdefs .ErrInvalidArgument , "cannot update %q field on %q" , path , container .ID )
197234 }
198- key := strings .TrimPrefix (path , "extensions." )
199- updated .Extensions [key ] = container .Extensions [key ]
200- continue
201235 }
202236
203- switch path {
204- case "labels" :
205- updated .Labels = container .Labels
206- case "spec" :
207- updated .Spec = container .Spec
208- case "extensions" :
209- updated .Extensions = container .Extensions
210- case "image" :
211- updated .Image = container .Image
212- case "snapshotkey" :
213- updated .SnapshotKey = container .SnapshotKey
214- default :
215- return containers.Container {}, errors .Wrapf (errdefs .ErrInvalidArgument , "cannot update %q field on %q" , path , container .ID )
237+ if err := validateContainer (& updated ); err != nil {
238+ return errors .Wrap (err , "update failed validation" )
216239 }
217- }
218240
219- if err := validateContainer (& updated ); err != nil {
220- return containers.Container {}, errors .Wrap (err , "update failed validation" )
221- }
241+ updated .CreatedAt = createdat
242+ updated .UpdatedAt = time .Now ().UTC ()
243+ if err := writeContainer (cbkt , & updated ); err != nil {
244+ return errors .Wrapf (err , "failed to write container %q" , container .ID )
245+ }
222246
223- updated .CreatedAt = createdat
224- updated .UpdatedAt = time .Now ().UTC ()
225- if err := writeContainer (cbkt , & updated ); err != nil {
226- return containers.Container {}, errors .Wrapf (err , "failed to write container %q" , container .ID )
247+ return nil
248+ }); err != nil {
249+ return containers.Container {}, err
227250 }
228251
229252 return updated , nil
@@ -235,15 +258,23 @@ func (s *containerStore) Delete(ctx context.Context, id string) error {
235258 return err
236259 }
237260
238- bkt := getContainersBucket (s .tx , namespace )
239- if bkt == nil {
240- return errors .Wrapf (errdefs .ErrNotFound , "cannot delete container %q in namespace %q" , id , namespace )
241- }
261+ return update (ctx , s .db , func (tx * bolt.Tx ) error {
262+ bkt := getContainersBucket (tx , namespace )
263+ if bkt == nil {
264+ return errors .Wrapf (errdefs .ErrNotFound , "cannot delete container %q in namespace %q" , id , namespace )
265+ }
242266
243- if err := bkt .DeleteBucket ([]byte (id )); err == bolt .ErrBucketNotFound {
244- return errors .Wrapf (errdefs .ErrNotFound , "container %v" , id )
245- }
246- return err
267+ if err := bkt .DeleteBucket ([]byte (id )); err != nil {
268+ if err == bolt .ErrBucketNotFound {
269+ err = errors .Wrapf (errdefs .ErrNotFound , "container %v" , id )
270+ }
271+ return err
272+ }
273+
274+ atomic .AddUint32 (& s .db .dirty , 1 )
275+
276+ return nil
277+ })
247278}
248279
249280func validateContainer (container * containers.Container ) error {
0 commit comments