Skip to content

Commit 6e57e38

Browse files
feat: support write checksums in json resumable uploads (#3405)
* feat: support write checksums in json resumable uploads * add mutli chunk checksum test * change checksum option from disable to enable * empty commit * resolve comments * resolve comments * revert NewBuffer signature * remove argument in media option
1 parent 1d9673a commit 6e57e38

File tree

6 files changed

+130
-6
lines changed

6 files changed

+130
-6
lines changed

googleapi/googleapi.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,13 +333,28 @@ func ChunkRetryDeadline(deadline time.Duration) MediaOption {
333333
return chunkRetryDeadlineOption(deadline)
334334
}
335335

336+
type enableAutoChecksumOption struct{}
337+
338+
func (d enableAutoChecksumOption) setOptions(o *MediaOptions) {
339+
o.EnableAutoChecksum = true
340+
}
341+
342+
// EnableAutoChecksum returns a MediaOption that enables automatic checksum
343+
// calculation, which is only supported for resumable multi-chunk uploads.
344+
// The computed checksum is sent on the final upload request to the server.
345+
// Writes are rejected in the event of a checksum mismatch.
346+
func EnableAutoChecksum() MediaOption {
347+
return enableAutoChecksumOption{}
348+
}
349+
336350
// MediaOptions stores options for customizing media upload. It is not used by developers directly.
337351
type MediaOptions struct {
338352
ContentType string
339353
ForceEmptyContentType bool
340354
ChunkSize int
341355
ChunkRetryDeadline time.Duration
342356
ChunkTransferTimeout time.Duration
357+
EnableAutoChecksum bool
343358
}
344359

345360
// ProcessMediaOptions stores options from opts in a MediaOptions.

internal/gensupport/buffer.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package gensupport
66

77
import (
88
"bytes"
9+
"hash/crc32"
910
"io"
1011

1112
"google.golang.org/api/googleapi"
@@ -21,8 +22,17 @@ type MediaBuffer struct {
2122

2223
// The absolute position of chunk in the underlying media.
2324
off int64
25+
26+
// fullObjectChecksum holds the running checksum of streamed media chunks when automatic checksum
27+
// calculation is enabled via enableAutoChecksum.
28+
fullObjectChecksum uint32
29+
enableAutoChecksum bool
2430
}
2531

32+
var (
33+
crc32cTable = crc32.MakeTable(crc32.Castagnoli)
34+
)
35+
2636
// NewMediaBuffer initializes a MediaBuffer.
2737
func NewMediaBuffer(media io.Reader, chunkSize int) *MediaBuffer {
2838
return &MediaBuffer{media: media, chunk: make([]byte, 0, chunkSize)}
@@ -52,6 +62,9 @@ func (mb *MediaBuffer) loadChunk() error {
5262
read += n
5363
}
5464
mb.chunk = mb.chunk[:read]
65+
if mb.enableAutoChecksum {
66+
mb.fullObjectChecksum = crc32.Update(mb.fullObjectChecksum, crc32cTable, mb.chunk)
67+
}
5568
return err
5669
}
5770

internal/gensupport/buffer_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package gensupport
66

77
import (
88
"bytes"
9+
"hash/crc32"
910
"io"
1011
"reflect"
1112
"testing"
@@ -293,3 +294,24 @@ func TestAdapter(t *testing.T) {
293294
checkConversion(to, tc.wantTyper)
294295
}
295296
}
297+
298+
func TestChecksum(t *testing.T) {
299+
data := "abcdefg"
300+
mb := NewMediaBuffer(bytes.NewReader([]byte(data)), 3)
301+
mb.enableAutoChecksum = true
302+
for {
303+
_, err := getChunkAsString(t, mb)
304+
if err == io.EOF {
305+
break
306+
}
307+
if err != nil {
308+
t.Fatalf("getChunkAsString() error: %v", err)
309+
}
310+
mb.Next()
311+
}
312+
313+
want := crc32.Checksum([]byte(data), crc32cTable)
314+
if got := mb.fullObjectChecksum; got != want {
315+
t.Errorf("mb.fullObjectChecksum = %d; want %d", got, want)
316+
}
317+
}

internal/gensupport/media.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,12 @@ func typeHeader(contentType string) textproto.MIMEHeader {
118118
//
119119
// After PrepareUpload has been called, media should no longer be used: the
120120
// media content should be accessed via one of the return values.
121-
func PrepareUpload(media io.Reader, chunkSize int) (r io.Reader, mb *MediaBuffer, singleChunk bool) {
121+
func PrepareUpload(media io.Reader, chunkSize int, enableAutoChecksum bool) (r io.Reader, mb *MediaBuffer, singleChunk bool) {
122122
if chunkSize == 0 { // do not chunk
123123
return media, nil, true
124124
}
125125
mb = NewMediaBuffer(media, chunkSize)
126+
mb.enableAutoChecksum = enableAutoChecksum
126127
_, _, _, err := mb.Chunk()
127128
// If err is io.EOF, we can upload this in a single request. Otherwise, err is
128129
// either nil or a non-EOF error. If it is the latter, then the next call to
@@ -159,7 +160,7 @@ func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo {
159160
}
160161
mi.chunkRetryDeadline = opts.ChunkRetryDeadline
161162
mi.chunkTransferTimeout = opts.ChunkTransferTimeout
162-
mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize)
163+
mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize, opts.EnableAutoChecksum)
163164
return mi
164165
}
165166

internal/gensupport/resumable.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ package gensupport
66

77
import (
88
"context"
9+
"encoding/base64"
10+
"encoding/binary"
911
"errors"
1012
"fmt"
1113
"io"
@@ -18,6 +20,11 @@ import (
1820
"google.golang.org/api/internal"
1921
)
2022

23+
const (
24+
crc32cPrefix = "crc32c"
25+
hashHeaderKey = "X-Goog-Hash"
26+
)
27+
2128
// ResumableUpload is used by the generated APIs to provide resumable uploads.
2229
// It is not used by developers directly.
2330
type ResumableUpload struct {
@@ -103,6 +110,11 @@ func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader,
103110
// 308" response header.
104111
req.Header.Set("X-GUploader-No-308", "yes")
105112

113+
// Server accepts checksum only on final request through header.
114+
if final && rx.Media.enableAutoChecksum {
115+
req.Header.Set(hashHeaderKey, fmt.Sprintf("%v=%v", crc32cPrefix, encodeUint32(rx.Media.fullObjectChecksum)))
116+
}
117+
106118
return SendRequest(ctx, rx.Client, req)
107119
}
108120

@@ -335,3 +347,10 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (*http.Response, error) {
335347
return resp, nil
336348
}
337349
}
350+
351+
// Encode a uint32 as Base64 in big-endian byte order.
352+
func encodeUint32(u uint32) string {
353+
b := make([]byte, 4)
354+
binary.BigEndian.PutUint32(b, u)
355+
return base64.StdEncoding.EncodeToString(b)
356+
}

internal/gensupport/resumable_test.go

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
package gensupport
66

77
import (
8+
"bytes"
89
"context"
910
"errors"
1011
"fmt"
12+
"hash/crc32"
1113
"io"
1214
"net/http"
1315
"reflect"
@@ -36,9 +38,10 @@ type event struct {
3638
// It records the incoming data, unless the corresponding event is configured to return
3739
// http.StatusServiceUnavailable.
3840
type interruptibleTransport struct {
39-
events []event
40-
buf []byte
41-
bodies bodyTracker
41+
events []event
42+
buf []byte
43+
bodies bodyTracker
44+
finalHeader http.Header
4245
}
4346

4447
// bodyTracker keeps track of response bodies that have not been closed.
@@ -81,6 +84,7 @@ func (t *interruptibleTransport) RoundTrip(req *http.Request) (*http.Response, e
8184
if got, want := req.Header.Get("Content-Range"), ev.byteRange; got != want {
8285
return nil, fmt.Errorf("byte range: got %s; want %s", got, want)
8386
}
87+
t.finalHeader = req.Header
8488

8589
if ev.responseStatus != http.StatusServiceUnavailable {
8690
buf, err := io.ReadAll(req.Body)
@@ -443,7 +447,7 @@ func TestChunkTransferTimeout(t *testing.T) {
443447

444448
rx := &ResumableUpload{
445449
Client: &http.Client{Transport: transport},
446-
Media: NewMediaBuffer(media, len(data)), // Chunk size is the whole payload
450+
Media: NewMediaBuffer(media, len(data)), // Chunk size is the whole payload.
447451
MediaType: "text/plain",
448452
ChunkTransferTimeout: tc.chunkTransferTimeout,
449453
ChunkRetryDeadline: 100 * time.Millisecond,
@@ -628,3 +632,53 @@ func TestOverallUploadTimeout(t *testing.T) {
628632
t.Fatalf("Upload err: got: %v; want: context.DeadlineExceeded", err)
629633
}
630634
}
635+
636+
func TestUploadChecksum(t *testing.T) {
637+
data := string(bytes.Repeat([]byte("a"), 300))
638+
chunkSize := 90
639+
checksum := crc32.Checksum([]byte(data), crc32cTable)
640+
tests := []struct {
641+
name string
642+
chunkSize int
643+
sendChecksum bool
644+
wantChecksumHeader string
645+
}{
646+
{
647+
name: "checksum disabled",
648+
sendChecksum: false,
649+
},
650+
{
651+
name: "checksum enabled",
652+
sendChecksum: true,
653+
wantChecksumHeader: fmt.Sprintf("%v=%v", crc32cPrefix, encodeUint32(checksum)),
654+
},
655+
}
656+
for _, tc := range tests {
657+
media := strings.NewReader(data)
658+
659+
// Simulate multi-chunk resumable requests.
660+
tr := &interruptibleTransport{
661+
events: []event{
662+
{byteRange: "bytes 0-89/*", responseStatus: 308},
663+
{byteRange: "bytes 90-179/*", responseStatus: 308},
664+
{byteRange: "bytes 180-269/*", responseStatus: 308},
665+
{byteRange: "bytes 270-299/300", responseStatus: 200},
666+
},
667+
bodies: bodyTracker{},
668+
}
669+
rx := &ResumableUpload{
670+
Client: &http.Client{Transport: tr},
671+
Media: NewMediaBuffer(media, chunkSize),
672+
MediaType: "text/plain",
673+
}
674+
rx.Media.enableAutoChecksum = tc.sendChecksum
675+
res, err := rx.Upload(context.Background())
676+
if err != nil {
677+
t.Fatalf("Upload failed: %v", err)
678+
}
679+
res.Body.Close()
680+
if gotChecksumHeader := tr.finalHeader.Get(hashHeaderKey); gotChecksumHeader != tc.wantChecksumHeader {
681+
t.Errorf("Hash header: got %q, want %q", gotChecksumHeader, tc.wantChecksumHeader)
682+
}
683+
}
684+
}

0 commit comments

Comments
 (0)