Skip to content

Commit 9e50861

Browse files
dmcgowancrosbymichael
authored andcommitted
metadata: add content lease on existing content
When a writer is requested for an object that already exists, add that object to the provided any lease to prevent other operations from affecting the current lease's use of that content. Signed-off-by: Derek McGowan <[email protected]>
1 parent edc72bb commit 9e50861

3 files changed

Lines changed: 134 additions & 2 deletions

File tree

metadata/content.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,12 +318,23 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
318318
cs.l.RLock()
319319
defer cs.l.RUnlock()
320320

321-
var w content.Writer
321+
var (
322+
w content.Writer
323+
exists bool
324+
)
322325
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
323326
if expected != "" {
324327
cbkt := getBlobBucket(tx, ns, expected)
325328
if cbkt != nil {
326-
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
329+
// Add content to lease to prevent other reference removals
330+
// from effecting this object during a provided lease
331+
if err := addContentLease(ctx, tx, expected); err != nil {
332+
return errors.Wrap(err, "unable to lease content")
333+
}
334+
// Return error outside of transaction to ensure
335+
// commit succeeds with the lease.
336+
exists = true
337+
return nil
327338
}
328339
}
329340

@@ -363,6 +374,9 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
363374
}); err != nil {
364375
return nil, err
365376
}
377+
if exists {
378+
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
379+
}
366380

367381
// TODO: keep the expected in the writer to use on commit
368382
// when no expected is provided there.

metadata/content_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package metadata
22

33
import (
4+
"bytes"
45
"context"
56
"path/filepath"
67
"testing"
@@ -9,6 +10,11 @@ import (
910
"github.com/containerd/containerd/content"
1011
"github.com/containerd/containerd/content/local"
1112
"github.com/containerd/containerd/content/testsuite"
13+
"github.com/containerd/containerd/errdefs"
14+
"github.com/containerd/containerd/leases"
15+
"github.com/containerd/containerd/namespaces"
16+
digest "github.com/opencontainers/go-digest"
17+
"github.com/pkg/errors"
1218
)
1319

1420
func createContentStore(ctx context.Context, root string) (content.Store, func() error, error) {
@@ -31,3 +37,76 @@ func createContentStore(ctx context.Context, root string) (content.Store, func()
3137
func TestContent(t *testing.T) {
3238
testsuite.ContentSuite(t, "metadata", createContentStore)
3339
}
40+
41+
func TestContentLeased(t *testing.T) {
42+
ctx, db, cancel := testDB(t)
43+
defer cancel()
44+
45+
cs := db.ContentStore()
46+
47+
blob := []byte("any content")
48+
expected := digest.FromBytes(blob)
49+
50+
lctx, _, err := createLease(ctx, db, "lease-1")
51+
if err != nil {
52+
t.Fatal(err)
53+
}
54+
if err := content.WriteBlob(lctx, cs, "test-1", bytes.NewReader(blob), int64(len(blob)), expected); err != nil {
55+
t.Fatal(err)
56+
}
57+
if err := checkContentLeased(lctx, db, expected); err != nil {
58+
t.Fatal("lease checked failed:", err)
59+
}
60+
61+
lctx, _, err = createLease(ctx, db, "lease-2")
62+
if err != nil {
63+
t.Fatal(err)
64+
}
65+
66+
if _, err := cs.Writer(lctx, "test-2", int64(len(blob)), expected); err == nil {
67+
t.Fatal("expected already exist error")
68+
} else if !errdefs.IsAlreadyExists(err) {
69+
t.Fatal(err)
70+
}
71+
if err := checkContentLeased(lctx, db, expected); err != nil {
72+
t.Fatal("lease checked failed:", err)
73+
}
74+
}
75+
76+
func createLease(ctx context.Context, db *DB, name string) (context.Context, func() error, error) {
77+
if err := db.Update(func(tx *bolt.Tx) error {
78+
_, err := NewLeaseManager(tx).Create(ctx, name, nil)
79+
return err
80+
}); err != nil {
81+
return nil, nil, err
82+
}
83+
return leases.WithLease(ctx, name), func() error {
84+
return db.Update(func(tx *bolt.Tx) error {
85+
return NewLeaseManager(tx).Delete(ctx, name)
86+
})
87+
}, nil
88+
}
89+
90+
func checkContentLeased(ctx context.Context, db *DB, dgst digest.Digest) error {
91+
ns, ok := namespaces.Namespace(ctx)
92+
if !ok {
93+
return errors.New("no namespace in context")
94+
}
95+
lease, ok := leases.Lease(ctx)
96+
if !ok {
97+
return errors.New("no lease in context")
98+
}
99+
100+
return db.View(func(tx *bolt.Tx) error {
101+
bkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(lease), bucketKeyObjectContent)
102+
if bkt == nil {
103+
return errors.Wrapf(errdefs.ErrNotFound, "bucket not found", lease)
104+
}
105+
v := bkt.Get([]byte(dgst.String()))
106+
if v == nil {
107+
return errors.Wrap(errdefs.ErrNotFound, "object not leased")
108+
}
109+
110+
return nil
111+
})
112+
}

metadata/db_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"os"
1111
"path/filepath"
1212
"runtime/pprof"
13+
"strings"
1314
"testing"
1415
"time"
1516

@@ -29,6 +30,44 @@ import (
2930
"github.com/pkg/errors"
3031
)
3132

33+
func testDB(t *testing.T) (context.Context, *DB, func()) {
34+
ctx, cancel := context.WithCancel(context.Background())
35+
ctx = namespaces.WithNamespace(ctx, "testing")
36+
37+
dirname, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1)+"-")
38+
if err != nil {
39+
t.Fatal(err)
40+
}
41+
42+
snapshotter, err := naive.NewSnapshotter(filepath.Join(dirname, "naive"))
43+
if err != nil {
44+
t.Fatal(err)
45+
}
46+
47+
cs, err := local.NewStore(filepath.Join(dirname, "content"))
48+
if err != nil {
49+
t.Fatal(err)
50+
}
51+
52+
bdb, err := bolt.Open(filepath.Join(dirname, "metadata.db"), 0644, nil)
53+
if err != nil {
54+
t.Fatal(err)
55+
}
56+
57+
db := NewDB(bdb, cs, map[string]snapshots.Snapshotter{"naive": snapshotter})
58+
if err := db.Init(ctx); err != nil {
59+
t.Fatal(err)
60+
}
61+
62+
return ctx, db, func() {
63+
bdb.Close()
64+
if err := os.RemoveAll(dirname); err != nil {
65+
t.Log("failed removing temp dir", err)
66+
}
67+
cancel()
68+
}
69+
}
70+
3271
func TestInit(t *testing.T) {
3372
ctx, db, cancel := testEnv(t)
3473
defer cancel()

0 commit comments

Comments
 (0)