Skip to content

Commit f25f36c

Browse files
committed
proxy: break up writes from the remote writer to avoid grpc limits
The remote content writer proxy already has the capability to break up large files into multiple writes, but the current API doesn't recognize when it's about to exceed the limits and attempts to send the data over grpc in one message instead of breaking it into multiple messages. This changes the behavior of `Write` to automatically break up the size of the content based on the max send message size. Signed-off-by: Jonathan A. Sternberg <[email protected]>
1 parent 7930936 commit f25f36c

File tree

2 files changed

+37
-21
lines changed

2 files changed

+37
-21
lines changed

core/content/proxy/content_writer.go

+24-18
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ import (
2020
"context"
2121
"fmt"
2222
"io"
23+
"slices"
2324

2425
contentapi "github.com/containerd/containerd/api/services/content/v1"
2526
"github.com/containerd/errdefs/pkg/errgrpc"
2627
digest "github.com/opencontainers/go-digest"
2728

2829
"github.com/containerd/containerd/v2/core/content"
30+
"github.com/containerd/containerd/v2/defaults"
2931
"github.com/containerd/containerd/v2/pkg/protobuf"
3032
)
3133

@@ -76,27 +78,31 @@ func (rw *remoteWriter) Digest() digest.Digest {
7678
}
7779

7880
func (rw *remoteWriter) Write(p []byte) (n int, err error) {
79-
offset := rw.offset
80-
81-
resp, err := rw.send(&contentapi.WriteContentRequest{
82-
Action: contentapi.WriteAction_WRITE,
83-
Offset: offset,
84-
Data: p,
85-
})
86-
if err != nil {
87-
return 0, fmt.Errorf("failed to send write: %w", errgrpc.ToNative(err))
88-
}
81+
const maxBufferSize = defaults.DefaultMaxSendMsgSize >> 1
82+
for data := range slices.Chunk(p, maxBufferSize) {
83+
offset := rw.offset
84+
85+
resp, err := rw.send(&contentapi.WriteContentRequest{
86+
Action: contentapi.WriteAction_WRITE,
87+
Offset: offset,
88+
Data: data,
89+
})
90+
if err != nil {
91+
return 0, fmt.Errorf("failed to send write: %w", errgrpc.ToNative(err))
92+
}
8993

90-
n = int(resp.Offset - offset)
91-
if n < len(p) {
92-
err = io.ErrShortWrite
93-
}
94+
written := int(resp.Offset - offset)
95+
rw.offset += int64(written)
96+
if resp.Digest != "" {
97+
rw.digest = digest.Digest(resp.Digest)
98+
}
99+
n += written
94100

95-
rw.offset += int64(n)
96-
if resp.Digest != "" {
97-
rw.digest = digest.Digest(resp.Digest)
101+
if written < len(data) {
102+
return n, io.ErrShortWrite
103+
}
98104
}
99-
return
105+
return n, nil
100106
}
101107

102108
func (rw *remoteWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) (err error) {

core/content/testsuite/testsuite.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,13 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store
175175
}
176176
defer w4.Close()
177177

178+
c5, d5 := createContent(16 << 21)
179+
w5, err := content.OpenWriter(ctx, cs, content.WithRef("c5"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c5)), Digest: d5}))
180+
if err != nil {
181+
t.Fatal(err)
182+
}
183+
defer w5.Close()
184+
178185
smallbuf := make([]byte, 32)
179186
for _, s := range []struct {
180187
content []byte
@@ -201,6 +208,11 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store
201208
digest: d4,
202209
writer: w4,
203210
},
211+
{
212+
content: c5,
213+
digest: d5,
214+
writer: w5,
215+
},
204216
} {
205217
n, err := io.CopyBuffer(s.writer, bytes.NewReader(s.content), smallbuf)
206218
if err != nil {
@@ -647,7 +659,6 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) {
647659
if err := checkInfo(ctx, cs, d1, info, preCommit, postCommit, preUpdate, postUpdate); err != nil {
648660
t.Fatalf("Check info failed: %+v", err)
649661
}
650-
651662
}
652663

653664
func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64, digest.Digest) error) func(ctx context.Context, t *testing.T, cs content.Store) {
@@ -734,7 +745,6 @@ func resumeDiscard(ctx context.Context, w content.Writer, b []byte, written, siz
734745
}
735746
if err := w.Commit(ctx, size, dgst); err != nil {
736747
return fmt.Errorf("commit failed: %w", err)
737-
738748
}
739749
return nil
740750
}
@@ -931,7 +941,6 @@ func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) {
931941
if err := checkContent(ctx2, cs, d2, info2, t1, t3, t1, t3); err != nil {
932942
t.Fatal(err)
933943
}
934-
935944
}
936945

937946
func checkCrossNSIsolate(ctx context.Context, t *testing.T, cs content.Store) {
@@ -1129,6 +1138,7 @@ func checkInfo(ctx context.Context, cs content.Store, d digest.Digest, expected
11291138

11301139
return nil
11311140
}
1141+
11321142
func checkContent(ctx context.Context, cs content.Store, d digest.Digest, expected content.Info, c1, c2, u1, u2 time.Time) error {
11331143
if err := checkInfo(ctx, cs, d, expected, c1, c2, u1, u2); err != nil {
11341144
return err

0 commit comments

Comments
 (0)