Skip to content

Commit 9c10bf8

Browse files
authored
Merge pull request #3668 from dmcgowan/fix-metadata-dirty
Update metadata interfaces for containers and leases
2 parents 94c4979 + 0b224ac commit 9c10bf8

15 files changed

Lines changed: 518 additions & 585 deletions

File tree

metadata/containers.go

Lines changed: 145 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package metadata
1919
import (
2020
"context"
2121
"strings"
22+
"sync/atomic"
2223
"time"
2324

2425
"github.com/containerd/containerd/containers"
@@ -35,13 +36,13 @@ import (
3536
)
3637

3738
type containerStore struct {
38-
tx *bolt.Tx
39+
db *DB
3940
}
4041

4142
// NewContainerStore returns a Store backed by an underlying bolt DB
42-
func NewContainerStore(tx *bolt.Tx) containers.Store {
43+
func NewContainerStore(db *DB) containers.Store {
4344
return &containerStore{
44-
tx: tx,
45+
db: db,
4546
}
4647
}
4748

@@ -51,14 +52,21 @@ func (s *containerStore) Get(ctx context.Context, id string) (containers.Contain
5152
return containers.Container{}, err
5253
}
5354

54-
bkt := getContainerBucket(s.tx, namespace, id)
55-
if bkt == nil {
56-
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "container %q in namespace %q", id, namespace)
57-
}
58-
5955
container := containers.Container{ID: id}
60-
if err := readContainer(&container, bkt); err != nil {
61-
return containers.Container{}, errors.Wrapf(err, "failed to read container %q", id)
56+
57+
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
58+
bkt := getContainerBucket(tx, namespace, id)
59+
if bkt == nil {
60+
return errors.Wrapf(errdefs.ErrNotFound, "container %q in namespace %q", id, namespace)
61+
}
62+
63+
if err := readContainer(&container, bkt); err != nil {
64+
return errors.Wrapf(err, "failed to read container %q", id)
65+
}
66+
67+
return nil
68+
}); err != nil {
69+
return containers.Container{}, err
6270
}
6371

6472
return container, nil
@@ -75,27 +83,30 @@ func (s *containerStore) List(ctx context.Context, fs ...string) ([]containers.C
7583
return nil, errors.Wrap(errdefs.ErrInvalidArgument, err.Error())
7684
}
7785

78-
bkt := getContainersBucket(s.tx, namespace)
79-
if bkt == nil {
80-
return nil, nil // empty store
81-
}
82-
8386
var m []containers.Container
84-
if err := bkt.ForEach(func(k, v []byte) error {
85-
cbkt := bkt.Bucket(k)
86-
if cbkt == nil {
87-
return nil
88-
}
89-
container := containers.Container{ID: string(k)}
9087

91-
if err := readContainer(&container, cbkt); err != nil {
92-
return errors.Wrapf(err, "failed to read container %q", string(k))
88+
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
89+
bkt := getContainersBucket(tx, namespace)
90+
if bkt == nil {
91+
return nil // empty store
9392
}
9493

95-
if filter.Match(adaptContainer(container)) {
96-
m = append(m, container)
97-
}
98-
return nil
94+
return bkt.ForEach(func(k, v []byte) error {
95+
cbkt := bkt.Bucket(k)
96+
if cbkt == nil {
97+
return nil
98+
}
99+
container := containers.Container{ID: string(k)}
100+
101+
if err := readContainer(&container, cbkt); err != nil {
102+
return errors.Wrapf(err, "failed to read container %q", string(k))
103+
}
104+
105+
if filter.Match(adaptContainer(container)) {
106+
m = append(m, container)
107+
}
108+
return nil
109+
})
99110
}); err != nil {
100111
return nil, err
101112
}
@@ -113,23 +124,29 @@ func (s *containerStore) Create(ctx context.Context, container containers.Contai
113124
return containers.Container{}, errors.Wrap(err, "create container failed validation")
114125
}
115126

116-
bkt, err := createContainersBucket(s.tx, namespace)
117-
if err != nil {
118-
return containers.Container{}, err
119-
}
127+
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
128+
bkt, err := createContainersBucket(tx, namespace)
129+
if err != nil {
130+
return err
131+
}
120132

121-
cbkt, err := bkt.CreateBucket([]byte(container.ID))
122-
if err != nil {
123-
if err == bolt.ErrBucketExists {
124-
err = errors.Wrapf(errdefs.ErrAlreadyExists, "container %q", container.ID)
133+
cbkt, err := bkt.CreateBucket([]byte(container.ID))
134+
if err != nil {
135+
if err == bolt.ErrBucketExists {
136+
err = errors.Wrapf(errdefs.ErrAlreadyExists, "container %q", container.ID)
137+
}
138+
return err
125139
}
126-
return containers.Container{}, err
127-
}
128140

129-
container.CreatedAt = time.Now().UTC()
130-
container.UpdatedAt = container.CreatedAt
131-
if err := writeContainer(cbkt, &container); err != nil {
132-
return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID)
141+
container.CreatedAt = time.Now().UTC()
142+
container.UpdatedAt = container.CreatedAt
143+
if err := writeContainer(cbkt, &container); err != nil {
144+
return errors.Wrapf(err, "failed to write container %q", container.ID)
145+
}
146+
147+
return nil
148+
}); err != nil {
149+
return containers.Container{}, err
133150
}
134151

135152
return container, nil
@@ -145,85 +162,91 @@ func (s *containerStore) Update(ctx context.Context, container containers.Contai
145162
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "must specify a container id")
146163
}
147164

148-
bkt := getContainersBucket(s.tx, namespace)
149-
if bkt == nil {
150-
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "cannot update container %q in namespace %q", container.ID, namespace)
151-
}
152-
153-
cbkt := bkt.Bucket([]byte(container.ID))
154-
if cbkt == nil {
155-
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "container %q", container.ID)
156-
}
157-
158165
var updated containers.Container
159-
if err := readContainer(&updated, cbkt); err != nil {
160-
return updated, errors.Wrapf(err, "failed to read container %q", container.ID)
161-
}
162-
createdat := updated.CreatedAt
163-
updated.ID = container.ID
164-
165-
if len(fieldpaths) == 0 {
166-
// only allow updates to these field on full replace.
167-
fieldpaths = []string{"labels", "spec", "extensions", "image", "snapshotkey"}
168-
169-
// Fields that are immutable must cause an error when no field paths
170-
// are provided. This allows these fields to become mutable in the
171-
// future.
172-
if updated.Snapshotter != container.Snapshotter {
173-
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "container.Snapshotter field is immutable")
166+
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
167+
bkt := getContainersBucket(tx, namespace)
168+
if bkt == nil {
169+
return errors.Wrapf(errdefs.ErrNotFound, "cannot update container %q in namespace %q", container.ID, namespace)
174170
}
175171

176-
if updated.Runtime.Name != container.Runtime.Name {
177-
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "container.Runtime.Name field is immutable")
172+
cbkt := bkt.Bucket([]byte(container.ID))
173+
if cbkt == nil {
174+
return errors.Wrapf(errdefs.ErrNotFound, "container %q", container.ID)
178175
}
179-
}
180176

181-
// apply the field mask. If you update this code, you better follow the
182-
// field mask rules in field_mask.proto. If you don't know what this
183-
// is, do not update this code.
184-
for _, path := range fieldpaths {
185-
if strings.HasPrefix(path, "labels.") {
186-
if updated.Labels == nil {
187-
updated.Labels = map[string]string{}
177+
if err := readContainer(&updated, cbkt); err != nil {
178+
return errors.Wrapf(err, "failed to read container %q", container.ID)
179+
}
180+
createdat := updated.CreatedAt
181+
updated.ID = container.ID
182+
183+
if len(fieldpaths) == 0 {
184+
// only allow updates to these field on full replace.
185+
fieldpaths = []string{"labels", "spec", "extensions", "image", "snapshotkey"}
186+
187+
// Fields that are immutable must cause an error when no field paths
188+
// are provided. This allows these fields to become mutable in the
189+
// future.
190+
if updated.Snapshotter != container.Snapshotter {
191+
return errors.Wrapf(errdefs.ErrInvalidArgument, "container.Snapshotter field is immutable")
192+
}
193+
194+
if updated.Runtime.Name != container.Runtime.Name {
195+
return errors.Wrapf(errdefs.ErrInvalidArgument, "container.Runtime.Name field is immutable")
188196
}
189-
key := strings.TrimPrefix(path, "labels.")
190-
updated.Labels[key] = container.Labels[key]
191-
continue
192197
}
193198

194-
if strings.HasPrefix(path, "extensions.") {
195-
if updated.Extensions == nil {
196-
updated.Extensions = map[string]types.Any{}
199+
// apply the field mask. If you update this code, you better follow the
200+
// field mask rules in field_mask.proto. If you don't know what this
201+
// is, do not update this code.
202+
for _, path := range fieldpaths {
203+
if strings.HasPrefix(path, "labels.") {
204+
if updated.Labels == nil {
205+
updated.Labels = map[string]string{}
206+
}
207+
key := strings.TrimPrefix(path, "labels.")
208+
updated.Labels[key] = container.Labels[key]
209+
continue
210+
}
211+
212+
if strings.HasPrefix(path, "extensions.") {
213+
if updated.Extensions == nil {
214+
updated.Extensions = map[string]types.Any{}
215+
}
216+
key := strings.TrimPrefix(path, "extensions.")
217+
updated.Extensions[key] = container.Extensions[key]
218+
continue
219+
}
220+
221+
switch path {
222+
case "labels":
223+
updated.Labels = container.Labels
224+
case "spec":
225+
updated.Spec = container.Spec
226+
case "extensions":
227+
updated.Extensions = container.Extensions
228+
case "image":
229+
updated.Image = container.Image
230+
case "snapshotkey":
231+
updated.SnapshotKey = container.SnapshotKey
232+
default:
233+
return errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on %q", path, container.ID)
197234
}
198-
key := strings.TrimPrefix(path, "extensions.")
199-
updated.Extensions[key] = container.Extensions[key]
200-
continue
201235
}
202236

203-
switch path {
204-
case "labels":
205-
updated.Labels = container.Labels
206-
case "spec":
207-
updated.Spec = container.Spec
208-
case "extensions":
209-
updated.Extensions = container.Extensions
210-
case "image":
211-
updated.Image = container.Image
212-
case "snapshotkey":
213-
updated.SnapshotKey = container.SnapshotKey
214-
default:
215-
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on %q", path, container.ID)
237+
if err := validateContainer(&updated); err != nil {
238+
return errors.Wrap(err, "update failed validation")
216239
}
217-
}
218240

219-
if err := validateContainer(&updated); err != nil {
220-
return containers.Container{}, errors.Wrap(err, "update failed validation")
221-
}
241+
updated.CreatedAt = createdat
242+
updated.UpdatedAt = time.Now().UTC()
243+
if err := writeContainer(cbkt, &updated); err != nil {
244+
return errors.Wrapf(err, "failed to write container %q", container.ID)
245+
}
222246

223-
updated.CreatedAt = createdat
224-
updated.UpdatedAt = time.Now().UTC()
225-
if err := writeContainer(cbkt, &updated); err != nil {
226-
return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID)
247+
return nil
248+
}); err != nil {
249+
return containers.Container{}, err
227250
}
228251

229252
return updated, nil
@@ -235,15 +258,23 @@ func (s *containerStore) Delete(ctx context.Context, id string) error {
235258
return err
236259
}
237260

238-
bkt := getContainersBucket(s.tx, namespace)
239-
if bkt == nil {
240-
return errors.Wrapf(errdefs.ErrNotFound, "cannot delete container %q in namespace %q", id, namespace)
241-
}
261+
return update(ctx, s.db, func(tx *bolt.Tx) error {
262+
bkt := getContainersBucket(tx, namespace)
263+
if bkt == nil {
264+
return errors.Wrapf(errdefs.ErrNotFound, "cannot delete container %q in namespace %q", id, namespace)
265+
}
242266

243-
if err := bkt.DeleteBucket([]byte(id)); err == bolt.ErrBucketNotFound {
244-
return errors.Wrapf(errdefs.ErrNotFound, "container %v", id)
245-
}
246-
return err
267+
if err := bkt.DeleteBucket([]byte(id)); err != nil {
268+
if err == bolt.ErrBucketNotFound {
269+
err = errors.Wrapf(errdefs.ErrNotFound, "container %v", id)
270+
}
271+
return err
272+
}
273+
274+
atomic.AddUint32(&s.db.dirty, 1)
275+
276+
return nil
277+
})
247278
}
248279

249280
func validateContainer(container *containers.Container) error {

0 commit comments

Comments
 (0)