Skip to content

Commit 29a899b

Browse files
authored
Merge pull request #2072 from dmcgowan/cherry-pick-content-discard
[release/1.0] content: update copy to discard instead of truncate
2 parents 3e89d82 + 45e7aa5 commit 29a899b

3 files changed

Lines changed: 134 additions & 19 deletions

File tree

content/helpers.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package content
33
import (
44
"context"
55
"io"
6+
"io/ioutil"
67
"sync"
78

89
"github.com/containerd/containerd/errdefs"
@@ -76,14 +77,7 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige
7677
if ws.Offset > 0 {
7778
r, err = seekReader(r, ws.Offset, size)
7879
if err != nil {
79-
if !isUnseekable(err) {
80-
return errors.Wrapf(err, "unable to resume write to %v", ws.Ref)
81-
}
82-
83-
// reader is unseekable, try to move the writer back to the start.
84-
if err := cw.Truncate(0); err != nil {
85-
return errors.Wrapf(err, "content writer truncate failed")
86-
}
80+
return errors.Wrapf(err, "unable to resume write to %v", ws.Ref)
8781
}
8882
}
8983

@@ -103,14 +97,9 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige
10397
return nil
10498
}
10599

106-
var errUnseekable = errors.New("seek not supported")
107-
108-
func isUnseekable(err error) bool {
109-
return errors.Cause(err) == errUnseekable
110-
}
111-
112100
// seekReader attempts to seek the reader to the given offset, either by
113-
// resolving `io.Seeker` or by detecting `io.ReaderAt`.
101+
// resolving `io.Seeker`, by detecting `io.ReaderAt`, or discarding
102+
// up to the given offset.
114103
func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
115104
// attempt to resolve r as a seeker and setup the offset.
116105
seeker, ok := r.(io.Seeker)
@@ -134,5 +123,17 @@ func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
134123
return sr, nil
135124
}
136125

137-
return r, errors.Wrapf(errUnseekable, "seek to offset %v failed", offset)
126+
// well then, let's just discard up to the offset
127+
buf := bufPool.Get().(*[]byte)
128+
defer bufPool.Put(buf)
129+
130+
n, err := io.CopyBuffer(ioutil.Discard, io.LimitReader(r, offset), *buf)
131+
if err != nil {
132+
return nil, errors.Wrap(err, "failed to discard to offset")
133+
}
134+
if n != offset {
135+
return nil, errors.Errorf("unable to discard to offset")
136+
}
137+
138+
return r, nil
138139
}

content/helpers_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ func TestCopy(t *testing.T) {
4242
},
4343
{
4444
name: "copy with offset from unseekable source",
45-
source: copySource{reader: bytes.NewBufferString("foo"), size: 3},
46-
writer: fakeWriter{status: Status{Offset: 8}},
47-
expected: "foo",
45+
source: copySource{reader: bytes.NewBufferString("foobar"), size: 6},
46+
writer: fakeWriter{status: Status{Offset: 3}},
47+
expected: "bar",
4848
},
4949
{
5050
name: "commit already exists",

content/testsuite/testsuite.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package testsuite
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"io"
78
"io/ioutil"
89
"math/rand"
@@ -24,6 +25,11 @@ func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, r
2425
t.Run("Writer", makeTest(t, name, storeFn, checkContentStoreWriter))
2526
t.Run("UploadStatus", makeTest(t, name, storeFn, checkUploadStatus))
2627
t.Run("Resume", makeTest(t, name, storeFn, checkResumeWriter))
28+
t.Run("ResumeTruncate", makeTest(t, name, storeFn, checkResume(resumeTruncate)))
29+
t.Run("ResumeDiscard", makeTest(t, name, storeFn, checkResume(resumeDiscard)))
30+
t.Run("ResumeCopy", makeTest(t, name, storeFn, checkResume(resumeCopy)))
31+
t.Run("ResumeCopySeeker", makeTest(t, name, storeFn, checkResume(resumeCopySeeker)))
32+
t.Run("ResumeCopyReaderAt", makeTest(t, name, storeFn, checkResume(resumeCopyReaderAt)))
2733
t.Run("Labels", makeTest(t, name, storeFn, checkLabels))
2834
}
2935

@@ -352,6 +358,114 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) {
352358

353359
}
354360

361+
func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64, digest.Digest) error) func(ctx context.Context, t *testing.T, cs content.Store) {
362+
return func(ctx context.Context, t *testing.T, cs content.Store) {
363+
sizes := []int64{500, 5000, 50000}
364+
truncations := []float64{0.0, 0.1, 0.5, 0.9, 1.0}
365+
366+
for i, size := range sizes {
367+
for j, tp := range truncations {
368+
b, d := createContent(size, int64(i*len(truncations)+j))
369+
limit := int64(float64(size) * tp)
370+
ref := fmt.Sprintf("ref-%d-%d", i, j)
371+
372+
w, err := cs.Writer(ctx, ref, size, d)
373+
if err != nil {
374+
t.Fatal(err)
375+
}
376+
377+
if _, err := w.Write(b[:limit]); err != nil {
378+
w.Close()
379+
t.Fatal(err)
380+
}
381+
382+
if err := w.Close(); err != nil {
383+
t.Fatal(err)
384+
}
385+
386+
w, err = cs.Writer(ctx, ref, size, d)
387+
if err != nil {
388+
t.Fatal(err)
389+
}
390+
391+
st, err := w.Status()
392+
if err != nil {
393+
w.Close()
394+
t.Fatal(err)
395+
}
396+
397+
if st.Offset != limit {
398+
w.Close()
399+
t.Fatalf("Unexpected offset %d, expected %d", st.Offset, limit)
400+
}
401+
402+
preCommit := time.Now()
403+
if err := rf(ctx, w, b, limit, size, d); err != nil {
404+
t.Fatalf("Resume failed: %+v", err)
405+
}
406+
postCommit := time.Now()
407+
408+
if err := w.Close(); err != nil {
409+
t.Fatal(err)
410+
}
411+
412+
info := content.Info{
413+
Digest: d,
414+
Size: size,
415+
}
416+
417+
if err := checkInfo(ctx, cs, d, info, preCommit, postCommit, preCommit, postCommit); err != nil {
418+
t.Fatalf("Check info failed: %+v", err)
419+
}
420+
}
421+
}
422+
}
423+
}
424+
425+
func resumeTruncate(ctx context.Context, w content.Writer, b []byte, written, size int64, dgst digest.Digest) error {
426+
if err := w.Truncate(0); err != nil {
427+
return errors.Wrap(err, "truncate failed")
428+
}
429+
430+
if _, err := io.CopyBuffer(w, bytes.NewReader(b), make([]byte, 1024)); err != nil {
431+
return errors.Wrap(err, "write failed")
432+
}
433+
434+
return errors.Wrap(w.Commit(ctx, size, dgst), "commit failed")
435+
}
436+
437+
func resumeDiscard(ctx context.Context, w content.Writer, b []byte, written, size int64, dgst digest.Digest) error {
438+
if _, err := io.CopyBuffer(w, bytes.NewReader(b[written:]), make([]byte, 1024)); err != nil {
439+
return errors.Wrap(err, "write failed")
440+
}
441+
return errors.Wrap(w.Commit(ctx, size, dgst), "commit failed")
442+
}
443+
444+
func resumeCopy(ctx context.Context, w content.Writer, b []byte, _, size int64, dgst digest.Digest) error {
445+
r := struct {
446+
io.Reader
447+
}{bytes.NewReader(b)}
448+
return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed")
449+
}
450+
451+
func resumeCopySeeker(ctx context.Context, w content.Writer, b []byte, _, size int64, dgst digest.Digest) error {
452+
r := struct {
453+
io.ReadSeeker
454+
}{bytes.NewReader(b)}
455+
return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed")
456+
}
457+
458+
func resumeCopyReaderAt(ctx context.Context, w content.Writer, b []byte, _, size int64, dgst digest.Digest) error {
459+
type readerAt interface {
460+
io.Reader
461+
io.ReaderAt
462+
}
463+
r := struct {
464+
readerAt
465+
}{bytes.NewReader(b)}
466+
return errors.Wrap(content.Copy(ctx, w, r, size, dgst), "copy failed")
467+
}
468+
355469
func checkStatus(t *testing.T, w content.Writer, expected content.Status, d digest.Digest, preStart, postStart, preUpdate, postUpdate time.Time) {
356470
t.Helper()
357471
st, err := w.Status()

0 commit comments

Comments
 (0)