Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions storage/grpc_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,8 +1169,8 @@ func (s *gRPCAppendBidiWriteBufferSender) handleStream(stream storagepb.Storage_

type gRPCAppendTakeoverBidiWriteBufferSender struct {
gRPCAppendBidiWriteBufferSender
takeoverReported bool
setTakeoverOffset func(int64)
takeoverReported bool
handleTakeoverCompletion func(gRPCBidiWriteCompletion)
}

func writeObjectSpecAsAppendObjectSpec(s *storagepb.WriteObjectSpec, gen int64) *storagepb.AppendObjectSpec {
Expand All @@ -1197,8 +1197,11 @@ func (w *gRPCWriter) newGRPCAppendTakeoverWriteBufferSender() *gRPCAppendTakeove
objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
finalizeOnClose: w.finalizeOnClose,
},
takeoverReported: false,
setTakeoverOffset: w.setTakeoverOffset,
takeoverReported: false,
handleTakeoverCompletion: func(c gRPCBidiWriteCompletion) {
w.handleCompletion(c)
w.setTakeoverOffset(c.flushOffset)
},
}
}

Expand Down Expand Up @@ -1238,9 +1241,9 @@ func (s *gRPCAppendTakeoverBidiWriteBufferSender) connect(ctx context.Context, c
return
}

s.setTakeoverOffset(c.flushOffset)
s.maybeUpdateFirstMessage(resp)
s.takeoverReported = true
cs.completions <- *c
s.handleTakeoverCompletion(*c)
}

go s.handleStream(stream, cs, firstSend)
Expand Down
73 changes: 73 additions & 0 deletions storage/retry_conformance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,79 @@ var methods = map[string][]retryFunc{
return fmt.Errorf("Reader.Read: %w", err)
}

gotMd5 := md5.Sum(content)
expectedMd5 := md5.Sum(toWrite)
if d := cmp.Diff(gotMd5, expectedMd5); d != "" {
return fmt.Errorf("content mismatch, got %v bytes (md5: %v), want %v bytes (md5: %v)",
len(content), gotMd5, len(toWrite), expectedMd5)
}
return nil
},
// Appendable upload using a takeover.
func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
bucketName := fmt.Sprintf("%s-appendable", bucketIDs.New())
b := c.Bucket(bucketName)
if err := b.Create(ctx, projectID, nil); err != nil {
return err
}
Comment on lines +746 to +748
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better error diagnostics and consistency with other error handling in this function, consider wrapping this error with additional context.

Suggested change
if err := b.Create(ctx, projectID, nil); err != nil {
return err
}
if err := b.Create(ctx, projectID, nil); err != nil {
return fmt.Errorf("failed to create bucket: %w", err)
}

defer b.Delete(ctx)

obj := b.Object(objectIDs.New())
if preconditions {
obj = obj.If(Conditions{DoesNotExist: true})
}

// Force multiple messages per chunk, and multiple chunks in the object.
chunkSize := 2 * maxPerMessageWriteSize
toWrite := generateRandomBytes(chunkSize * 3)

objW := obj.NewWriter(ctx)
objW.Append = true
objW.ChunkSize = chunkSize
if _, err := objW.Write(toWrite[0:maxPerMessageWriteSize]); err != nil {
return fmt.Errorf("Writer.Write: %w", err)
}
// Close this writer, which will create the appendable unfinalized object
// (there was not enough in Write to trigger a send).
if err := objW.Close(); err != nil {
return fmt.Errorf("Creation Writer.Close: %v", err)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To enable proper error unwrapping using errors.Is or errors.As, it's better to use the %w format verb instead of %v when wrapping errors. This function is inconsistent, as it uses %w on line 764 but uses %v here and in several other places (e.g., lines 779, 786, 789, 800, 804). Using %w consistently is a modern Go best practice.

Suggested change
return fmt.Errorf("Creation Writer.Close: %v", err)
return fmt.Errorf("Creation Writer.Close: %w", err)

}

generation := int64(0)
if preconditions {
generation = objW.Attrs().Generation
}
objT := b.Object(obj.ObjectName()).Generation(generation)
w, l, err := objT.NewWriterFromAppendableObject(ctx, &AppendableWriterOpts{ChunkSize: chunkSize})
if err != nil {
return fmt.Errorf("NewWriterFromAppendableObject: %v", err)
}
if l != int64(maxPerMessageWriteSize) {
return fmt.Errorf("NewWriterFromAppendableObject unexpected len: got %v, want %v", l, maxPerMessageWriteSize)
}

if _, err := w.Write(toWrite[maxPerMessageWriteSize:]); err != nil {
return fmt.Errorf("Writer.Write: %v", err)
}
if err := w.Close(); err != nil {
return fmt.Errorf("Writer.Close: %v", err)
}

if w.Attrs() == nil {
return fmt.Errorf("Writer.Attrs: expected attrs for written object, got nil")
}

// Don't reuse obj, in case preconditions were set on the write request.
r, err := b.Object(obj.ObjectName()).NewReader(ctx)
defer r.Close()
if err != nil {
return fmt.Errorf("obj.NewReader: %v", err)
}
content, err := io.ReadAll(r)
if err != nil {
return fmt.Errorf("Reader.Read: %v", err)
}

gotMd5 := md5.Sum(content)
expectedMd5 := md5.Sum(toWrite)
if d := cmp.Diff(gotMd5, expectedMd5); d != "" {
Comment on lines +743 to 809
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This new test function is quite long (over 60 lines) and contains multiple logical steps, which can make it difficult to understand and maintain. Consider refactoring it by extracting distinct stages of the test into smaller, well-named helper functions. For example, you could have separate helpers for:

  • Setting up and creating the initial appendable object.
  • Performing the takeover write.

This would make the main test function a much clearer, high-level description of the test scenario.

Comment on lines 807 to 809
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block of code for verifying content is duplicated from the test case above (lines 734-741). To improve maintainability and reduce code duplication, consider extracting this logic into a shared helper function. For example:

func verifyContent(content, toWrite []byte) error {
	gotMd5 := md5.Sum(content)
	expectedMd5 := md5.Sum(toWrite)
	if d := cmp.Diff(gotMd5, expectedMd5); d != "" {
		return fmt.Errorf("content mismatch, got %v bytes (md5: %v), want %v bytes (md5: %v)",
			len(content), gotMd5, len(toWrite), expectedMd5)
	}
	return nil
}

Then both tests could simply call return verifyContent(content, toWrite) at the end.

Expand Down
Loading