-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmocks3.py
263 lines (212 loc) · 9.36 KB
/
mocks3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
""" A simple mock for Boto3's S3 modules. """
import copy
import io
import pathlib
class PseudoBotoPaginator:
def __init__(self, func):
self.func = func
def paginate(self, **kwargs):
yield from self.func(**kwargs)
class PseudoS3Exceptions:
try:
from botocore.exceptions import NoSuchKey
except ImportError:
class NoSuchKey(BaseException):
def __init__(self, key):
self.key = key
def __repr__(self):
return f'PseudoNoSuchKey({self.key!r})'
try:
from botocore.exceptions import ClientError
except ImportError:
class ClientError(BaseException):
def __init__(self, error_response, operation_name):
self.response = error_response # Not error_reponse!
self.operation_name = operation_name
def __repr__(self):
return f'ClientError({repr(str(self))})'
def __str__(self):
error_code = self.response['Error']['Code']
error_message = self.response['Error']['Message']
return (
f'An error occurred ({error_code}) when calling the {self.operation_name} '
f'operation: {error_message}')
class PseudoBotoIO:
# boto3's object file handle is EXTREMELY restricted. Simulate that.
def __init__(self, content: bytes):
assert isinstance(content, bytes)
self._io = io.BytesIO(content)
def read(self, *args, **kwargs):
return self._io.read(*args, **kwargs)
def close(self):
return self._io.close()
# No other properties!
class PseudoS3Client:
""" Simulates a boto3 S3 client object in tests """
exceptions = PseudoS3Exceptions
def __init__(self, *, origin_client=None, init_buckets=None):
self.origin_client = origin_client
self.buckets = {}
if init_buckets:
for bucket_name in init_buckets:
self.buckets[bucket_name] = {}
# non-standard variable names for Boto3 compatibility
def download_file(self, Bucket, Key, Filename, *, ExtraArgs=None, Callback=None, Config=None):
bucket = self.buckets[Bucket]
if Key not in bucket:
raise self.exceptions.NoSuchKey(Key)
content = bucket[Key]
with open(Filename, 'wb') as f:
f.write(content)
if Callback:
Callback(len(content))
# non-standard variable names for Boto3 compatibility
def download_fileobj(self, Bucket, Key, Fileobj, *, ExtraArgs=None, Callback=None, Config=None):
bucket = self.buckets[Bucket]
storage = getattr(bucket, 'bucket_storage', bucket)
if Key not in storage:
raise self.exceptions.NoSuchKey(Key)
content = storage[Key]
Fileobj.write(content)
if Callback:
Callback(len(content))
# non-standard variable names for Boto3 compatibility
def get_object(self, *, Bucket, Key):
bucket = self.buckets[Bucket]
if Key not in bucket:
raise self.exceptions.NoSuchKey(Key)
content = bucket[Key]
return {
'Body': PseudoBotoIO(content),
'ContentLength': len(content),
}
# noqa non-standard variable names from https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.head_object
def head_object(self, Bucket, Key):
try:
bucket = self.buckets[Bucket]
content = bucket[Key]
except KeyError as err:
raise self.exceptions.ClientError(
error_response={
'Error': {
'Code': '404',
# Real message is only: 'Not Found'
'Message': f'Not Found (PseudoS3Client KeyError: {err})'
}
},
operation_name='HeadObject',
)
# File found: return a halfway plausible result ;)
return {
'AcceptRanges': 'bytes',
'ContentLength': len(content),
'ContentType': 'application/octet-stream',
'ETag': '"00000000000000000000000000000000"',
'LastModified': None,
'Metadata': {},
'ResponseMetadata': {}
}
# noqa non-standard variable names from https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.upload_fileobj
def upload_fileobj(self, Fileobj, Bucket, Key, *, ExtraArgs=None, Callback=None, Config=None):
if Callback:
Callback(0)
Callback(1)
bucket = self.buckets[Bucket]
contents = Fileobj.read()
assert isinstance(bucket, dict)
bucket[Key] = contents
if Callback:
Callback(len(contents))
# boto3 closes file objects, see:
# https://github.com/boto/s3transfer/issues/80
Fileobj.close()
# noqa non-standard variable names from https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.upload_file
def upload_file(self, Filename, Bucket, Key, *, ExtraArgs=None, Callback=None, Config=None):
contents = pathlib.Path(Filename).read_bytes()
buf = io.BytesIO(contents)
self.upload_fileobj(
Fileobj=buf, Bucket=Bucket, Key=Key, ExtraArgs=ExtraArgs, Callback=Callback,
Config=Config)
# noqa non-standard variable names from https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.delete_object
def delete_object(self, Bucket, Key):
del self.buckets[Bucket][Key]
# non-standard variable names for Boto3 compatibility
def copy_object(self, *, Bucket, CopySource, Key, ContentDisposition=None, MetadataDirective='COPY'):
bucket = self.buckets[Bucket]
src_bucket_name, _, src_key = CopySource.partition('/')
src_bucket = self.buckets[src_bucket_name]
if src_key not in src_bucket:
raise self.exceptions.NoSuchKey(src_key)
bucket[Key] = copy.deepcopy(src_bucket[src_key])
def generate_presigned_url(self, *args, **kwargs):
return self.origin_client.generate_presigned_url(*args, **kwargs)
def list_buckets(self):
return {
'Buckets': [{'Name': bucket_name} for bucket_name in self.buckets.keys()],
# Add some "realistic" fake meta data:
'Owner': {'DisplayName': '', 'ID': 'b36c0000000000000000000000000000000000000000000000000000abcd0005'},
'ResponseMetadata': {
'HTTPHeaders': {'x-amz-request-id': 'PseudoS3Client'},
'HTTPStatusCode': 200,
'RequestId': 'PseudoS3Client',
},
}
def head_bucket(self, Bucket, ExpectedBucketOwner=None):
if ExpectedBucketOwner is not None:
raise NotImplementedError
if Bucket not in self.buckets:
raise self.exceptions.NoSuchKey(Bucket)
return { # Some "realistic" fake meta data:
'ResponseMetadata': {
'HTTPHeaders': {'x-amz-request-id': 'PseudoS3Client'},
'HTTPStatusCode': 200,
'RequestId': 'PseudoS3Client',
}
}
def _list_objects_v2(self, Bucket, Prefix=None):
bucket = self.buckets.get(Bucket, {})
keys = bucket.keys()
if Prefix is not None:
keys = (k for k in keys if k.startswith(Prefix))
keys = sorted(keys)
yield {
'Contents': [{'Key': k, 'Size': len(bucket[k])} for k in keys],
}
def get_paginator(self, operation_name):
assert operation_name == 'list_objects_v2', f'Unsupported operation name {operation_name}'
return PseudoBotoPaginator(self._list_objects_v2)
# Non-standard functions, prefixed by "mock_" (can be used in test code, but not main code)
# or "debug_" (should never be committed)
def mock_set_content(self, Bucket, Key, content: bytes):
assert isinstance(content, bytes)
self.buckets[Bucket][Key] = content
def mock_get_content(self, Bucket, Key):
return self.buckets[Bucket][Key]
def mock_list_files(self, Bucket):
return sorted(self.buckets[Bucket].keys())
def debug_long_repr(self, max_string_length=20):
res = '<MockS3\n'
for bucket_name, objects in sorted(self.buckets.items()):
res += f'{bucket_name}\n'
for key, content in sorted(objects.items()):
try:
content_str = content.decode('utf-8')
if len(content_str) > max_string_length:
content_repr = repr(content_str[:max_string_length - 3])[:-1] + '...'
else:
content_repr = repr(content_str)
except ValueError:
# Binary content
content_repr = repr(content[:max_string_length])
if len(content_repr) > max_string_length:
content_repr = content_repr[:max_string_length - 3][:-1] + '...'
if content == b'':
content_repr = ''
res += f' {key:20} => [{len(content_repr)} bytes] {content_repr}\n'
res += '>\n'
return res
def debug_print(self):
print(self.debug_long_repr())
class PseudoBoto3:
def __init__(self, **kwargs):
self.client = PseudoS3Client(**kwargs)