Skip to content

Commit eaa7ca8

Browse files
committed
proxy: break up writes from the remote writer to avoid grpc limits
The remote content writer proxy already has the capability to break up large files into multiple writes, but the current API doesn't recognize when it's about to exceed the limits and attempts to send the data over grpc in one message instead of breaking it into multiple messages. This changes the behavior of `Write` to automatically break up the size of the content based on the max send message size. Signed-off-by: Jonathan A. Sternberg <[email protected]> (cherry picked from commit f25f36c)
1 parent 67bb32a commit eaa7ca8

2 files changed

Lines changed: 41 additions & 20 deletions

File tree

core/content/proxy/content_writer.go

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
digest "github.com/opencontainers/go-digest"
2727

2828
"github.com/containerd/containerd/v2/core/content"
29+
"github.com/containerd/containerd/v2/defaults"
2930
"github.com/containerd/containerd/v2/pkg/protobuf"
3031
)
3132

@@ -76,27 +77,37 @@ func (rw *remoteWriter) Digest() digest.Digest {
7677
}
7778

7879
func (rw *remoteWriter) Write(p []byte) (n int, err error) {
79-
offset := rw.offset
80+
const maxBufferSize = defaults.DefaultMaxSendMsgSize >> 1
81+
for i := 0; i < len(p); i += maxBufferSize {
82+
offset := rw.offset
8083

81-
resp, err := rw.send(&contentapi.WriteContentRequest{
82-
Action: contentapi.WriteAction_WRITE,
83-
Offset: offset,
84-
Data: p,
85-
})
86-
if err != nil {
87-
return 0, fmt.Errorf("failed to send write: %w", errgrpc.ToNative(err))
88-
}
84+
end := i + maxBufferSize
85+
if end > len(p) {
86+
end = len(p)
87+
}
88+
data := p[i:end]
89+
90+
resp, err := rw.send(&contentapi.WriteContentRequest{
91+
Action: contentapi.WriteAction_WRITE,
92+
Offset: offset,
93+
Data: data,
94+
})
95+
if err != nil {
96+
return 0, fmt.Errorf("failed to send write: %w", errgrpc.ToNative(err))
97+
}
8998

90-
n = int(resp.Offset - offset)
91-
if n < len(p) {
92-
err = io.ErrShortWrite
93-
}
99+
written := int(resp.Offset - offset)
100+
rw.offset += int64(written)
101+
if resp.Digest != "" {
102+
rw.digest = digest.Digest(resp.Digest)
103+
}
104+
n += written
94105

95-
rw.offset += int64(n)
96-
if resp.Digest != "" {
97-
rw.digest = digest.Digest(resp.Digest)
106+
if written < len(data) {
107+
return n, io.ErrShortWrite
108+
}
98109
}
99-
return
110+
return n, nil
100111
}
101112

102113
func (rw *remoteWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) (err error) {

core/content/testsuite/testsuite.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,13 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store
175175
}
176176
defer w4.Close()
177177

178+
c5, d5 := createContent(16 << 21)
179+
w5, err := content.OpenWriter(ctx, cs, content.WithRef("c5"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c5)), Digest: d5}))
180+
if err != nil {
181+
t.Fatal(err)
182+
}
183+
defer w5.Close()
184+
178185
smallbuf := make([]byte, 32)
179186
for _, s := range []struct {
180187
content []byte
@@ -201,6 +208,11 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store
201208
digest: d4,
202209
writer: w4,
203210
},
211+
{
212+
content: c5,
213+
digest: d5,
214+
writer: w5,
215+
},
204216
} {
205217
n, err := io.CopyBuffer(s.writer, bytes.NewReader(s.content), smallbuf)
206218
if err != nil {
@@ -647,7 +659,6 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) {
647659
if err := checkInfo(ctx, cs, d1, info, preCommit, postCommit, preUpdate, postUpdate); err != nil {
648660
t.Fatalf("Check info failed: %+v", err)
649661
}
650-
651662
}
652663

653664
func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64, digest.Digest) error) func(ctx context.Context, t *testing.T, cs content.Store) {
@@ -734,7 +745,6 @@ func resumeDiscard(ctx context.Context, w content.Writer, b []byte, written, siz
734745
}
735746
if err := w.Commit(ctx, size, dgst); err != nil {
736747
return fmt.Errorf("commit failed: %w", err)
737-
738748
}
739749
return nil
740750
}
@@ -931,7 +941,6 @@ func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) {
931941
if err := checkContent(ctx2, cs, d2, info2, t1, t3, t1, t3); err != nil {
932942
t.Fatal(err)
933943
}
934-
935944
}
936945

937946
func checkCrossNSIsolate(ctx context.Context, t *testing.T, cs content.Store) {
@@ -1129,6 +1138,7 @@ func checkInfo(ctx context.Context, cs content.Store, d digest.Digest, expected
11291138

11301139
return nil
11311140
}
1141+
11321142
func checkContent(ctx context.Context, cs content.Store, d digest.Digest, expected content.Info, c1, c2, u1, u2 time.Time) error {
11331143
if err := checkInfo(ctx, cs, d, expected, c1, c2, u1, u2); err != nil {
11341144
return err

0 commit comments

Comments
 (0)