Skip to content

Commit 8f4c23b

Browse files
committed
retry request on writer reset
when a put request is retried due to the response from registry, the body of the request should be seekable. A dynamic pipe is added to the body so that the content of the body can be read again. Currently a maximum of 5 resets are allowed, above which will fail the request. A new error ErrReset is introduced which informs that a reset has occured and request needs to be retried. also added tests for Copy() and push() to test the new functionality Signed-off-by: Akhil Mohan <[email protected]>
1 parent de509c0 commit 8f4c23b

4 files changed

Lines changed: 499 additions & 88 deletions

File tree

content/helpers.go

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,16 @@ import (
2626
"time"
2727

2828
"github.com/containerd/containerd/errdefs"
29+
"github.com/containerd/containerd/log"
2930
"github.com/opencontainers/go-digest"
3031
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
3132
)
3233

34+
// maxResets is the no.of times the Copy() method can tolerate a reset of the body
35+
const maxResets = 5
36+
37+
var ErrReset = errors.New("writer has been reset")
38+
3339
var bufPool = sync.Pool{
3440
New: func() interface{} {
3541
buffer := make([]byte, 1<<20)
@@ -80,7 +86,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, desc o
8086
return fmt.Errorf("failed to open writer: %w", err)
8187
}
8288

83-
return nil // all ready present
89+
return nil // already present
8490
}
8591
defer cw.Close()
8692

@@ -131,35 +137,63 @@ func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, er
131137
// the size or digest is unknown, these values may be empty.
132138
//
133139
// Copy is buffered, so no need to wrap reader in buffered io.
134-
func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
140+
func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
135141
ws, err := cw.Status()
136142
if err != nil {
137143
return fmt.Errorf("failed to get status: %w", err)
138144
}
139-
145+
r := or
140146
if ws.Offset > 0 {
141-
r, err = seekReader(r, ws.Offset, size)
147+
r, err = seekReader(or, ws.Offset, size)
142148
if err != nil {
143149
return fmt.Errorf("unable to resume write to %v: %w", ws.Ref, err)
144150
}
145151
}
146152

147-
copied, err := copyWithBuffer(cw, r)
148-
if err != nil {
149-
return fmt.Errorf("failed to copy: %w", err)
150-
}
151-
if size != 0 && copied < size-ws.Offset {
152-
// Short writes would return its own error, this indicates a read failure
153-
return fmt.Errorf("failed to read expected number of bytes: %w", io.ErrUnexpectedEOF)
154-
}
155-
156-
if err := cw.Commit(ctx, size, expected, opts...); err != nil {
157-
if !errdefs.IsAlreadyExists(err) {
158-
return fmt.Errorf("failed commit on ref %q: %w", ws.Ref, err)
153+
for i := 0; i < maxResets; i++ {
154+
if i >= 1 {
155+
log.G(ctx).WithField("digest", expected).Debugf("retrying copy due to reset")
156+
}
157+
copied, err := copyWithBuffer(cw, r)
158+
if errors.Is(err, ErrReset) {
159+
ws, err := cw.Status()
160+
if err != nil {
161+
return fmt.Errorf("failed to get status: %w", err)
162+
}
163+
r, err = seekReader(or, ws.Offset, size)
164+
if err != nil {
165+
return fmt.Errorf("unable to resume write to %v: %w", ws.Ref, err)
166+
}
167+
continue
168+
}
169+
if err != nil {
170+
return fmt.Errorf("failed to copy: %w", err)
171+
}
172+
if size != 0 && copied < size-ws.Offset {
173+
// Short writes would return its own error, this indicates a read failure
174+
return fmt.Errorf("failed to read expected number of bytes: %w", io.ErrUnexpectedEOF)
175+
}
176+
if err := cw.Commit(ctx, size, expected, opts...); err != nil {
177+
if errors.Is(err, ErrReset) {
178+
ws, err := cw.Status()
179+
if err != nil {
180+
return fmt.Errorf("failed to get status: %w", err)
181+
}
182+
r, err = seekReader(or, ws.Offset, size)
183+
if err != nil {
184+
return fmt.Errorf("unable to resume write to %v: %w", ws.Ref, err)
185+
}
186+
continue
187+
}
188+
if !errdefs.IsAlreadyExists(err) {
189+
return fmt.Errorf("failed commit on ref %q: %w", ws.Ref, err)
190+
}
159191
}
192+
return nil
160193
}
161194

162-
return nil
195+
log.G(ctx).WithField("digest", expected).Errorf("failed to copy after %d retries", maxResets)
196+
return fmt.Errorf("failed to copy after %d retries", maxResets)
163197
}
164198

165199
// CopyReaderAt copies to a writer from a given reader at for the given

content/helpers_test.go

Lines changed: 96 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"context"
2222
_ "crypto/sha256" // required by go-digest
23+
"fmt"
2324
"io"
2425
"strings"
2526
"testing"
@@ -38,38 +39,107 @@ type copySource struct {
3839
func TestCopy(t *testing.T) {
3940
defaultSource := newCopySource("this is the source to copy")
4041

42+
cf1 := func(buf *bytes.Buffer, st Status) commitFunction {
43+
i := 0
44+
return func() error {
45+
// function resets the first time
46+
if i == 0 {
47+
// this is the case where, the pipewriter to which the data was being written has
48+
// changed. which means we need to clear the buffer
49+
i++
50+
buf.Reset()
51+
st.Offset = 0
52+
return ErrReset
53+
}
54+
return nil
55+
}
56+
}
57+
58+
cf2 := func(buf *bytes.Buffer, st Status) commitFunction {
59+
i := 0
60+
return func() error {
61+
// function resets for more than the maxReset value
62+
if i < maxResets+1 {
63+
// this is the case where, the pipewriter to which the data was being written has
64+
// changed. which means we need to clear the buffer
65+
i++
66+
buf.Reset()
67+
st.Offset = 0
68+
return ErrReset
69+
}
70+
return nil
71+
}
72+
}
73+
74+
s1 := Status{}
75+
s2 := Status{}
76+
b1 := bytes.Buffer{}
77+
b2 := bytes.Buffer{}
78+
4179
var testcases = []struct {
42-
name string
43-
source copySource
44-
writer fakeWriter
45-
expected string
80+
name string
81+
source copySource
82+
writer fakeWriter
83+
expected string
84+
expectedErr error
4685
}{
4786
{
48-
name: "copy no offset",
49-
source: defaultSource,
50-
writer: fakeWriter{},
87+
name: "copy no offset",
88+
source: defaultSource,
89+
writer: fakeWriter{
90+
Buffer: &bytes.Buffer{},
91+
},
5192
expected: "this is the source to copy",
5293
},
5394
{
54-
name: "copy with offset from seeker",
55-
source: defaultSource,
56-
writer: fakeWriter{status: Status{Offset: 8}},
95+
name: "copy with offset from seeker",
96+
source: defaultSource,
97+
writer: fakeWriter{
98+
Buffer: &bytes.Buffer{},
99+
status: Status{Offset: 8},
100+
},
57101
expected: "the source to copy",
58102
},
59103
{
60-
name: "copy with offset from unseekable source",
61-
source: copySource{reader: bytes.NewBufferString("foobar"), size: 6},
62-
writer: fakeWriter{status: Status{Offset: 3}},
104+
name: "copy with offset from unseekable source",
105+
source: copySource{reader: bytes.NewBufferString("foobar"), size: 6},
106+
writer: fakeWriter{
107+
Buffer: &bytes.Buffer{},
108+
status: Status{Offset: 3},
109+
},
63110
expected: "bar",
64111
},
65112
{
66113
name: "commit already exists",
67114
source: newCopySource("this already exists"),
68-
writer: fakeWriter{commitFunc: func() error {
69-
return errdefs.ErrAlreadyExists
70-
}},
115+
writer: fakeWriter{
116+
Buffer: &bytes.Buffer{},
117+
commitFunc: func() error {
118+
return errdefs.ErrAlreadyExists
119+
}},
71120
expected: "this already exists",
72121
},
122+
{
123+
name: "commit fails first time with ErrReset",
124+
source: newCopySource("content to copy"),
125+
writer: fakeWriter{
126+
Buffer: &b1,
127+
status: s1,
128+
commitFunc: cf1(&b1, s1),
129+
},
130+
expected: "content to copy",
131+
},
132+
{
133+
name: "write fails more than maxReset times due to reset",
134+
source: newCopySource("content to copy"),
135+
writer: fakeWriter{
136+
Buffer: &b2,
137+
status: s2,
138+
commitFunc: cf2(&b2, s2),
139+
},
140+
expected: "",
141+
expectedErr: fmt.Errorf("failed to copy after %d retries", maxResets),
142+
},
73143
}
74144

75145
for _, testcase := range testcases {
@@ -80,6 +150,12 @@ func TestCopy(t *testing.T) {
80150
testcase.source.size,
81151
testcase.source.digest)
82152

153+
// if an error is expected then further comparisons are not required
154+
if testcase.expectedErr != nil {
155+
assert.Equal(t, testcase.expectedErr, err)
156+
return
157+
}
158+
83159
assert.NoError(t, err)
84160
assert.Equal(t, testcase.source.digest, testcase.writer.committedDigest)
85161
assert.Equal(t, testcase.expected, testcase.writer.String())
@@ -95,11 +171,13 @@ func newCopySource(raw string) copySource {
95171
}
96172
}
97173

174+
type commitFunction func() error
175+
98176
type fakeWriter struct {
99-
bytes.Buffer
177+
*bytes.Buffer
100178
committedDigest digest.Digest
101179
status Status
102-
commitFunc func() error
180+
commitFunc commitFunction
103181
}
104182

105183
func (f *fakeWriter) Close() error {

0 commit comments

Comments
 (0)