Skip to content

Commit 926b9c7

Browse files
committed
retry request on writer reset
when a put request is retried due to the response from registry, the body of the request should be seekable. A dynamic pipe is added to the body so that the content of the body can be read again. Currently a maximum of 5 resets are allowed, above which will fail the request. A new error ErrReset is introduced which informs that a reset has occured and request needs to be retried. also added tests for Copy() and push() to test the new functionality Signed-off-by: Akhil Mohan <[email protected]> (cherry picked from commit 8f4c23b) Signed-off-by: Akhil Mohan <[email protected]>
1 parent 7b93a22 commit 926b9c7

File tree

4 files changed

+499
-88
lines changed

4 files changed

+499
-88
lines changed

content/helpers.go

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,16 @@ import (
2626
"time"
2727

2828
"github.com/containerd/containerd/errdefs"
29+
"github.com/containerd/containerd/log"
2930
"github.com/opencontainers/go-digest"
3031
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
3132
)
3233

34+
// maxResets is the no.of times the Copy() method can tolerate a reset of the body
35+
const maxResets = 5
36+
37+
var ErrReset = errors.New("writer has been reset")
38+
3339
var bufPool = sync.Pool{
3440
New: func() interface{} {
3541
buffer := make([]byte, 1<<20)
@@ -80,7 +86,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, desc o
8086
return fmt.Errorf("failed to open writer: %w", err)
8187
}
8288

83-
return nil // all ready present
89+
return nil // already present
8490
}
8591
defer cw.Close()
8692

@@ -131,35 +137,63 @@ func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, er
131137
// the size or digest is unknown, these values may be empty.
132138
//
133139
// Copy is buffered, so no need to wrap reader in buffered io.
134-
func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
140+
func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
135141
ws, err := cw.Status()
136142
if err != nil {
137143
return fmt.Errorf("failed to get status: %w", err)
138144
}
139-
145+
r := or
140146
if ws.Offset > 0 {
141-
r, err = seekReader(r, ws.Offset, size)
147+
r, err = seekReader(or, ws.Offset, size)
142148
if err != nil {
143149
return fmt.Errorf("unable to resume write to %v: %w", ws.Ref, err)
144150
}
145151
}
146152

147-
copied, err := copyWithBuffer(cw, r)
148-
if err != nil {
149-
return fmt.Errorf("failed to copy: %w", err)
150-
}
151-
if size != 0 && copied < size-ws.Offset {
152-
// Short writes would return its own error, this indicates a read failure
153-
return fmt.Errorf("failed to read expected number of bytes: %w", io.ErrUnexpectedEOF)
154-
}
155-
156-
if err := cw.Commit(ctx, size, expected, opts...); err != nil {
157-
if !errdefs.IsAlreadyExists(err) {
158-
return fmt.Errorf("failed commit on ref %q: %w", ws.Ref, err)
153+
for i := 0; i < maxResets; i++ {
154+
if i >= 1 {
155+
log.G(ctx).WithField("digest", expected).Debugf("retrying copy due to reset")
156+
}
157+
copied, err := copyWithBuffer(cw, r)
158+
if errors.Is(err, ErrReset) {
159+
ws, err := cw.Status()
160+
if err != nil {
161+
return fmt.Errorf("failed to get status: %w", err)
162+
}
163+
r, err = seekReader(or, ws.Offset, size)
164+
if err != nil {
165+
return fmt.Errorf("unable to resume write to %v: %w", ws.Ref, err)
166+
}
167+
continue
168+
}
169+
if err != nil {
170+
return fmt.Errorf("failed to copy: %w", err)
171+
}
172+
if size != 0 && copied < size-ws.Offset {
173+
// Short writes would return its own error, this indicates a read failure
174+
return fmt.Errorf("failed to read expected number of bytes: %w", io.ErrUnexpectedEOF)
175+
}
176+
if err := cw.Commit(ctx, size, expected, opts...); err != nil {
177+
if errors.Is(err, ErrReset) {
178+
ws, err := cw.Status()
179+
if err != nil {
180+
return fmt.Errorf("failed to get status: %w", err)
181+
}
182+
r, err = seekReader(or, ws.Offset, size)
183+
if err != nil {
184+
return fmt.Errorf("unable to resume write to %v: %w", ws.Ref, err)
185+
}
186+
continue
187+
}
188+
if !errdefs.IsAlreadyExists(err) {
189+
return fmt.Errorf("failed commit on ref %q: %w", ws.Ref, err)
190+
}
159191
}
192+
return nil
160193
}
161194

162-
return nil
195+
log.G(ctx).WithField("digest", expected).Errorf("failed to copy after %d retries", maxResets)
196+
return fmt.Errorf("failed to copy after %d retries", maxResets)
163197
}
164198

165199
// CopyReaderAt copies to a writer from a given reader at for the given

content/helpers_test.go

Lines changed: 96 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"context"
2222
_ "crypto/sha256" // required by go-digest
23+
"fmt"
2324
"io"
2425
"strings"
2526
"testing"
@@ -39,38 +40,107 @@ type copySource struct {
3940
func TestCopy(t *testing.T) {
4041
defaultSource := newCopySource("this is the source to copy")
4142

43+
cf1 := func(buf *bytes.Buffer, st Status) commitFunction {
44+
i := 0
45+
return func() error {
46+
// function resets the first time
47+
if i == 0 {
48+
// this is the case where, the pipewriter to which the data was being written has
49+
// changed. which means we need to clear the buffer
50+
i++
51+
buf.Reset()
52+
st.Offset = 0
53+
return ErrReset
54+
}
55+
return nil
56+
}
57+
}
58+
59+
cf2 := func(buf *bytes.Buffer, st Status) commitFunction {
60+
i := 0
61+
return func() error {
62+
// function resets for more than the maxReset value
63+
if i < maxResets+1 {
64+
// this is the case where, the pipewriter to which the data was being written has
65+
// changed. which means we need to clear the buffer
66+
i++
67+
buf.Reset()
68+
st.Offset = 0
69+
return ErrReset
70+
}
71+
return nil
72+
}
73+
}
74+
75+
s1 := Status{}
76+
s2 := Status{}
77+
b1 := bytes.Buffer{}
78+
b2 := bytes.Buffer{}
79+
4280
var testcases = []struct {
43-
name string
44-
source copySource
45-
writer fakeWriter
46-
expected string
81+
name string
82+
source copySource
83+
writer fakeWriter
84+
expected string
85+
expectedErr error
4786
}{
4887
{
49-
name: "copy no offset",
50-
source: defaultSource,
51-
writer: fakeWriter{},
88+
name: "copy no offset",
89+
source: defaultSource,
90+
writer: fakeWriter{
91+
Buffer: &bytes.Buffer{},
92+
},
5293
expected: "this is the source to copy",
5394
},
5495
{
55-
name: "copy with offset from seeker",
56-
source: defaultSource,
57-
writer: fakeWriter{status: Status{Offset: 8}},
96+
name: "copy with offset from seeker",
97+
source: defaultSource,
98+
writer: fakeWriter{
99+
Buffer: &bytes.Buffer{},
100+
status: Status{Offset: 8},
101+
},
58102
expected: "the source to copy",
59103
},
60104
{
61-
name: "copy with offset from unseekable source",
62-
source: copySource{reader: bytes.NewBufferString("foobar"), size: 6},
63-
writer: fakeWriter{status: Status{Offset: 3}},
105+
name: "copy with offset from unseekable source",
106+
source: copySource{reader: bytes.NewBufferString("foobar"), size: 6},
107+
writer: fakeWriter{
108+
Buffer: &bytes.Buffer{},
109+
status: Status{Offset: 3},
110+
},
64111
expected: "bar",
65112
},
66113
{
67114
name: "commit already exists",
68115
source: newCopySource("this already exists"),
69-
writer: fakeWriter{commitFunc: func() error {
70-
return errdefs.ErrAlreadyExists
71-
}},
116+
writer: fakeWriter{
117+
Buffer: &bytes.Buffer{},
118+
commitFunc: func() error {
119+
return errdefs.ErrAlreadyExists
120+
}},
72121
expected: "this already exists",
73122
},
123+
{
124+
name: "commit fails first time with ErrReset",
125+
source: newCopySource("content to copy"),
126+
writer: fakeWriter{
127+
Buffer: &b1,
128+
status: s1,
129+
commitFunc: cf1(&b1, s1),
130+
},
131+
expected: "content to copy",
132+
},
133+
{
134+
name: "write fails more than maxReset times due to reset",
135+
source: newCopySource("content to copy"),
136+
writer: fakeWriter{
137+
Buffer: &b2,
138+
status: s2,
139+
commitFunc: cf2(&b2, s2),
140+
},
141+
expected: "",
142+
expectedErr: fmt.Errorf("failed to copy after %d retries", maxResets),
143+
},
74144
}
75145

76146
for _, testcase := range testcases {
@@ -81,6 +151,12 @@ func TestCopy(t *testing.T) {
81151
testcase.source.size,
82152
testcase.source.digest)
83153

154+
// if an error is expected then further comparisons are not required
155+
if testcase.expectedErr != nil {
156+
assert.Check(t, is.Equal(testcase.expectedErr.Error(), err.Error()))
157+
return
158+
}
159+
84160
assert.NilError(t, err)
85161
assert.Check(t, is.Equal(testcase.source.digest, testcase.writer.committedDigest))
86162
assert.Check(t, is.Equal(testcase.expected, testcase.writer.String()))
@@ -96,11 +172,13 @@ func newCopySource(raw string) copySource {
96172
}
97173
}
98174

175+
type commitFunction func() error
176+
99177
type fakeWriter struct {
100-
bytes.Buffer
178+
*bytes.Buffer
101179
committedDigest digest.Digest
102180
status Status
103-
commitFunc func() error
181+
commitFunc commitFunction
104182
}
105183

106184
func (f *fakeWriter) Close() error {

0 commit comments

Comments
 (0)