Skip to content

Commit a3df9c7

Browse files
authored
fix(storage): use hash values in InsertObject() (#7025)
If the application provides a `gcs::Crc32cChecksumValue()` or `gcs::MD5HashValue()` to `InsertObject()` we should use those values to create the metadata portion of the upload. Otherwise we are losing an opportunity to validate the data integrity during the upload. I discovered this while refactoring and restructuring the integration tests for CRC32C checksums and MD5 hashes. Apologies for the massive changes in the tests for what amounts to a small fix. The tests now have 3 portions, one for `InsertObject()`, one for `WriteObject()` and one for `ReadObject()`. Unlike before, the code for XML and JSON tests is shared, though this required some changes in the emulator to make it easier to detect what hash fields (if any) were included in the upload. For `InsertObject()` and `WriteObject()` we test 5 scenarios: (1) using the default settings, which are enabled for CRC32C, and disabled for MD5, (2) explicitly disabling the hash, (3) explicitly enabling the hash, (4) setting the hash to the correct value, and (5) setting the hash to the incorrect value. For `WriteObject()` we also test detecting an invalid hash, both in the case where the library computes the hash and the case where the application provides it. For `ReadObject()` it is simpler, because we cannot provide expected hashes. So we only test the cases where the hash check is in its default setting (enabled for CRC32C, disabled for MD5) and that hash mismatches are detected when the hash is enabled.
1 parent e49eefa commit a3df9c7

10 files changed

Lines changed: 570 additions & 633 deletions

File tree

google/cloud/storage/emulator/emulator.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ def index():
4848
TODO(#6615): Introducing failures into uploads with return-XXX-after-YYYk
4949
"""
5050

51+
5152
# Needs to be defined in emulator.py to keep context of flask and db global variables
5253
def retry_test(method):
5354
global supported_methods
@@ -805,7 +806,6 @@ def resumable_upload_chunk(bucket_name):
805806
blob.metadata.metadata["x_emulator_transfer_encoding"] = ":".join(
806807
upload.transfer
807808
)
808-
blob.metadata.metadata["x_emulator_upload"] = "resumable"
809809
db.insert_object(upload.request, bucket_name, blob, None)
810810
projection = utils.common.extract_projection(
811811
upload.request, CommonEnums.Projection.NO_ACL, None
@@ -906,7 +906,6 @@ def xml_get_object(bucket_name, object_name):
906906
# Define the WSGI application to handle IAM requests
907907
(IAM_HANDLER_PATH, iam_app) = gcs_type.iam.get_iam_app()
908908

909-
910909
server = HandleGzipMiddleware(
911910
DispatcherMiddleware(
912911
root,

google/cloud/storage/emulator/gcs/holder.py

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
class DataHolder(types.SimpleNamespace):
2929
rest_only_fields = ["customTime"]
30+
__upload_id_generator = 0
3031

3132
def __init__(self, **kwargs):
3233
super().__init__(**kwargs)
@@ -57,6 +58,27 @@ def init_upload(
5758
rest_only=rest_only,
5859
)
5960

61+
@classmethod
62+
def __preprocess_rest_metadata(cls, metadata):
63+
return utils.common.rest_adjust(
64+
metadata,
65+
{
66+
"crc32c": lambda x: (
67+
"crc32c",
68+
utils.common.rest_crc32c_to_proto(metadata["crc32c"]),
69+
)
70+
},
71+
)
72+
73+
@classmethod
74+
def __create_upload_id(cls, bucket_name, object_name):
75+
cls.__upload_id_generator = cls.__upload_id_generator + 1
76+
return hashlib.sha256(
77+
(
78+
"%d/%s/o/%s" % (cls.__upload_id_generator, bucket_name, object_name)
79+
).encode("utf-8")
80+
).hexdigest()
81+
6082
@classmethod
6183
def init_resumable_rest(cls, request, bucket):
6284
query_name = request.args.get("name", None)
@@ -76,7 +98,19 @@ def init_resumable_rest(cls, request, bucket):
7698
context=None,
7799
)
78100
rest_only = cls.__extract_rest_only(data)
79-
metadata = json_format.ParseDict(data, metadata)
101+
metadata = json_format.ParseDict(
102+
cls.__preprocess_rest_metadata(data), metadata
103+
)
104+
# Add some annotations to make it easier to write tests
105+
metadata.metadata["x_emulator_upload"] = "resumable"
106+
if data.get("crc32c", None) is not None:
107+
metadata.metadata["x_emulator_crc32c"] = data.get("crc32c")
108+
if data.get("md5Hash", None) is not None:
109+
metadata.metadata["x_emulator_md5"] = data.get("md5Hash")
110+
if metadata.metadata.get("x_emulator_crc32c", None) is None:
111+
metadata.metadata["x_emulator_no_crc32c"] = "true"
112+
if metadata.metadata.get("x_emulator_md5", None) is None:
113+
metadata.metadata["x_emulator_no_md5"] = "true"
80114
if query_name:
81115
metadata.name = query_name
82116
if metadata.name == "":
@@ -85,9 +119,7 @@ def init_resumable_rest(cls, request, bucket):
85119
metadata.content_type = request.headers.get(
86120
"x-upload-content-type", "application/octet-stream"
87121
)
88-
upload_id = hashlib.sha256(
89-
("%s/o/%s" % (bucket.name, metadata.name)).encode("utf-8")
90-
).hexdigest()
122+
upload_id = cls.__create_upload_id(bucket.name, metadata.name)
91123
location = (
92124
request.host_url
93125
+ "upload/storage/v1/b/%s/o?uploadType=resumable&upload_id=%s"
@@ -108,9 +140,19 @@ def init_resumable_rest(cls, request, bucket):
108140
@classmethod
109141
def init_resumable_grpc(cls, request, bucket, context):
110142
metadata = request.insert_object_spec.resource
111-
upload_id = hashlib.sha256(
112-
("%s/o/%s" % (bucket.name, metadata.name)).encode("utf-8")
113-
).hexdigest()
143+
# Add some annotations to make it easier to write tests
144+
metadata.metadata["x_emulator_upload"] = "resumable"
145+
if metadata.crc32c.value is not None:
146+
metadata.metadata[
147+
"x_emulator_crc32c"
148+
] = utils.common.rest_crc32c_from_proto(metadata.crc32c.value)
149+
else:
150+
metadata.metadata["x_emulator_no_crc3c"] = "true"
151+
if metadata.md5_hash is not None:
152+
metadata.metadata["x_emulator_md5"] = metadata.md5_hash
153+
else:
154+
metadata.metadata["x_emulator_no_md5"] = "true"
155+
upload_id = cls.__create_upload_id(bucket.name, metadata.name)
114156
fake_request = utils.common.FakeRequest.init_protobuf(request, context)
115157
fake_request.update_protobuf(request.insert_object_spec, context)
116158
return cls.init_upload(fake_request, metadata, bucket, "", upload_id)

google/cloud/storage/emulator/gcs/object.py

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,7 @@ def init_multipart(cls, request, bucket):
203203
metadata["md5Hash"] = metadata["md5Hash"]
204204
if "crc32c" in metadata:
205205
metadata["metadata"]["x_emulator_crc32c"] = metadata["crc32c"]
206-
metadata["crc32c"] = struct.unpack(
207-
">I", base64.b64decode(metadata["crc32c"].encode("utf-8"))
208-
)[0]
206+
metadata["crc32c"] = utils.common.rest_crc32c_to_proto(metadata["crc32c"])
209207
return cls.init_dict(request, metadata, media, bucket, False)
210208

211209
@classmethod
@@ -227,9 +225,7 @@ def init_xml(cls, request, bucket, name):
227225
metadata["md5Hash"] = md5Hash
228226
if checksum.startswith("crc32c="):
229227
crc32c_value = checksum[7:]
230-
metadata["crc32c"] = struct.unpack(
231-
">I", base64.b64decode(crc32c_value.encode("utf-8"))
232-
)[0]
228+
metadata["crc32c"] = utils.common.rest_crc32c_to_proto(crc32c_value)
233229
blob, _ = cls.init_dict(fake_request, metadata, media, bucket, False)
234230
return blob, fake_request
235231

@@ -374,9 +370,7 @@ def delete_acl(self, entity, context):
374370
def rest(cls, metadata, rest_only):
375371
response = json_format.MessageToDict(metadata)
376372
response["kind"] = "storage#object"
377-
response["crc32c"] = base64.b64encode(
378-
struct.pack(">I", response["crc32c"])
379-
).decode("utf-8")
373+
response["crc32c"] = utils.common.rest_crc32c_from_proto(response["crc32c"])
380374
response.update(rest_only)
381375
old_metadata = {}
382376
if "metadata" in response:
@@ -391,14 +385,15 @@ def rest_metadata(self):
391385
return self.rest(self.metadata, self.rest_only)
392386

393387
def x_goog_hash_header(self):
394-
header = ""
395-
if "x_emulator_crc32c" in self.metadata.metadata:
396-
header += "crc32c=" + self.metadata.metadata["x_emulator_crc32c"]
397-
if "x_emulator_md5" in self.metadata.metadata:
398-
if header != "":
399-
header += ","
400-
header += "md5=" + self.metadata.metadata["x_emulator_md5"]
401-
return header if header != "" else None
388+
hashes = []
389+
if self.metadata.crc32c is not None and self.metadata.crc32c.value is not None:
390+
hashes.append(
391+
"crc32c=%s"
392+
% utils.common.rest_crc32c_from_proto(self.metadata.crc32c.value)
393+
)
394+
if self.metadata.md5_hash is not None:
395+
hashes.append("md5=%s" % self.metadata.md5_hash)
396+
return ",".join(hashes) if len(hashes) != 0 else None
402397

403398
def rest_media(self, request):
404399
range_header = request.headers.get("range")

google/cloud/storage/emulator/grpc_server.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,6 @@ def StartResumableWrite(self, request, context):
253253
upload = gcs_type.holder.DataHolder.init_resumable_grpc(
254254
request, bucket, context
255255
)
256-
upload.metadata.metadata["x_emulator_upload"] = "resumable"
257256
db.insert_upload(upload)
258257
return storage_pb2.StartResumableWriteResponse(upload_id=upload.upload_id)
259258

google/cloud/storage/emulator/tests/test_holder.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,14 @@ def test_init_resumable_grpc(self):
195195
insert_object_spec=insert_object_spec, write_offset=0
196196
)
197197
upload = gcs.holder.DataHolder.init_resumable_grpc(request, bucket, "")
198+
# Verify the annotations inserted by the emulator.
199+
annotations = upload.metadata.metadata
200+
self.assertGreaterEqual(
201+
set(["x_emulator_upload", "x_emulator_crc32c", "x_emulator_md5"]),
202+
set(annotations.keys()),
203+
)
204+
# Clear any annotations created by the emulator
205+
upload.metadata.metadata.clear()
198206
self.assertEqual(
199207
upload.metadata, resources_pb2.Object(name="object", bucket="bucket")
200208
)
@@ -215,6 +223,30 @@ def test_init_resumable_grpc(self):
215223
projection = utils.common.extract_projection(upload.request, False, "")
216224
self.assertEqual(projection, CommonEnums.Projection.FULL)
217225

226+
def test_init_resumable_rest(self):
227+
request = storage_pb2.InsertBucketRequest(bucket={"name": "bucket"})
228+
bucket, _ = gcs.bucket.Bucket.init(request, "")
229+
bucket = bucket.metadata
230+
data = json.dumps(
231+
{
232+
# Empty payload checksums
233+
"crc32c": "AAAAAA==",
234+
"md5Hash": "1B2M2Y8AsgTpgAmY7PhCfg==",
235+
"name": "test-object-name",
236+
}
237+
)
238+
environ = create_environ(
239+
base_url="http://localhost:8080",
240+
content_length=len(data),
241+
data=data,
242+
content_type="application/json",
243+
method="POST",
244+
)
245+
upload = gcs.holder.DataHolder.init_resumable_rest(Request(environ), bucket)
246+
self.assertEqual(upload.metadata.name, "test-object-name")
247+
self.assertEqual(upload.metadata.crc32c.value, 0)
248+
self.assertEqual(upload.metadata.md5_hash, "1B2M2Y8AsgTpgAmY7PhCfg==")
249+
218250

219251
if __name__ == "__main__":
220252
unittest.main()

google/cloud/storage/emulator/utils/common.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414

1515
"""Common utils"""
1616

17+
import base64
1718
import json
1819
import random
1920
import re
2021
import types
21-
import time
2222
import sys
2323
import socket
2424
import struct
@@ -422,3 +422,54 @@ def handle_retry_test_instruction(database, request, method):
422422
database.dequeue_next_instruction(test_id, method)
423423
utils.error.notallowed()
424424
return __get_default_response_fn
425+
426+
427+
def rest_crc32c_to_proto(crc32c):
428+
"""Convert from the REST representation of crc32c checksums to the proto representation.
429+
430+
REST uses base64 encoded 32-bit big endian integers, while protos use just `int32`.
431+
"""
432+
return struct.unpack(">I", base64.b64decode(crc32c.encode("utf-8")))[0]
433+
434+
435+
def rest_crc32c_from_proto(crc32c):
436+
"""Convert from the gRPC/proto representation of crc32c checksums to the REST representation.
437+
438+
REST uses base64 encoded 32-bit big endian integers, while protos use just `int32`.
439+
"""
440+
return base64.b64encode(struct.pack(">I", crc32c)).decode("utf-8")
441+
442+
443+
def rest_adjust(data, adjustments):
444+
"""
445+
Apply a per-key 'actions' to a dictionary *if* the key is present.
446+
447+
When mapping between the gRPC and the REST representations of resources
448+
(Bucket, Object, etc.) we sometimes need to change the name and/or format
449+
of some fields.
450+
451+
The `adjustments` describes what keys (if present) need adjustment, and
452+
a function that returns the new key and value for the item in `data`.
453+
454+
Parameters
455+
----------
456+
data : dict
457+
A dictionary, typically the REST representation of a resource
458+
adjustments : dict
459+
The keys in `data` that, if present, need adjustment. The values
460+
in this dictionary are functions returning a (key, value) tuple
461+
that replaces the existing tuple in `data`.
462+
463+
Returns
464+
-------
465+
dict
466+
A copy of `data` with the changes prescribed by `adjustments`.
467+
"""
468+
modified = data.copy()
469+
for key, action in adjustments.items():
470+
value = modified.pop(key, None)
471+
if value is not None:
472+
k, v = action(value)
473+
if value is not None:
474+
modified[k] = v
475+
return modified

google/cloud/storage/internal/curl_client.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,9 @@ StatusOr<ObjectMetadata> CurlClient::InsertObjectMedia(
536536
// uploads. `DisableMD5Hash` and `DisableCrc32cChecksum` should not be
537537
// dependent on each other.
538538
if (!request.GetOption<DisableMD5Hash>().value() ||
539-
!request.GetOption<DisableCrc32cChecksum>().value_or(false)) {
539+
!request.GetOption<DisableCrc32cChecksum>().value_or(false) ||
540+
request.HasOption<MD5HashValue>() ||
541+
request.HasOption<Crc32cChecksumValue>()) {
540542
return InsertObjectMediaMultipart(request);
541543
}
542544

google/cloud/storage/testing/storage_integration_test.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,30 @@ class StorageIntegrationTest
105105
buckets_to_delete_.push_back(std::move(meta));
106106
}
107107

108+
struct ApiSwitch {
109+
Fields for_insert;
110+
IfMetagenerationNotMatch for_streaming_read;
111+
};
112+
113+
static ApiSwitch RestApiFlags(std::string const& api) {
114+
if (api == "XML") {
115+
return ApiSwitch{
116+
// enables XML: this filters-out all metadata fields from
117+
// the InsertObject() response. JSON and XML are equivalent when no
118+
// metadata fields are requested, and we default to XML in that case.
119+
Fields(""),
120+
// empty option has no effect, and the default is XML
121+
IfMetagenerationNotMatch()};
122+
}
123+
return ApiSwitch{
124+
// empty option has no effect, and the default is JSON since only JSON
125+
// can provide all metadata fields.
126+
Fields(),
127+
// disables XML (the default) as it does not support
128+
// metageneration-not-match
129+
IfMetagenerationNotMatch(0)};
130+
}
131+
108132
private:
109133
std::vector<ObjectMetadata> objects_to_delete_;
110134
std::vector<BucketMetadata> buckets_to_delete_;

0 commit comments

Comments
 (0)