Skip to content

Commit c987e96

Browse files
BrennaEpptritonegcf-merge-on-green[bot]
authored
test(storage): test Copier.Run's token propagation + refactor int test (#6882)
Co-authored-by: Chris Cotter <[email protected]> Co-authored-by: gcf-merge-on-green[bot] <60162190+gcf-merge-on-green[bot]@users.noreply.github.com>
1 parent 438afcf commit c987e96

File tree

5 files changed

+169
-48
lines changed

5 files changed

+169
-48
lines changed

storage/client.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,11 @@ type destinationObject struct {
317317
}
318318

319319
type rewriteObjectRequest struct {
320-
srcObject sourceObject
321-
dstObject destinationObject
322-
predefinedACL string
323-
token string
320+
srcObject sourceObject
321+
dstObject destinationObject
322+
predefinedACL string
323+
token string
324+
maxBytesRewrittenPerCall int64
324325
}
325326

326327
type rewriteObjectResponse struct {

storage/copy.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,15 @@ type Copier struct {
6969
DestinationKMSKeyName string
7070

7171
dst, src *ObjectHandle
72+
73+
// The maximum number of bytes that will be rewritten per rewrite request.
74+
// Most callers shouldn't need to specify this parameter - it is primarily
75+
// in place to support testing. If specified the value must be an integral
76+
// multiple of 1 MiB (1048576). Also, this only applies to requests where
77+
// the source and destination span locations and/or storage classes. Finally,
78+
// this value must not change across rewrite calls else you'll get an error
79+
// that the `rewriteToken` is invalid.
80+
maxBytesRewrittenPerCall int64
7281
}
7382

7483
// Run performs the copy.
@@ -108,8 +117,9 @@ func (c *Copier) Run(ctx context.Context) (attrs *ObjectAttrs, err error) {
108117
encryptionKey: c.dst.encryptionKey,
109118
keyName: c.DestinationKMSKeyName,
110119
},
111-
predefinedACL: c.PredefinedACL,
112-
token: c.RewriteToken,
120+
predefinedACL: c.PredefinedACL,
121+
token: c.RewriteToken,
122+
maxBytesRewrittenPerCall: c.maxBytesRewrittenPerCall,
113123
}
114124

115125
isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist)

storage/grpc_client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,9 @@ func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjec
822822
call.CopySourceEncryptionKeyBytes = srcParams.GetEncryptionKeyBytes()
823823
call.CopySourceEncryptionKeySha256Bytes = srcParams.GetEncryptionKeySha256Bytes()
824824
}
825+
826+
call.MaxBytesRewrittenPerCall = req.maxBytesRewrittenPerCall
827+
825828
var res *storagepb.RewriteResponse
826829
var err error
827830

storage/http_client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,11 @@ func (c *httpStorageClient) RewriteObject(ctx context.Context, req *rewriteObjec
747747
if err := setEncryptionHeaders(call.Header(), req.srcObject.encryptionKey, true); err != nil {
748748
return nil, err
749749
}
750+
751+
if req.maxBytesRewrittenPerCall != 0 {
752+
call.MaxBytesRewrittenPerCall(req.maxBytesRewrittenPerCall)
753+
}
754+
750755
var res *raw.RewriteResponse
751756
var err error
752757
setClientHeader(call.Header())

storage/integration_test.go

Lines changed: 144 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1454,33 +1454,6 @@ func TestIntegration_Objects(t *testing.T) {
14541454
t.Errorf("Object %v is older than its containing bucket, %v", o, bAttrs)
14551455
}
14561456

1457-
// Test object copy.
1458-
copyName := "copy-" + objName
1459-
copyObj, err := bkt.Object(copyName).CopierFrom(bkt.Object(objName)).Run(ctx)
1460-
if err != nil {
1461-
t.Errorf("Copier.Run failed with %v", err)
1462-
} else if !namesEqual(copyObj, newBucketName, copyName) {
1463-
t.Errorf("Copy object bucket, name: got %q.%q, want %q.%q",
1464-
copyObj.Bucket, copyObj.Name, newBucketName, copyName)
1465-
}
1466-
1467-
// Copying with attributes.
1468-
const contentEncoding = "identity"
1469-
copier := bkt.Object(copyName).CopierFrom(bkt.Object(objName))
1470-
copier.ContentEncoding = contentEncoding
1471-
copyObj, err = copier.Run(ctx)
1472-
if err != nil {
1473-
t.Errorf("Copier.Run failed with %v", err)
1474-
} else {
1475-
if !namesEqual(copyObj, newBucketName, copyName) {
1476-
t.Errorf("Copy object bucket, name: got %q.%q, want %q.%q",
1477-
copyObj.Bucket, copyObj.Name, newBucketName, copyName)
1478-
}
1479-
if copyObj.ContentEncoding != contentEncoding {
1480-
t.Errorf("Copy ContentEncoding: got %q, want %q", copyObj.ContentEncoding, contentEncoding)
1481-
}
1482-
}
1483-
14841457
objectHandle := bkt.Object(objName)
14851458

14861459
// Test UpdateAttrs.
@@ -1598,17 +1571,6 @@ func TestIntegration_Objects(t *testing.T) {
15981571
t.Error("Close expected an error, found none")
15991572
}
16001573

1601-
// Test deleting the copy object.
1602-
h.mustDeleteObject(bkt.Object(copyName))
1603-
// Deleting it a second time should return ErrObjectNotExist.
1604-
if err := bkt.Object(copyName).Delete(ctx); err != ErrObjectNotExist {
1605-
t.Errorf("second deletion of %v = %v; want ErrObjectNotExist", copyName, err)
1606-
}
1607-
_, err = bkt.Object(copyName).Attrs(ctx)
1608-
if err != ErrObjectNotExist {
1609-
t.Errorf("Copy is expected to be deleted, stat errored with %v", err)
1610-
}
1611-
16121574
// Test object composition.
16131575
var compSrcs []*ObjectHandle
16141576
var wantContents []byte
@@ -1650,6 +1612,150 @@ func TestIntegration_Objects(t *testing.T) {
16501612
})
16511613
}
16521614

1615+
func TestIntegration_Copy(t *testing.T) {
1616+
multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) {
1617+
bucketFrom := client.Bucket(bucket)
1618+
bucketInSameRegion := client.Bucket(prefix + uidSpace.New())
1619+
bucketInDifferentRegion := client.Bucket(prefix + uidSpace.New())
1620+
1621+
// Create new bucket
1622+
if err := bucketInSameRegion.Create(ctx, testutil.ProjID(), nil); err != nil {
1623+
t.Fatalf("bucket.Create: %v", err)
1624+
}
1625+
defer bucketInSameRegion.Delete(ctx)
1626+
1627+
// Create new bucket
1628+
if err := bucketInDifferentRegion.Create(ctx, testutil.ProjID(), &BucketAttrs{Location: "NORTHAMERICA-NORTHEAST2"}); err != nil {
1629+
t.Fatalf("bucket.Create: %v", err)
1630+
}
1631+
defer bucketInDifferentRegion.Delete(ctx)
1632+
1633+
// We use a larger object size to be able to trigger multiple rewrite calls
1634+
minObjectSize := 2500000 // 2.5 Mb
1635+
obj := bucketFrom.Object("copy-object-original" + uidSpace.New())
1636+
1637+
// Create an object to copy from
1638+
w := obj.NewWriter(ctx)
1639+
c := randomContents()
1640+
for written := 0; written < minObjectSize; {
1641+
n, err := w.Write(c)
1642+
if err != nil {
1643+
t.Fatalf("w.Write: %v", err)
1644+
}
1645+
written += n
1646+
}
1647+
if err := w.Close(); err != nil {
1648+
t.Fatalf("w.Close: %v", err)
1649+
}
1650+
1651+
defer func() {
1652+
if err := obj.Delete(ctx); err != nil {
1653+
t.Errorf("obj.Delete: %v", err)
1654+
}
1655+
}()
1656+
1657+
attrs, err := obj.Attrs(ctx)
1658+
if err != nil {
1659+
t.Fatalf("obj.Attrs: %v", err)
1660+
}
1661+
1662+
crc32c := attrs.CRC32C
1663+
1664+
type copierAttrs struct {
1665+
contentEncoding string
1666+
maxBytesPerCall int64
1667+
}
1668+
1669+
for _, test := range []struct {
1670+
desc string
1671+
toObj string
1672+
toBucket *BucketHandle
1673+
copierAttrs *copierAttrs
1674+
numExpectedRewriteCalls int
1675+
}{
1676+
{
1677+
desc: "copy within bucket",
1678+
toObj: "copy-within-bucket",
1679+
toBucket: bucketFrom,
1680+
numExpectedRewriteCalls: 1,
1681+
},
1682+
{
1683+
desc: "copy to new bucket",
1684+
toObj: "copy-new-bucket",
1685+
toBucket: bucketInSameRegion,
1686+
numExpectedRewriteCalls: 1,
1687+
},
1688+
{
1689+
desc: "copy with attributes",
1690+
toObj: "copy-with-attributes",
1691+
toBucket: bucketInSameRegion,
1692+
copierAttrs: &copierAttrs{contentEncoding: "identity"},
1693+
numExpectedRewriteCalls: 1,
1694+
},
1695+
{
1696+
// this test should trigger multiple re-write calls and may fail
1697+
// with a rate limit error if those calls are stuck in an infinite loop
1698+
desc: "copy to new region",
1699+
toObj: "copy-new-region",
1700+
toBucket: bucketInDifferentRegion,
1701+
copierAttrs: &copierAttrs{maxBytesPerCall: 1048576},
1702+
numExpectedRewriteCalls: 3,
1703+
},
1704+
} {
1705+
t.Run(test.desc, func(t *testing.T) {
1706+
copyObj := test.toBucket.Object(test.toObj)
1707+
copier := copyObj.CopierFrom(obj)
1708+
1709+
if attrs := test.copierAttrs; attrs != nil {
1710+
if attrs.contentEncoding != "" {
1711+
copier.ContentEncoding = attrs.contentEncoding
1712+
}
1713+
if attrs.maxBytesPerCall != 0 {
1714+
copier.maxBytesRewrittenPerCall = attrs.maxBytesPerCall
1715+
}
1716+
}
1717+
1718+
rewriteCallsCount := 0
1719+
copier.ProgressFunc = func(_, _ uint64) {
1720+
rewriteCallsCount++
1721+
}
1722+
1723+
attrs, err = copier.Run(ctx)
1724+
if err != nil {
1725+
t.Fatalf("Copier.Run failed with %v", err)
1726+
}
1727+
defer func() {
1728+
if err := copyObj.Delete(ctx); err != nil {
1729+
t.Errorf("copyObj.Delete: %v", err)
1730+
}
1731+
}()
1732+
1733+
// Check copied object is in the correct bucket with the correct name
1734+
if attrs.Bucket != test.toBucket.name || attrs.Name != test.toObj {
1735+
t.Errorf("unexpected copy behaviour: got: %s in bucket %s, want: %s in bucket %s", attrs.Name, attrs.Bucket, attrs.Name, test.toBucket.name)
1736+
}
1737+
1738+
// Check attrs
1739+
if test.copierAttrs != nil {
1740+
if attrs.ContentEncoding != test.copierAttrs.contentEncoding {
1741+
t.Errorf("unexpected ContentEncoding; got: %s, want: %s", attrs.ContentEncoding, test.copierAttrs.contentEncoding)
1742+
}
1743+
}
1744+
1745+
// Check the copied contents
1746+
if attrs.CRC32C != crc32c {
1747+
t.Errorf("mismatching checksum: got %v, want %v", attrs.CRC32C, crc32c)
1748+
}
1749+
1750+
// Check that the number of requests made is as expected
1751+
if rewriteCallsCount != test.numExpectedRewriteCalls {
1752+
t.Errorf("unexpected number of rewrite calls: got %v, want %v", rewriteCallsCount, test.numExpectedRewriteCalls)
1753+
}
1754+
})
1755+
}
1756+
})
1757+
}
1758+
16531759
func TestIntegration_Encoding(t *testing.T) {
16541760
ctx := context.Background()
16551761
client := testConfig(ctx, t)
@@ -4777,10 +4883,6 @@ func putURL(url string, headers map[string][]string, payload io.Reader) ([]byte,
47774883
return bytes, nil
47784884
}
47794885

4780-
func namesEqual(obj *ObjectAttrs, bucketName, objectName string) bool {
4781-
return obj.Bucket == bucketName && obj.Name == objectName
4782-
}
4783-
47844886
func keyFileEmail(filename string) (string, error) {
47854887
bytes, err := ioutil.ReadFile(filename)
47864888
if err != nil {

0 commit comments

Comments
 (0)