@@ -44,17 +44,47 @@ type dockerPusher struct {
4444 tracker StatusTracker
4545}
4646
47+ // Writer implements Ingester API of content store. This allows the client
48+ // to receive ErrUnavailable when there is already an on-going upload.
49+ // Note that the tracker MUST implement StatusTrackLocker interface to avoid
50+ // race condition on StatusTracker.
51+ func (p dockerPusher ) Writer (ctx context.Context , opts ... content.WriterOpt ) (content.Writer , error ) {
52+ var wOpts content.WriterOpts
53+ for _ , opt := range opts {
54+ if err := opt (& wOpts ); err != nil {
55+ return nil , err
56+ }
57+ }
58+ if wOpts .Ref == "" {
59+ return nil , errors .Wrap (errdefs .ErrInvalidArgument , "ref must not be empty" )
60+ }
61+ return p .push (ctx , wOpts .Desc , wOpts .Ref , true )
62+ }
63+
4764func (p dockerPusher ) Push (ctx context.Context , desc ocispec.Descriptor ) (content.Writer , error ) {
65+ return p .push (ctx , desc , remotes .MakeRefKey (ctx , desc ), false )
66+ }
67+
68+ func (p dockerPusher ) push (ctx context.Context , desc ocispec.Descriptor , ref string , unavailableOnFail bool ) (content.Writer , error ) {
69+ if l , ok := p .tracker .(StatusTrackLocker ); ok {
70+ l .Lock (ref )
71+ defer l .Unlock (ref )
72+ }
4873 ctx , err := ContextWithRepositoryScope (ctx , p .refspec , true )
4974 if err != nil {
5075 return nil , err
5176 }
52- ref := remotes .MakeRefKey (ctx , desc )
5377 status , err := p .tracker .GetStatus (ref )
5478 if err == nil {
5579 if status .Committed && status .Offset == status .Total {
5680 return nil , errors .Wrapf (errdefs .ErrAlreadyExists , "ref %v" , ref )
5781 }
82+ if unavailableOnFail {
83+ // Another push of this ref is happening elsewhere. The rest of function
84+ // will continue only when `errdefs.IsNotFound(err) == true` (i.e. there
85+ // is no actively-tracked ref already).
86+ return nil , errors .Wrap (errdefs .ErrUnavailable , "push is on-going" )
87+ }
5888 // TODO: Handle incomplete status
5989 } else if ! errdefs .IsNotFound (err ) {
6090 return nil , errors .Wrap (err , "failed to get status" )
@@ -105,8 +135,11 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten
105135
106136 if exists {
107137 p .tracker .SetStatus (ref , Status {
138+ Committed : true ,
108139 Status : content.Status {
109- Ref : ref ,
140+ Ref : ref ,
141+ Total : desc .Size ,
142+ Offset : desc .Size ,
110143 // TODO: Set updated time?
111144 },
112145 })
@@ -162,8 +195,11 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten
162195 case http .StatusOK , http .StatusAccepted , http .StatusNoContent :
163196 case http .StatusCreated :
164197 p .tracker .SetStatus (ref , Status {
198+ Committed : true ,
165199 Status : content.Status {
166- Ref : ref ,
200+ Ref : ref ,
201+ Total : desc .Size ,
202+ Offset : desc .Size ,
167203 },
168204 })
169205 return nil , errors .Wrapf (errdefs .ErrAlreadyExists , "content %v on remote" , desc .Digest )
0 commit comments