Skip to content

Commit 1e84803

Browse files
jedevcthaJeztah
authored andcommitted
pushWriter: correctly propagate errors
In the refactor from 926b9c7, the error handling was substantially reworked, and changed the types of errors returned. Notably, in the case of a network error, instead of propogating the error through to return from pushWriter.Write (as previously), it would be propagated through to pushWriter.Commit - however, this is too late, since we've already closed the io.Pipe by the time we would have reached this function. Therefore, we get the generic error message "io: read/write on closed pipe" for *every network error*. This patch corrects this behavior to ensure that the correct error object is always returned as early as possible, by checking the error result after writing and detecting a closed pipe. Additionally, we do some additional hardening - specifically we prevent falling through when resetting the content or detecting errors, and update the tests to explicitly check for the ErrReset message. Signed-off-by: Justin Chadwell <[email protected]> (cherry picked from commit 9f6058d) Signed-off-by: Sebastiaan van Stijn <[email protected]>
1 parent ca2e7ad commit 1e84803

2 files changed

Lines changed: 35 additions & 38 deletions

File tree

remotes/docker/pusher.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -377,17 +377,24 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
377377

378378
// If content has already been written, the bytes
379379
// cannot be written and the caller must reset
380-
if status.Offset > 0 {
381-
status.Offset = 0
382-
status.UpdatedAt = time.Now()
383-
pw.tracker.SetStatus(pw.ref, status)
384-
return 0, content.ErrReset
385-
}
380+
status.Offset = 0
381+
status.UpdatedAt = time.Now()
382+
pw.tracker.SetStatus(pw.ref, status)
383+
return 0, content.ErrReset
386384
default:
387385
}
388386
}
389387

390388
n, err = pw.pipe.Write(p)
389+
if errors.Is(err, io.ErrClosedPipe) {
390+
// if the pipe is closed, we might have the original error on the error
391+
// channel - so we should try and get it
392+
select {
393+
case err2 := <-pw.errC:
394+
err = err2
395+
default:
396+
}
397+
}
391398
status.Offset += int64(n)
392399
status.UpdatedAt = time.Now()
393400
pw.tracker.SetStatus(pw.ref, status)
@@ -428,7 +435,7 @@ func (pw *pushWriter) Digest() digest.Digest {
428435

429436
func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
430437
// Check whether read has already thrown an error
431-
if _, err := pw.pipe.Write([]byte{}); err != nil && err != io.ErrClosedPipe {
438+
if _, err := pw.pipe.Write([]byte{}); err != nil && !errors.Is(err, io.ErrClosedPipe) {
432439
return errors.Wrap(err, "pipe error before commit")
433440
}
434441

@@ -439,9 +446,7 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
439446
var resp *http.Response
440447
select {
441448
case err := <-pw.errC:
442-
if err != nil {
443-
return err
444-
}
449+
return err
445450
case resp = <-pw.respC:
446451
defer resp.Body.Close()
447452
case p, ok := <-pw.pipeC:
@@ -453,18 +458,17 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
453458
}
454459
pw.pipe.CloseWithError(content.ErrReset)
455460
pw.pipe = p
461+
462+
// If content has already been written, the bytes
463+
// cannot be written again and the caller must reset
456464
status, err := pw.tracker.GetStatus(pw.ref)
457465
if err != nil {
458466
return err
459467
}
460-
// If content has already been written, the bytes
461-
// cannot be written again and the caller must reset
462-
if status.Offset > 0 {
463-
status.Offset = 0
464-
status.UpdatedAt = time.Now()
465-
pw.tracker.SetStatus(pw.ref, status)
466-
return content.ErrReset
467-
}
468+
status.Offset = 0
469+
status.UpdatedAt = time.Now()
470+
pw.tracker.SetStatus(pw.ref, status)
471+
return content.ErrReset
468472
}
469473

470474
// 201 is specified return status, some registries return

remotes/docker/pusher_test.go

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -117,26 +117,19 @@ func TestPusherErrReset(t *testing.T) {
117117
}
118118

119119
w, err := p.push(context.Background(), desc, remotes.MakeRefKey(context.Background(), desc), false)
120-
assert.Equal(t, err, nil, "no error should be there")
121-
122-
w.Write(ct)
123-
124-
pw, _ := w.(*pushWriter)
125-
126-
select {
127-
case p := <-pw.pipeC:
128-
p.Write(ct)
129-
case e := <-pw.errC:
130-
assert.Failf(t, "error: %v while retrying request", e.Error())
131-
}
132-
133-
select {
134-
case resp := <-pw.respC:
135-
assert.Equalf(t, resp.StatusCode, http.StatusCreated,
136-
"201 should be the response code when uploading new content")
137-
case <-pw.errC:
138-
assert.Fail(t, "should not give error")
139-
}
120+
assert.NoError(t, err)
121+
122+
// first push should fail with ErrReset
123+
_, err = w.Write(ct)
124+
assert.NoError(t, err)
125+
err = w.Commit(context.Background(), desc.Size, desc.Digest)
126+
assert.Equal(t, content.ErrReset, err)
127+
128+
// second push should succeed
129+
_, err = w.Write(ct)
130+
assert.NoError(t, err)
131+
err = w.Commit(context.Background(), desc.Size, desc.Digest)
132+
assert.NoError(t, err)
140133
}
141134

142135
func tryUpload(ctx context.Context, t *testing.T, p dockerPusher, layerContent []byte) error {

0 commit comments

Comments
 (0)