Skip to content

Commit 177773b

Browse files
Fix broken multi-account/multi-region functionality (#9541)
1 parent d099d08 commit 177773b

File tree

10 files changed

+108
-44
lines changed

10 files changed

+108
-44
lines changed

localstack/services/kinesis/provider.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ def put_record(
151151
sequence_number_for_ordering: SequenceNumber = None,
152152
stream_arn: StreamARN = None,
153153
) -> PutRecordOutput:
154+
# TODO: Ensure use of `stream_arn` works. Currently kinesis-mock only works with ctx request account ID and region
154155
if random() < config.KINESIS_ERROR_PROBABILITY:
155156
raise ProvisionedThroughputExceededException(
156157
"Rate exceeded for shard X in stream Y under account Z."
@@ -166,6 +167,7 @@ def put_records(
166167
stream_name: StreamName = None,
167168
stream_arn: StreamARN = None,
168169
) -> PutRecordsOutput:
170+
# TODO: Ensure use of `stream_arn` works. Currently kinesis-mock only works with ctx request account ID and region
169171
if random() < config.KINESIS_ERROR_PROBABILITY:
170172
records_count = len(records) if records is not None else 0
171173
records = [

localstack/services/logs/provider.py

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from moto.logs.models import LogGroup as MotoLogGroup
1212
from moto.logs.models import LogStream as MotoLogStream
1313

14-
from localstack.aws.accounts import get_aws_account_id
1514
from localstack.aws.api import CommonServiceException, RequestContext, handler
1615
from localstack.aws.api.logs import (
1716
AmazonResourceName,
@@ -40,7 +39,7 @@
4039
from localstack.services.moto import call_moto
4140
from localstack.services.plugins import ServiceLifecycleHook
4241
from localstack.utils.aws import arns
43-
from localstack.utils.aws.arns import extract_region_from_arn
42+
from localstack.utils.aws.client_types import ServicePrincipal
4443
from localstack.utils.bootstrap import is_api_enabled
4544
from localstack.utils.common import is_number
4645
from localstack.utils.patch import patch
@@ -247,46 +246,61 @@ def moto_put_subscription_filter(fn, self, *args, **kwargs):
247246
role_arn = args[4]
248247

249248
log_group = self.groups.get(log_group_name)
249+
log_group_arn = arns.log_group_arn(log_group_name, self.account_id, self.region_name)
250250

251251
if not log_group:
252252
raise ResourceNotFoundException("The specified log group does not exist.")
253253

254+
arn_data = arns.parse_arn(destination_arn)
255+
256+
if role_arn:
257+
factory = connect_to.with_assumed_role(
258+
role_arn=role_arn, service_principal=ServicePrincipal.logs
259+
)
260+
else:
261+
factory = connect_to(aws_access_key_id=arn_data["account"], region_name=arn_data["region"])
262+
254263
if ":lambda:" in destination_arn:
255-
client = connect_to(region_name=extract_region_from_arn(destination_arn)).lambda_
256-
lambda_name = arns.lambda_function_name(destination_arn)
264+
client = factory.lambda_.request_metadata(
265+
source_arn=log_group_arn, service_principal=ServicePrincipal.logs
266+
)
257267
try:
258-
client.get_function(FunctionName=lambda_name)
268+
client.get_function(FunctionName=destination_arn)
259269
except Exception:
260270
raise InvalidParameterException(
261271
"destinationArn for vendor lambda cannot be used with roleArn"
262272
)
263273

264274
elif ":kinesis:" in destination_arn:
265-
client = connect_to().kinesis
275+
client = factory.kinesis.request_metadata(
276+
source_arn=log_group_arn, service_principal=ServicePrincipal.logs
277+
)
266278
stream_name = arns.kinesis_stream_name(destination_arn)
267279
try:
280+
# Kinesis-Local DescribeStream does not support StreamArn param, so use StreamName instead
268281
client.describe_stream(StreamName=stream_name)
269282
except Exception:
270283
raise InvalidParameterException(
271-
"Could not deliver test message to specified Kinesis stream. "
272-
"Check if the given kinesis stream is in ACTIVE state. "
284+
"Could not deliver message to specified Kinesis stream. "
285+
"Ensure that the Kinesis stream exists and is ACTIVE."
273286
)
274287

275288
elif ":firehose:" in destination_arn:
276-
client = connect_to().firehose
289+
client = factory.firehose.request_metadata(
290+
source_arn=log_group_arn, service_principal=ServicePrincipal.logs
291+
)
277292
firehose_name = arns.firehose_name(destination_arn)
278293
try:
279294
client.describe_delivery_stream(DeliveryStreamName=firehose_name)
280295
except Exception:
281296
raise InvalidParameterException(
282-
"Could not deliver test message to specified Firehose stream. "
283-
"Check if the given Firehose stream is in ACTIVE state."
297+
"Could not deliver message to specified Firehose stream. "
298+
"Ensure that the Firehose stream exists and is ACTIVE."
284299
)
285300

286301
else:
287-
service = arns.extract_service_from_arn(destination_arn)
288302
raise InvalidParameterException(
289-
f"PutSubscriptionFilter operation cannot work with destinationArn for vendor {service}"
303+
f"PutSubscriptionFilter operation cannot work with destinationArn for vendor {arn_data['service']}"
290304
)
291305

292306
if filter_pattern:
@@ -329,7 +343,7 @@ def moto_put_log_events(self: "MotoLogStream", log_events):
329343

330344
data = {
331345
"messageType": "DATA_MESSAGE",
332-
"owner": get_aws_account_id(),
346+
"owner": self.account_id, # AWS Account ID of the originating log data
333347
"logGroup": self.log_group.name,
334348
"logStream": self.log_stream_name,
335349
"subscriptionFilters": [subscription_filter.name],
@@ -342,20 +356,39 @@ def moto_put_log_events(self: "MotoLogStream", log_events):
342356
payload_gz_encoded = output.getvalue()
343357
event = {"awslogs": {"data": base64.b64encode(output.getvalue()).decode("utf-8")}}
344358

359+
log_group_arn = arns.log_group_arn(self.log_group.name, self.account_id, self.region)
360+
arn_data = arns.parse_arn(destination_arn)
361+
362+
if subscription_filter.role_arn:
363+
factory = connect_to.with_assumed_role(
364+
role_arn=subscription_filter.role_arn, service_principal=ServicePrincipal.logs
365+
)
366+
else:
367+
factory = connect_to(
368+
aws_access_key_id=arn_data["account"], region_name=arn_data["region"]
369+
)
370+
345371
if ":lambda:" in destination_arn:
346-
client = connect_to(region_name=extract_region_from_arn(destination_arn)).lambda_
347-
lambda_name = arns.lambda_function_name(destination_arn)
348-
client.invoke(FunctionName=lambda_name, Payload=json.dumps(event))
372+
client = factory.lambda_.request_metadata(
373+
source_arn=log_group_arn, service_principal=ServicePrincipal.logs
374+
)
375+
client.invoke(FunctionName=destination_arn, Payload=json.dumps(event))
376+
349377
if ":kinesis:" in destination_arn:
350-
client = connect_to().kinesis
378+
client = factory.kinesis.request_metadata(
379+
source_arn=log_group_arn, service_principal=ServicePrincipal.logs
380+
)
351381
stream_name = arns.kinesis_stream_name(destination_arn)
352382
client.put_record(
353383
StreamName=stream_name,
354384
Data=payload_gz_encoded,
355385
PartitionKey=self.log_group.name,
356386
)
387+
357388
if ":firehose:" in destination_arn:
358-
client = connect_to().firehose
389+
client = factory.firehose.request_metadata(
390+
source_arn=log_group_arn, service_principal=ServicePrincipal.logs
391+
)
359392
firehose_name = arns.firehose_name(destination_arn)
360393
client.put_record(
361394
DeliveryStreamName=firehose_name,

localstack/utils/aws/client_types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ class ServicePrincipal(str):
240240
events = "events"
241241
firehose = "firehose"
242242
lambda_ = "lambda"
243+
logs = "logs"
243244
s3 = "s3"
244245
sns = "sns"
245246
sqs = "sqs"

tests/aws/services/dynamodb/test_dynamodb.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from localstack.testing.pytest import markers
1717
from localstack.testing.snapshots.transformer import SortingTransformer
1818
from localstack.utils import testutil
19-
from localstack.utils.aws import arns, aws_stack, queries, resources
19+
from localstack.utils.aws import arns, queries, resources
2020
from localstack.utils.aws.resources import create_dynamodb_table
2121
from localstack.utils.common import json_safe, long_uid, retry, short_uid
2222
from localstack.utils.sync import poll_condition, wait_until
@@ -218,7 +218,7 @@ def test_stream_spec_and_region_replacement(self, aws_client):
218218
table = aws_client.dynamodb.describe_table(TableName=table_name)["Table"]
219219

220220
# assert ARN formats
221-
expected_arn_prefix = "arn:aws:dynamodb:" + aws_stack.get_local_region()
221+
expected_arn_prefix = "arn:aws:dynamodb:" + TEST_AWS_REGION_NAME
222222
assert table["TableArn"].startswith(expected_arn_prefix)
223223
assert table["LatestStreamArn"].startswith(expected_arn_prefix)
224224

tests/aws/services/logs/test_logs.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import pytest
77

88
from localstack.aws.api.lambda_ import Runtime
9-
from localstack.constants import APPLICATION_AMZ_JSON_1_1, AWS_REGION_US_EAST_1
9+
from localstack.constants import APPLICATION_AMZ_JSON_1_1, TEST_AWS_REGION_NAME
1010
from localstack.testing.pytest import markers
1111
from localstack.testing.snapshots.transformer import KeyValueBasedTransformer
1212
from localstack.utils import testutil
@@ -17,7 +17,7 @@
1717
logs_role = {
1818
"Statement": {
1919
"Effect": "Allow",
20-
"Principal": {"Service": f"logs.{AWS_REGION_US_EAST_1}.amazonaws.com"},
20+
"Principal": {"Service": f"logs.{TEST_AWS_REGION_NAME}.amazonaws.com"},
2121
"Action": "sts:AssumeRole",
2222
}
2323
}
@@ -206,7 +206,7 @@ def test_create_and_delete_log_stream(self, logs_log_group, aws_client, snapshot
206206
logGroupIdentifier=arns.log_group_arn(
207207
logs_log_group,
208208
account_id=aws_client.sts.get_caller_identity()["Account"],
209-
region_name=AWS_REGION_US_EAST_1,
209+
region_name=TEST_AWS_REGION_NAME,
210210
)
211211
).get("logStreams")
212212
snapshot.match("log_group_identifier-arn", response)
@@ -287,9 +287,9 @@ def test_put_subscription_filter_lambda(
287287
result = aws_client.lambda_.add_permission(
288288
FunctionName=test_lambda_name,
289289
StatementId=test_lambda_name,
290-
Principal=f"logs.{AWS_REGION_US_EAST_1}.amazonaws.com",
290+
Principal=f"logs.{TEST_AWS_REGION_NAME}.amazonaws.com",
291291
Action="lambda:InvokeFunction",
292-
SourceArn=f"arn:aws:logs:{AWS_REGION_US_EAST_1}:{account_id}:log-group:{logs_log_group}:*",
292+
SourceArn=f"arn:aws:logs:{TEST_AWS_REGION_NAME}:{account_id}:log-group:{logs_log_group}:*",
293293
SourceAccount=account_id,
294294
)
295295

tests/aws/services/s3/test_s3.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
from localstack.testing.snapshots.transformer_utility import TransformerUtility
5656
from localstack.utils import testutil
5757
from localstack.utils.aws import aws_stack
58+
from localstack.utils.aws.resources import create_s3_bucket
5859
from localstack.utils.files import load_file
5960
from localstack.utils.run import run
6061
from localstack.utils.server import http2_server
@@ -5446,31 +5447,33 @@ def secondary_client(self, secondary_aws_client):
54465447
return secondary_aws_client.s3
54475448

54485449
@markers.aws.unknown
5449-
def test_shared_bucket_namespace(self, primary_client, secondary_client):
5450+
def test_shared_bucket_namespace(self, primary_client, secondary_client, cleanups):
5451+
bucket_name = short_uid()
5452+
54505453
# Ensure that the bucket name space is shared by all accounts and regions
5451-
primary_client.create_bucket(Bucket="foo")
5454+
create_s3_bucket(bucket_name=bucket_name, s3_client=primary_client)
5455+
cleanups.append(lambda: primary_client.delete_bucket(Bucket=bucket_name))
54525456

54535457
with pytest.raises(ClientError) as exc:
5454-
secondary_client.create_bucket(
5455-
Bucket="foo",
5456-
CreateBucketConfiguration={"LocationConstraint": SECONDARY_TEST_AWS_REGION_NAME},
5457-
)
5458+
create_s3_bucket(bucket_name=bucket_name, s3_client=secondary_client)
54585459
exc.match("BucketAlreadyExists")
54595460

54605461
@markers.aws.unknown
5461-
def test_cross_account_access(self, primary_client, secondary_client):
5462+
def test_cross_account_access(self, primary_client, secondary_client, cleanups):
54625463
# Ensure that following operations can be performed across accounts
54635464
# - ListObjects
54645465
# - PutObject
54655466
# - GetObject
54665467

5467-
bucket_name = "foo"
5468+
bucket_name = short_uid()
54685469
key_name = "lorem/ipsum"
54695470
body1 = b"zaphod beeblebrox"
54705471
body2 = b"42"
54715472

54725473
# First user creates a bucket and puts an object
5473-
primary_client.create_bucket(Bucket=bucket_name)
5474+
create_s3_bucket(bucket_name=bucket_name, s3_client=primary_client)
5475+
cleanups.append(lambda: primary_client.delete_bucket(Bucket=bucket_name))
5476+
54745477
response = primary_client.list_buckets()
54755478
assert bucket_name in [bucket["Name"] for bucket in response["Buckets"]]
54765479
primary_client.put_object(Bucket=bucket_name, Key=key_name, Body=body1)

tests/aws/services/sqs/test_sqs.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from localstack import config
1313
from localstack.aws.api.lambda_ import Runtime
1414
from localstack.constants import (
15-
DEFAULT_AWS_ACCOUNT_ID,
1615
SECONDARY_TEST_AWS_ACCESS_KEY_ID,
1716
SECONDARY_TEST_AWS_ACCOUNT_ID,
1817
SECONDARY_TEST_AWS_SECRET_ACCESS_KEY,
@@ -3771,7 +3770,12 @@ def test_get_queue_attributes_all(self, sqs_create_queue, sqs_http_client):
37713770
assert queue_url.split("/")[-1] in response.text
37723771

37733772
@markers.aws.only_localstack
3774-
def test_get_queue_attributes_works_without_authparams(self, sqs_create_queue):
3773+
@pytest.mark.parametrize("strategy", ["standard", "domain", "path"])
3774+
def test_get_queue_attributes_works_without_authparams(
3775+
self, monkeypatch, sqs_create_queue, strategy
3776+
):
3777+
monkeypatch.setattr(config, "SQS_ENDPOINT_STRATEGY", strategy)
3778+
37753779
queue_url = sqs_create_queue()
37763780
response = requests.get(
37773781
queue_url,
@@ -4021,6 +4025,7 @@ def test_get_queue_url_works_for_same_queue(
40214025
params={
40224026
"Action": "GetQueueUrl",
40234027
"QueueName": queue_url.split("/")[-1],
4028+
"QueueOwnerAWSAccountId": TEST_AWS_ACCOUNT_ID,
40244029
},
40254030
)
40264031
assert f"<QueueUrl>{queue_url}</QueueUrl>" in response.text
@@ -4042,6 +4047,7 @@ def test_get_queue_url_work_for_different_queue(
40424047
params={
40434048
"Action": "GetQueueUrl",
40444049
"QueueName": queue2_url.split("/")[-1],
4050+
"QueueOwnerAWSAccountId": TEST_AWS_ACCOUNT_ID,
40454051
},
40464052
)
40474053
assert f"<QueueUrl>{queue2_url}</QueueUrl>" in response.text
@@ -4254,7 +4260,7 @@ def test_cross_account_get_queue_url(
42544260
queue_name = f"test-queue-cross-account-{short_uid()}"
42554261
queue_url = sqs_create_queue(QueueName=queue_name)
42564262
account_id, region_name, queue_name_from_url = parse_queue_url(queue_url)
4257-
assert account_id == DEFAULT_AWS_ACCOUNT_ID
4263+
assert account_id == TEST_AWS_ACCOUNT_ID
42584264
assert region_name == TEST_AWS_REGION_NAME
42594265
assert queue_name_from_url == queue_name
42604266

tests/aws/services/sqs/test_sqs_backdoor.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ def test_list_messages_as_botocore_endpoint_url(
9494
def test_fifo_list_messages_as_botocore_endpoint_url(
9595
self, sqs_create_queue, aws_client, aws_client_factory, monkeypatch, strategy
9696
):
97+
monkeypatch.setattr(config, "SQS_ENDPOINT_STRATEGY", strategy)
98+
9799
queue_url = sqs_create_queue(
98100
QueueName=f"queue-{short_uid()}.fifo",
99101
Attributes={
@@ -130,6 +132,8 @@ def test_fifo_list_messages_as_botocore_endpoint_url(
130132
def test_list_messages_with_invalid_action_raises_error(
131133
self, sqs_create_queue, aws_client_factory, monkeypatch, strategy
132134
):
135+
monkeypatch.setattr(config, "SQS_ENDPOINT_STRATEGY", strategy)
136+
133137
queue_url = sqs_create_queue()
134138

135139
client = aws_client_factory(
@@ -183,6 +187,8 @@ def test_list_messages_as_json(self, sqs_create_queue, monkeypatch, aws_client,
183187
def test_list_messages_with_invisible_messages(
184188
self, sqs_create_queue, aws_client, monkeypatch, strategy
185189
):
190+
monkeypatch.setattr(config, "SQS_ENDPOINT_STRATEGY", strategy)
191+
186192
queue_url = sqs_create_queue()
187193

188194
aws_client.sqs.send_message(QueueUrl=queue_url, MessageBody="message-1")
@@ -224,6 +230,8 @@ def test_list_messages_with_invisible_messages(
224230
def test_list_messages_with_delayed_messages(
225231
self, sqs_create_queue, aws_client, monkeypatch, strategy
226232
):
233+
monkeypatch.setattr(config, "SQS_ENDPOINT_STRATEGY", strategy)
234+
227235
queue_url = sqs_create_queue()
228236

229237
aws_client.sqs.send_message(QueueUrl=queue_url, MessageBody="message-1")
@@ -259,6 +267,8 @@ def test_list_messages_with_delayed_messages(
259267
@markers.aws.only_localstack
260268
@pytest.mark.parametrize("strategy", ["standard", "domain", "path"])
261269
def test_list_messages_without_queue_url(self, aws_client, monkeypatch, strategy):
270+
monkeypatch.setattr(config, "SQS_ENDPOINT_STRATEGY", strategy)
271+
262272
# makes sure the service is loaded when running the test individually
263273
aws_client.sqs.list_queues()
264274

@@ -275,6 +285,8 @@ def test_list_messages_without_queue_url(self, aws_client, monkeypatch, strategy
275285
@markers.aws.only_localstack
276286
@pytest.mark.parametrize("strategy", ["standard", "domain", "path"])
277287
def test_list_messages_with_invalid_queue_url(self, aws_client, monkeypatch, strategy):
288+
monkeypatch.setattr(config, "SQS_ENDPOINT_STRATEGY", strategy)
289+
278290
# makes sure the service is loaded when running the test individually
279291
aws_client.sqs.list_queues()
280292

@@ -289,6 +301,8 @@ def test_list_messages_with_invalid_queue_url(self, aws_client, monkeypatch, str
289301
@markers.aws.only_localstack
290302
@pytest.mark.parametrize("strategy", ["standard", "domain", "path"])
291303
def test_list_messages_with_non_existent_queue(self, aws_client, monkeypatch, strategy):
304+
monkeypatch.setattr(config, "SQS_ENDPOINT_STRATEGY", strategy)
305+
292306
# makes sure the service is loaded when running the test individually
293307
aws_client.sqs.list_queues()
294308

@@ -316,6 +330,8 @@ def test_list_messages_with_non_existent_queue(self, aws_client, monkeypatch, st
316330
def test_list_messages_with_queue_url_in_path(
317331
self, sqs_create_queue, aws_client, monkeypatch, strategy
318332
):
333+
monkeypatch.setattr(config, "SQS_ENDPOINT_STRATEGY", strategy)
334+
319335
queue_url = sqs_create_queue()
320336

321337
aws_client.sqs.send_message(QueueUrl=queue_url, MessageBody="message-1")

0 commit comments

Comments
 (0)