Skip to content

Commit 0b224ac

Browse files
committed
Update metadata interfaces for containers and leases
Add more thorough dirty checking across all types which may be deleted and hold references. Signed-off-by: Derek McGowan <[email protected]>
1 parent d4802a6 commit 0b224ac

15 files changed

Lines changed: 518 additions & 585 deletions

File tree

metadata/containers.go

Lines changed: 145 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package metadata
1919
import (
2020
"context"
2121
"strings"
22+
"sync/atomic"
2223
"time"
2324

2425
"github.com/containerd/containerd/containers"
@@ -35,13 +36,13 @@ import (
3536
)
3637

3738
type containerStore struct {
38-
tx *bolt.Tx
39+
db *DB
3940
}
4041

4142
// NewContainerStore returns a Store backed by an underlying bolt DB
42-
func NewContainerStore(tx *bolt.Tx) containers.Store {
43+
func NewContainerStore(db *DB) containers.Store {
4344
return &containerStore{
44-
tx: tx,
45+
db: db,
4546
}
4647
}
4748

@@ -51,14 +52,21 @@ func (s *containerStore) Get(ctx context.Context, id string) (containers.Contain
5152
return containers.Container{}, err
5253
}
5354

54-
bkt := getContainerBucket(s.tx, namespace, id)
55-
if bkt == nil {
56-
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "container %q in namespace %q", id, namespace)
57-
}
58-
5955
container := containers.Container{ID: id}
60-
if err := readContainer(&container, bkt); err != nil {
61-
return containers.Container{}, errors.Wrapf(err, "failed to read container %q", id)
56+
57+
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
58+
bkt := getContainerBucket(tx, namespace, id)
59+
if bkt == nil {
60+
return errors.Wrapf(errdefs.ErrNotFound, "container %q in namespace %q", id, namespace)
61+
}
62+
63+
if err := readContainer(&container, bkt); err != nil {
64+
return errors.Wrapf(err, "failed to read container %q", id)
65+
}
66+
67+
return nil
68+
}); err != nil {
69+
return containers.Container{}, err
6270
}
6371

6472
return container, nil
@@ -75,27 +83,30 @@ func (s *containerStore) List(ctx context.Context, fs ...string) ([]containers.C
7583
return nil, errors.Wrap(errdefs.ErrInvalidArgument, err.Error())
7684
}
7785

78-
bkt := getContainersBucket(s.tx, namespace)
79-
if bkt == nil {
80-
return nil, nil // empty store
81-
}
82-
8386
var m []containers.Container
84-
if err := bkt.ForEach(func(k, v []byte) error {
85-
cbkt := bkt.Bucket(k)
86-
if cbkt == nil {
87-
return nil
88-
}
89-
container := containers.Container{ID: string(k)}
9087

91-
if err := readContainer(&container, cbkt); err != nil {
92-
return errors.Wrapf(err, "failed to read container %q", string(k))
88+
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
89+
bkt := getContainersBucket(tx, namespace)
90+
if bkt == nil {
91+
return nil // empty store
9392
}
9493

95-
if filter.Match(adaptContainer(container)) {
96-
m = append(m, container)
97-
}
98-
return nil
94+
return bkt.ForEach(func(k, v []byte) error {
95+
cbkt := bkt.Bucket(k)
96+
if cbkt == nil {
97+
return nil
98+
}
99+
container := containers.Container{ID: string(k)}
100+
101+
if err := readContainer(&container, cbkt); err != nil {
102+
return errors.Wrapf(err, "failed to read container %q", string(k))
103+
}
104+
105+
if filter.Match(adaptContainer(container)) {
106+
m = append(m, container)
107+
}
108+
return nil
109+
})
99110
}); err != nil {
100111
return nil, err
101112
}
@@ -113,23 +124,29 @@ func (s *containerStore) Create(ctx context.Context, container containers.Contai
113124
return containers.Container{}, errors.Wrap(err, "create container failed validation")
114125
}
115126

116-
bkt, err := createContainersBucket(s.tx, namespace)
117-
if err != nil {
118-
return containers.Container{}, err
119-
}
127+
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
128+
bkt, err := createContainersBucket(tx, namespace)
129+
if err != nil {
130+
return err
131+
}
120132

121-
cbkt, err := bkt.CreateBucket([]byte(container.ID))
122-
if err != nil {
123-
if err == bolt.ErrBucketExists {
124-
err = errors.Wrapf(errdefs.ErrAlreadyExists, "container %q", container.ID)
133+
cbkt, err := bkt.CreateBucket([]byte(container.ID))
134+
if err != nil {
135+
if err == bolt.ErrBucketExists {
136+
err = errors.Wrapf(errdefs.ErrAlreadyExists, "container %q", container.ID)
137+
}
138+
return err
125139
}
126-
return containers.Container{}, err
127-
}
128140

129-
container.CreatedAt = time.Now().UTC()
130-
container.UpdatedAt = container.CreatedAt
131-
if err := writeContainer(cbkt, &container); err != nil {
132-
return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID)
141+
container.CreatedAt = time.Now().UTC()
142+
container.UpdatedAt = container.CreatedAt
143+
if err := writeContainer(cbkt, &container); err != nil {
144+
return errors.Wrapf(err, "failed to write container %q", container.ID)
145+
}
146+
147+
return nil
148+
}); err != nil {
149+
return containers.Container{}, err
133150
}
134151

135152
return container, nil
@@ -145,85 +162,91 @@ func (s *containerStore) Update(ctx context.Context, container containers.Contai
145162
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "must specify a container id")
146163
}
147164

148-
bkt := getContainersBucket(s.tx, namespace)
149-
if bkt == nil {
150-
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "cannot update container %q in namespace %q", container.ID, namespace)
151-
}
152-
153-
cbkt := bkt.Bucket([]byte(container.ID))
154-
if cbkt == nil {
155-
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "container %q", container.ID)
156-
}
157-
158165
var updated containers.Container
159-
if err := readContainer(&updated, cbkt); err != nil {
160-
return updated, errors.Wrapf(err, "failed to read container %q", container.ID)
161-
}
162-
createdat := updated.CreatedAt
163-
updated.ID = container.ID
164-
165-
if len(fieldpaths) == 0 {
166-
// only allow updates to these field on full replace.
167-
fieldpaths = []string{"labels", "spec", "extensions", "image", "snapshotkey"}
168-
169-
// Fields that are immutable must cause an error when no field paths
170-
// are provided. This allows these fields to become mutable in the
171-
// future.
172-
if updated.Snapshotter != container.Snapshotter {
173-
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "container.Snapshotter field is immutable")
166+
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
167+
bkt := getContainersBucket(tx, namespace)
168+
if bkt == nil {
169+
return errors.Wrapf(errdefs.ErrNotFound, "cannot update container %q in namespace %q", container.ID, namespace)
174170
}
175171

176-
if updated.Runtime.Name != container.Runtime.Name {
177-
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "container.Runtime.Name field is immutable")
172+
cbkt := bkt.Bucket([]byte(container.ID))
173+
if cbkt == nil {
174+
return errors.Wrapf(errdefs.ErrNotFound, "container %q", container.ID)
178175
}
179-
}
180176

181-
// apply the field mask. If you update this code, you better follow the
182-
// field mask rules in field_mask.proto. If you don't know what this
183-
// is, do not update this code.
184-
for _, path := range fieldpaths {
185-
if strings.HasPrefix(path, "labels.") {
186-
if updated.Labels == nil {
187-
updated.Labels = map[string]string{}
177+
if err := readContainer(&updated, cbkt); err != nil {
178+
return errors.Wrapf(err, "failed to read container %q", container.ID)
179+
}
180+
createdat := updated.CreatedAt
181+
updated.ID = container.ID
182+
183+
if len(fieldpaths) == 0 {
184+
// only allow updates to these field on full replace.
185+
fieldpaths = []string{"labels", "spec", "extensions", "image", "snapshotkey"}
186+
187+
// Fields that are immutable must cause an error when no field paths
188+
// are provided. This allows these fields to become mutable in the
189+
// future.
190+
if updated.Snapshotter != container.Snapshotter {
191+
return errors.Wrapf(errdefs.ErrInvalidArgument, "container.Snapshotter field is immutable")
192+
}
193+
194+
if updated.Runtime.Name != container.Runtime.Name {
195+
return errors.Wrapf(errdefs.ErrInvalidArgument, "container.Runtime.Name field is immutable")
188196
}
189-
key := strings.TrimPrefix(path, "labels.")
190-
updated.Labels[key] = container.Labels[key]
191-
continue
192197
}
193198

194-
if strings.HasPrefix(path, "extensions.") {
195-
if updated.Extensions == nil {
196-
updated.Extensions = map[string]types.Any{}
199+
// apply the field mask. If you update this code, you better follow the
200+
// field mask rules in field_mask.proto. If you don't know what this
201+
// is, do not update this code.
202+
for _, path := range fieldpaths {
203+
if strings.HasPrefix(path, "labels.") {
204+
if updated.Labels == nil {
205+
updated.Labels = map[string]string{}
206+
}
207+
key := strings.TrimPrefix(path, "labels.")
208+
updated.Labels[key] = container.Labels[key]
209+
continue
210+
}
211+
212+
if strings.HasPrefix(path, "extensions.") {
213+
if updated.Extensions == nil {
214+
updated.Extensions = map[string]types.Any{}
215+
}
216+
key := strings.TrimPrefix(path, "extensions.")
217+
updated.Extensions[key] = container.Extensions[key]
218+
continue
219+
}
220+
221+
switch path {
222+
case "labels":
223+
updated.Labels = container.Labels
224+
case "spec":
225+
updated.Spec = container.Spec
226+
case "extensions":
227+
updated.Extensions = container.Extensions
228+
case "image":
229+
updated.Image = container.Image
230+
case "snapshotkey":
231+
updated.SnapshotKey = container.SnapshotKey
232+
default:
233+
return errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on %q", path, container.ID)
197234
}
198-
key := strings.TrimPrefix(path, "extensions.")
199-
updated.Extensions[key] = container.Extensions[key]
200-
continue
201235
}
202236

203-
switch path {
204-
case "labels":
205-
updated.Labels = container.Labels
206-
case "spec":
207-
updated.Spec = container.Spec
208-
case "extensions":
209-
updated.Extensions = container.Extensions
210-
case "image":
211-
updated.Image = container.Image
212-
case "snapshotkey":
213-
updated.SnapshotKey = container.SnapshotKey
214-
default:
215-
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on %q", path, container.ID)
237+
if err := validateContainer(&updated); err != nil {
238+
return errors.Wrap(err, "update failed validation")
216239
}
217-
}
218240

219-
if err := validateContainer(&updated); err != nil {
220-
return containers.Container{}, errors.Wrap(err, "update failed validation")
221-
}
241+
updated.CreatedAt = createdat
242+
updated.UpdatedAt = time.Now().UTC()
243+
if err := writeContainer(cbkt, &updated); err != nil {
244+
return errors.Wrapf(err, "failed to write container %q", container.ID)
245+
}
222246

223-
updated.CreatedAt = createdat
224-
updated.UpdatedAt = time.Now().UTC()
225-
if err := writeContainer(cbkt, &updated); err != nil {
226-
return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID)
247+
return nil
248+
}); err != nil {
249+
return containers.Container{}, err
227250
}
228251

229252
return updated, nil
@@ -235,15 +258,23 @@ func (s *containerStore) Delete(ctx context.Context, id string) error {
235258
return err
236259
}
237260

238-
bkt := getContainersBucket(s.tx, namespace)
239-
if bkt == nil {
240-
return errors.Wrapf(errdefs.ErrNotFound, "cannot delete container %q in namespace %q", id, namespace)
241-
}
261+
return update(ctx, s.db, func(tx *bolt.Tx) error {
262+
bkt := getContainersBucket(tx, namespace)
263+
if bkt == nil {
264+
return errors.Wrapf(errdefs.ErrNotFound, "cannot delete container %q in namespace %q", id, namespace)
265+
}
242266

243-
if err := bkt.DeleteBucket([]byte(id)); err == bolt.ErrBucketNotFound {
244-
return errors.Wrapf(errdefs.ErrNotFound, "container %v", id)
245-
}
246-
return err
267+
if err := bkt.DeleteBucket([]byte(id)); err != nil {
268+
if err == bolt.ErrBucketNotFound {
269+
err = errors.Wrapf(errdefs.ErrNotFound, "container %v", id)
270+
}
271+
return err
272+
}
273+
274+
atomic.AddUint32(&s.db.dirty, 1)
275+
276+
return nil
277+
})
247278
}
248279

249280
func validateContainer(container *containers.Container) error {

0 commit comments

Comments
 (0)