Skip to content

Commit 3cb6d73

Browse files
authored
Merge pull request #1988 from crosbymichael/content-lease
[release 1.0] metadata: add content lease on existing content
2 parents edc72bb + 9e50861 commit 3cb6d73

3 files changed

Lines changed: 134 additions & 2 deletions

File tree

metadata/content.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,12 +318,23 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
318318
cs.l.RLock()
319319
defer cs.l.RUnlock()
320320

321-
var w content.Writer
321+
var (
322+
w content.Writer
323+
exists bool
324+
)
322325
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
323326
if expected != "" {
324327
cbkt := getBlobBucket(tx, ns, expected)
325328
if cbkt != nil {
326-
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
329+
// Add content to lease to prevent other reference removals
330+
// from effecting this object during a provided lease
331+
if err := addContentLease(ctx, tx, expected); err != nil {
332+
return errors.Wrap(err, "unable to lease content")
333+
}
334+
// Return error outside of transaction to ensure
335+
// commit succeeds with the lease.
336+
exists = true
337+
return nil
327338
}
328339
}
329340

@@ -363,6 +374,9 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
363374
}); err != nil {
364375
return nil, err
365376
}
377+
if exists {
378+
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
379+
}
366380

367381
// TODO: keep the expected in the writer to use on commit
368382
// when no expected is provided there.

metadata/content_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package metadata
22

33
import (
4+
"bytes"
45
"context"
56
"path/filepath"
67
"testing"
@@ -9,6 +10,11 @@ import (
910
"github.com/containerd/containerd/content"
1011
"github.com/containerd/containerd/content/local"
1112
"github.com/containerd/containerd/content/testsuite"
13+
"github.com/containerd/containerd/errdefs"
14+
"github.com/containerd/containerd/leases"
15+
"github.com/containerd/containerd/namespaces"
16+
digest "github.com/opencontainers/go-digest"
17+
"github.com/pkg/errors"
1218
)
1319

1420
func createContentStore(ctx context.Context, root string) (content.Store, func() error, error) {
@@ -31,3 +37,76 @@ func createContentStore(ctx context.Context, root string) (content.Store, func()
3137
func TestContent(t *testing.T) {
3238
testsuite.ContentSuite(t, "metadata", createContentStore)
3339
}
40+
41+
func TestContentLeased(t *testing.T) {
42+
ctx, db, cancel := testDB(t)
43+
defer cancel()
44+
45+
cs := db.ContentStore()
46+
47+
blob := []byte("any content")
48+
expected := digest.FromBytes(blob)
49+
50+
lctx, _, err := createLease(ctx, db, "lease-1")
51+
if err != nil {
52+
t.Fatal(err)
53+
}
54+
if err := content.WriteBlob(lctx, cs, "test-1", bytes.NewReader(blob), int64(len(blob)), expected); err != nil {
55+
t.Fatal(err)
56+
}
57+
if err := checkContentLeased(lctx, db, expected); err != nil {
58+
t.Fatal("lease checked failed:", err)
59+
}
60+
61+
lctx, _, err = createLease(ctx, db, "lease-2")
62+
if err != nil {
63+
t.Fatal(err)
64+
}
65+
66+
if _, err := cs.Writer(lctx, "test-2", int64(len(blob)), expected); err == nil {
67+
t.Fatal("expected already exist error")
68+
} else if !errdefs.IsAlreadyExists(err) {
69+
t.Fatal(err)
70+
}
71+
if err := checkContentLeased(lctx, db, expected); err != nil {
72+
t.Fatal("lease checked failed:", err)
73+
}
74+
}
75+
76+
func createLease(ctx context.Context, db *DB, name string) (context.Context, func() error, error) {
77+
if err := db.Update(func(tx *bolt.Tx) error {
78+
_, err := NewLeaseManager(tx).Create(ctx, name, nil)
79+
return err
80+
}); err != nil {
81+
return nil, nil, err
82+
}
83+
return leases.WithLease(ctx, name), func() error {
84+
return db.Update(func(tx *bolt.Tx) error {
85+
return NewLeaseManager(tx).Delete(ctx, name)
86+
})
87+
}, nil
88+
}
89+
90+
func checkContentLeased(ctx context.Context, db *DB, dgst digest.Digest) error {
91+
ns, ok := namespaces.Namespace(ctx)
92+
if !ok {
93+
return errors.New("no namespace in context")
94+
}
95+
lease, ok := leases.Lease(ctx)
96+
if !ok {
97+
return errors.New("no lease in context")
98+
}
99+
100+
return db.View(func(tx *bolt.Tx) error {
101+
bkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(lease), bucketKeyObjectContent)
102+
if bkt == nil {
103+
return errors.Wrapf(errdefs.ErrNotFound, "bucket not found", lease)
104+
}
105+
v := bkt.Get([]byte(dgst.String()))
106+
if v == nil {
107+
return errors.Wrap(errdefs.ErrNotFound, "object not leased")
108+
}
109+
110+
return nil
111+
})
112+
}

metadata/db_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"os"
1111
"path/filepath"
1212
"runtime/pprof"
13+
"strings"
1314
"testing"
1415
"time"
1516

@@ -29,6 +30,44 @@ import (
2930
"github.com/pkg/errors"
3031
)
3132

33+
func testDB(t *testing.T) (context.Context, *DB, func()) {
34+
ctx, cancel := context.WithCancel(context.Background())
35+
ctx = namespaces.WithNamespace(ctx, "testing")
36+
37+
dirname, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1)+"-")
38+
if err != nil {
39+
t.Fatal(err)
40+
}
41+
42+
snapshotter, err := naive.NewSnapshotter(filepath.Join(dirname, "naive"))
43+
if err != nil {
44+
t.Fatal(err)
45+
}
46+
47+
cs, err := local.NewStore(filepath.Join(dirname, "content"))
48+
if err != nil {
49+
t.Fatal(err)
50+
}
51+
52+
bdb, err := bolt.Open(filepath.Join(dirname, "metadata.db"), 0644, nil)
53+
if err != nil {
54+
t.Fatal(err)
55+
}
56+
57+
db := NewDB(bdb, cs, map[string]snapshots.Snapshotter{"naive": snapshotter})
58+
if err := db.Init(ctx); err != nil {
59+
t.Fatal(err)
60+
}
61+
62+
return ctx, db, func() {
63+
bdb.Close()
64+
if err := os.RemoveAll(dirname); err != nil {
65+
t.Log("failed removing temp dir", err)
66+
}
67+
cancel()
68+
}
69+
}
70+
3271
func TestInit(t *testing.T) {
3372
ctx, db, cancel := testEnv(t)
3473
defer cancel()

0 commit comments

Comments
 (0)