Skip to content

Commit ab1654d

Browse files
committed
Fix PushHandler cannot push image that contains duplicated blobs
Signed-off-by: Kohei Tokunaga <[email protected]>
1 parent f0890f9 commit ab1654d

11 files changed

Lines changed: 445 additions & 5 deletions

File tree

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ require (
3535
github.com/hashicorp/go-multierror v1.0.0
3636
github.com/imdario/mergo v0.3.11
3737
github.com/klauspost/compress v1.11.13
38+
github.com/moby/locker v1.0.1
3839
github.com/moby/sys/mountinfo v0.4.1
3940
github.com/moby/sys/symlink v0.1.0
4041
github.com/opencontainers/go-digest v1.0.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,8 @@ github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible h1
393393
github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4=
394394
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
395395
github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQZAeMln+1tSwduZz7+Af5oFlKirV/MSYes2A=
396+
github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg=
397+
github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc=
396398
github.com/moby/sys/mountinfo v0.4.0/go.mod h1:rEr8tzG/lsIZHBtN/JjGG+LMYx9eXgW2JI+6q0qou+A=
397399
github.com/moby/sys/mountinfo v0.4.1 h1:1O+1cHA1aujwEwwVMa2Xm2l+gIpUHyd3+D+d7LZh1kM=
398400
github.com/moby/sys/mountinfo v0.4.1/go.mod h1:rEr8tzG/lsIZHBtN/JjGG+LMYx9eXgW2JI+6q0qou+A=

remotes/docker/pusher.go

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,47 @@ type dockerPusher struct {
4444
tracker StatusTracker
4545
}
4646

47+
// Writer implements Ingester API of content store. This allows the client
48+
// to receive ErrUnavailable when there is already an on-going upload.
49+
// Note that the tracker MUST implement StatusTrackLocker interface to avoid
50+
// race condition on StatusTracker.
51+
func (p dockerPusher) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
52+
var wOpts content.WriterOpts
53+
for _, opt := range opts {
54+
if err := opt(&wOpts); err != nil {
55+
return nil, err
56+
}
57+
}
58+
if wOpts.Ref == "" {
59+
return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
60+
}
61+
return p.push(ctx, wOpts.Desc, wOpts.Ref, true)
62+
}
63+
4764
func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (content.Writer, error) {
65+
return p.push(ctx, desc, remotes.MakeRefKey(ctx, desc), false)
66+
}
67+
68+
func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref string, unavailableOnFail bool) (content.Writer, error) {
69+
if l, ok := p.tracker.(StatusTrackLocker); ok {
70+
l.Lock(ref)
71+
defer l.Unlock(ref)
72+
}
4873
ctx, err := ContextWithRepositoryScope(ctx, p.refspec, true)
4974
if err != nil {
5075
return nil, err
5176
}
52-
ref := remotes.MakeRefKey(ctx, desc)
5377
status, err := p.tracker.GetStatus(ref)
5478
if err == nil {
5579
if status.Committed && status.Offset == status.Total {
5680
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "ref %v", ref)
5781
}
82+
if unavailableOnFail {
83+
// Another push of this ref is happening elsewhere. The rest of function
84+
// will continue only when `errdefs.IsNotFound(err) == true` (i.e. there
85+
// is no actively-tracked ref already).
86+
return nil, errors.Wrap(errdefs.ErrUnavailable, "push is on-going")
87+
}
5888
// TODO: Handle incomplete status
5989
} else if !errdefs.IsNotFound(err) {
6090
return nil, errors.Wrap(err, "failed to get status")
@@ -105,8 +135,11 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten
105135

106136
if exists {
107137
p.tracker.SetStatus(ref, Status{
138+
Committed: true,
108139
Status: content.Status{
109-
Ref: ref,
140+
Ref: ref,
141+
Total: desc.Size,
142+
Offset: desc.Size,
110143
// TODO: Set updated time?
111144
},
112145
})
@@ -162,8 +195,11 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten
162195
case http.StatusOK, http.StatusAccepted, http.StatusNoContent:
163196
case http.StatusCreated:
164197
p.tracker.SetStatus(ref, Status{
198+
Committed: true,
165199
Status: content.Status{
166-
Ref: ref,
200+
Ref: ref,
201+
Total: desc.Size,
202+
Offset: desc.Size,
167203
},
168204
})
169205
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v on remote", desc.Digest)

remotes/docker/status.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/containerd/containerd/content"
2323
"github.com/containerd/containerd/errdefs"
24+
"github.com/moby/locker"
2425
"github.com/pkg/errors"
2526
)
2627

@@ -40,15 +41,24 @@ type StatusTracker interface {
4041
SetStatus(string, Status)
4142
}
4243

44+
// StatusTrackLocker to track status of operations with lock
45+
type StatusTrackLocker interface {
46+
StatusTracker
47+
Lock(string)
48+
Unlock(string)
49+
}
50+
4351
type memoryStatusTracker struct {
4452
statuses map[string]Status
4553
m sync.Mutex
54+
locker *locker.Locker
4655
}
4756

4857
// NewInMemoryTracker returns a StatusTracker that tracks content status in-memory
49-
func NewInMemoryTracker() StatusTracker {
58+
func NewInMemoryTracker() StatusTrackLocker {
5059
return &memoryStatusTracker{
5160
statuses: map[string]Status{},
61+
locker: locker.New(),
5262
}
5363
}
5464

@@ -67,3 +77,11 @@ func (t *memoryStatusTracker) SetStatus(ref string, status Status) {
6777
t.statuses[ref] = status
6878
t.m.Unlock()
6979
}
80+
81+
func (t *memoryStatusTracker) Lock(ref string) {
82+
t.locker.Lock(ref)
83+
}
84+
85+
func (t *memoryStatusTracker) Unlock(ref string) {
86+
t.locker.Unlock(ref)
87+
}

remotes/handlers.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,15 @@ func PushHandler(pusher Pusher, provider content.Provider) images.HandlerFunc {
165165
func push(ctx context.Context, provider content.Provider, pusher Pusher, desc ocispec.Descriptor) error {
166166
log.G(ctx).Debug("push")
167167

168-
cw, err := pusher.Push(ctx, desc)
168+
var (
169+
cw content.Writer
170+
err error
171+
)
172+
if cs, ok := pusher.(content.Ingester); ok {
173+
cw, err = content.OpenWriter(ctx, cs, content.WithRef(MakeRefKey(ctx, desc)), content.WithDescriptor(desc))
174+
} else {
175+
cw, err = pusher.Push(ctx, desc)
176+
}
169177
if err != nil {
170178
if !errdefs.IsAlreadyExists(err) {
171179
return err

remotes/resolver.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ type Resolver interface {
4545
Fetcher(ctx context.Context, ref string) (Fetcher, error)
4646

4747
// Pusher returns a new pusher for the provided reference
48+
// The returned Pusher should satisfy content.Ingester and concurrent attempts
49+
// to push the same blob using the Ingester API should result in ErrUnavailable.
4850
Pusher(ctx context.Context, ref string) (Pusher, error)
4951
}
5052

vendor/github.com/moby/locker/LICENSE

Lines changed: 190 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)