Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func TestImagePullAllPlatforms(t *testing.T) {
}
// check if childless data type has blob in content store
for _, desc := range children {
ra, err := cs.ReaderAt(ctx, desc.Digest)
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestImagePullSomePlatforms(t *testing.T) {

// check if childless data type has blob in content store
for _, desc := range children {
ra, err := cs.ReaderAt(ctx, desc.Digest)
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
t.Fatal(err)
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/ctr/commands/content/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var (
}
defer cancel()
cs := client.ContentStore()
ra, err := cs.ReaderAt(ctx, dgst)
ra, err := cs.ReaderAt(ctx, ocispec.Descriptor{Digest: dgst})
if err != nil {
return err
}
Expand Down Expand Up @@ -121,7 +121,7 @@ var (
// TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect
// all data to be written in a single invocation. Allow multiple writes
// to the same transaction key followed by a commit.
return content.WriteBlob(ctx, cs, ref, os.Stdin, expectedSize, expectedDigest)
return content.WriteBlob(ctx, cs, ref, os.Stdin, ocispec.Descriptor{Size: expectedSize, Digest: expectedDigest})
},
}

Expand Down Expand Up @@ -314,7 +314,7 @@ var (
}
defer cancel()
cs := client.ContentStore()
ra, err := cs.ReaderAt(ctx, dgst)
ra, err := cs.ReaderAt(ctx, ocispec.Descriptor{Digest: dgst})
if err != nil {
return err
}
Expand All @@ -326,7 +326,7 @@ var (
}
defer nrc.Close()

wr, err := cs.Writer(ctx, "edit-"+object, 0, "") // TODO(stevvooe): Choose a better key?
wr, err := cs.Writer(ctx, content.WithRef("edit-"+object)) // TODO(stevvooe): Choose a better key?
if err != nil {
return err
}
Expand Down Expand Up @@ -482,7 +482,7 @@ var (
Size: info.Size,
}

ra, err := cs.ReaderAt(ctx, dgst)
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/ctr/commands/snapshots/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ var diffCommand = cli.Command{
}
}

ra, err := client.ContentStore().ReaderAt(ctx, desc.Digest)
ra, err := client.ContentStore().ReaderAt(ctx, desc)
if err != nil {
return err
}
Expand Down
13 changes: 6 additions & 7 deletions container_opts_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ import (
"github.com/containerd/containerd/runtime/linux/runctypes"
"github.com/gogo/protobuf/proto"
protobuf "github.com/gogo/protobuf/types"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
"github.com/opencontainers/image-spec/specs-go/v1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

Expand All @@ -50,10 +50,9 @@ func WithCheckpoint(im Image, snapshotKey string) NewContainerOpts {
return func(ctx context.Context, client *Client, c *containers.Container) error {
var (
desc = im.Target()
id = desc.Digest
store = client.ContentStore()
)
index, err := decodeIndex(ctx, store, id)
index, err := decodeIndex(ctx, store, desc)
if err != nil {
return err
}
Expand All @@ -80,7 +79,7 @@ func WithCheckpoint(im Image, snapshotKey string) NewContainerOpts {
}
c.Image = index.Annotations["image.name"]
case images.MediaTypeContainerd1CheckpointConfig:
data, err := content.ReadBlob(ctx, store, m.Digest)
data, err := content.ReadBlob(ctx, store, m)
if err != nil {
return errors.Wrap(err, "unable to read checkpoint config")
}
Expand Down Expand Up @@ -113,7 +112,7 @@ func WithTaskCheckpoint(im Image) NewTaskOpts {
return func(ctx context.Context, c *Client, info *TaskInfo) error {
desc := im.Target()
id := desc.Digest
index, err := decodeIndex(ctx, c.ContentStore(), id)
index, err := decodeIndex(ctx, c.ContentStore(), desc)
if err != nil {
return err
}
Expand All @@ -131,9 +130,9 @@ func WithTaskCheckpoint(im Image) NewTaskOpts {
}
}

func decodeIndex(ctx context.Context, store content.Provider, id digest.Digest) (*v1.Index, error) {
func decodeIndex(ctx context.Context, store content.Provider, desc ocispec.Descriptor) (*v1.Index, error) {
var index v1.Index
p, err := content.ReadBlob(ctx, store, id)
p, err := content.ReadBlob(ctx, store, desc)
if err != nil {
return nil, err
}
Expand Down
39 changes: 37 additions & 2 deletions content/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// ReaderAt extends the standard io.ReaderAt interface with reporting of Size and io.Closer
Expand All @@ -33,12 +34,16 @@ type ReaderAt interface {

// Provider provides a reader interface for specific content
type Provider interface {
ReaderAt(ctx context.Context, dgst digest.Digest) (ReaderAt, error)
// ReaderAt only requires desc.Digest to be set.
// Other fields in the descriptor may be used internally for resolving
// the location of the actual data.
ReaderAt(ctx context.Context, dec ocispec.Descriptor) (ReaderAt, error)
}

// Ingester writes content
type Ingester interface {
Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (Writer, error)
// Some implementations require WithRef to be included in opts.
Writer(ctx context.Context, opts ...WriterOpt) (Writer, error)
}

// Info holds content specific information
Expand Down Expand Up @@ -142,3 +147,33 @@ func WithLabels(labels map[string]string) Opt {
return nil
}
}

// WriterOpts is internally used by WriterOpt.
type WriterOpts struct {
Ref string
Desc ocispec.Descriptor
}

// WriterOpt is used for passing options to Ingester.Writer.
type WriterOpt func(*WriterOpts) error

// WithDescriptor specifies an OCI descriptor.
// Writer may optionally use the descriptor internally for resolving
// the location of the actual data.
// Write does not require any field of desc to be set.
// If the data size is unknown, desc.Size should be set to 0.
// Some implementations may also accept negative values as "unknown".
func WithDescriptor(desc ocispec.Descriptor) WriterOpt {
return func(opts *WriterOpts) error {
opts.Desc = desc
return nil
}
}

// WithRef specifies a ref string.
func WithRef(ref string) WriterOpt {
return func(opts *WriterOpts) error {
opts.Ref = ref
return nil
}
}
15 changes: 8 additions & 7 deletions content/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/containerd/containerd/errdefs"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

Expand All @@ -45,8 +46,8 @@ func NewReader(ra ReaderAt) io.Reader {
// ReadBlob retrieves the entire contents of the blob from the provider.
//
// Avoid using this for large blobs, such as layers.
func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byte, error) {
ra, err := provider.ReaderAt(ctx, dgst)
func ReadBlob(ctx context.Context, provider Provider, desc ocispec.Descriptor) ([]byte, error) {
ra, err := provider.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
Expand All @@ -65,8 +66,8 @@ func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byt
// This is useful when the digest and size are known beforehand.
//
// Copy is buffered, so no need to wrap reader in buffered io.
func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, no oci descriptor on write.

cw, err := OpenWriter(ctx, cs, ref, size, expected)
func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, desc ocispec.Descriptor, opts ...Opt) error {
cw, err := OpenWriter(ctx, cs, WithRef(ref), WithDescriptor(desc))
if err != nil {
if !errdefs.IsAlreadyExists(err) {
return err
Expand All @@ -76,19 +77,19 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i
}
defer cw.Close()

return Copy(ctx, cw, r, size, expected, opts...)
return Copy(ctx, cw, r, desc.Size, desc.Digest, opts...)
}

// OpenWriter opens a new writer for the given reference, retrying if the writer
// is locked until the reference is available or returns an error.
func OpenWriter(ctx context.Context, cs Ingester, ref string, size int64, expected digest.Digest) (Writer, error) {
func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, error) {
var (
cw Writer
err error
retry = 16
)
for {
cw, err = cs.Writer(ctx, ref, size, expected)
cw, err = cs.Writer(ctx, opts...)
if err != nil {
if !errdefs.IsUnavailable(err) {
return nil, err
Expand Down
28 changes: 20 additions & 8 deletions content/local/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/log"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -119,15 +120,15 @@ func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]strin
}

// ReaderAt returns an io.ReaderAt for the blob.
func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
p := s.blobPath(dgst)
func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
p := s.blobPath(desc.Digest)
fi, err := os.Stat(p)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}

return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p)
return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p)
}

fp, err := os.Open(p)
Expand All @@ -136,7 +137,7 @@ func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.Reade
return nil, err
}

return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p)
return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p)
}

return sizeReaderAt{size: fi.Size(), fp: fp}, nil
Expand Down Expand Up @@ -400,11 +401,22 @@ func (s *store) total(ingestPath string) int64 {
// ref at a time.
//
// The argument `ref` is used to uniquely identify a long-lived writer transaction.
func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
var wOpts content.WriterOpts
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil, err
}
}
// TODO(AkihiroSuda): we could create a random string or one calculated based on the context
// https://github.com/containerd/containerd/issues/2129#issuecomment-380255019
if wOpts.Ref == "" {
return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
}
var lockErr error
for count := uint64(0); count < 10; count++ {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1<<count)))
if err := tryLock(ref); err != nil {
if err := tryLock(wOpts.Ref); err != nil {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we handle the empty ref case now? This looks dangerous with an empty ref and could easily be omitted now.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to require non-empty ref

if !errdefs.IsUnavailable(err) {
return nil, err
}
Expand All @@ -420,9 +432,9 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di
return nil, lockErr
}

w, err := s.writer(ctx, ref, total, expected)
w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
if err != nil {
unlock(ref)
unlock(wOpts.Ref)
return nil, err
}

Expand Down
24 changes: 13 additions & 11 deletions content/local/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/containerd/containerd/pkg/testutil"
"github.com/gotestyourself/gotestyourself/assert"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

type memoryLabelStore struct {
Expand Down Expand Up @@ -108,7 +109,7 @@ func TestContentWriter(t *testing.T) {
t.Fatal("ingest dir should be created", err)
}

cw, err := cs.Writer(ctx, "myref", 0, "")
cw, err := cs.Writer(ctx, content.WithRef("myref"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -117,13 +118,13 @@ func TestContentWriter(t *testing.T) {
}

// reopen, so we can test things
cw, err = cs.Writer(ctx, "myref", 0, "")
cw, err = cs.Writer(ctx, content.WithRef("myref"))
if err != nil {
t.Fatal(err)
}

// make sure that second resume also fails
if _, err = cs.Writer(ctx, "myref", 0, ""); err == nil {
if _, err = cs.Writer(ctx, content.WithRef("myref")); err == nil {
// TODO(stevvooe): This also works across processes. Need to find a way
// to test that, as well.
t.Fatal("no error on second resume")
Expand Down Expand Up @@ -166,7 +167,7 @@ func TestContentWriter(t *testing.T) {
t.Fatal(err)
}

cw, err = cs.Writer(ctx, "aref", 0, "")
cw, err = cs.Writer(ctx, content.WithRef("aref"))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -346,7 +347,8 @@ func checkBlobPath(t *testing.T, cs content.Store, dgst digest.Digest) string {
}

func checkWrite(ctx context.Context, t checker, cs content.Store, dgst digest.Digest, p []byte) digest.Digest {
if err := content.WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(p), int64(len(p)), dgst); err != nil {
if err := content.WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(p),
ocispec.Descriptor{Size: int64(len(p)), Digest: dgst}); err != nil {
t.Fatal(err)
}

Expand All @@ -365,25 +367,25 @@ func TestWriterTruncateRecoversFromIncompleteWrite(t *testing.T) {
defer cancel()

ref := "ref"
content := []byte("this is the content")
total := int64(len(content))
contentB := []byte("this is the content")
total := int64(len(contentB))
setupIncompleteWrite(ctx, t, cs, ref, total)

writer, err := cs.Writer(ctx, ref, total, "")
writer, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: total}))
assert.NilError(t, err)

assert.NilError(t, writer.Truncate(0))

_, err = writer.Write(content)
_, err = writer.Write(contentB)
assert.NilError(t, err)

dgst := digest.FromBytes(content)
dgst := digest.FromBytes(contentB)
err = writer.Commit(ctx, total, dgst)
assert.NilError(t, err)
}

func setupIncompleteWrite(ctx context.Context, t *testing.T, cs content.Store, ref string, total int64) {
writer, err := cs.Writer(ctx, ref, total, "")
writer, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: total}))
assert.NilError(t, err)

_, err = writer.Write([]byte("bad data"))
Expand Down
Loading