1111from moto .logs .models import LogGroup as MotoLogGroup
1212from moto .logs .models import LogStream as MotoLogStream
1313
14- from localstack .aws .accounts import get_aws_account_id
1514from localstack .aws .api import CommonServiceException , RequestContext , handler
1615from localstack .aws .api .logs import (
1716 AmazonResourceName ,
4039from localstack .services .moto import call_moto
4140from localstack .services .plugins import ServiceLifecycleHook
4241from localstack .utils .aws import arns
43- from localstack .utils .aws .arns import extract_region_from_arn
42+ from localstack .utils .aws .client_types import ServicePrincipal
4443from localstack .utils .bootstrap import is_api_enabled
4544from localstack .utils .common import is_number
4645from localstack .utils .patch import patch
@@ -247,46 +246,61 @@ def moto_put_subscription_filter(fn, self, *args, **kwargs):
247246 role_arn = args [4 ]
248247
249248 log_group = self .groups .get (log_group_name )
249+ log_group_arn = arns .log_group_arn (log_group_name , self .account_id , self .region_name )
250250
251251 if not log_group :
252252 raise ResourceNotFoundException ("The specified log group does not exist." )
253253
254+ arn_data = arns .parse_arn (destination_arn )
255+
256+ if role_arn :
257+ factory = connect_to .with_assumed_role (
258+ role_arn = role_arn , service_principal = ServicePrincipal .logs
259+ )
260+ else :
261+ factory = connect_to (aws_access_key_id = arn_data ["account" ], region_name = arn_data ["region" ])
262+
254263 if ":lambda:" in destination_arn :
255- client = connect_to (region_name = extract_region_from_arn (destination_arn )).lambda_
256- lambda_name = arns .lambda_function_name (destination_arn )
264+ client = factory .lambda_ .request_metadata (
265+ source_arn = log_group_arn , service_principal = ServicePrincipal .logs
266+ )
257267 try :
258- client .get_function (FunctionName = lambda_name )
268+ client .get_function (FunctionName = destination_arn )
259269 except Exception :
260270 raise InvalidParameterException (
261271 "destinationArn for vendor lambda cannot be used with roleArn"
262272 )
263273
264274 elif ":kinesis:" in destination_arn :
265- client = connect_to ().kinesis
275+ client = factory .kinesis .request_metadata (
276+ source_arn = log_group_arn , service_principal = ServicePrincipal .logs
277+ )
266278 stream_name = arns .kinesis_stream_name (destination_arn )
267279 try :
280+ # Kinesis-Local DescribeStream does not support StreamArn param, so use StreamName instead
268281 client .describe_stream (StreamName = stream_name )
269282 except Exception :
270283 raise InvalidParameterException (
271- "Could not deliver test message to specified Kinesis stream. "
272- "Check if the given kinesis stream is in ACTIVE state. "
284+ "Could not deliver message to specified Kinesis stream. "
285+ "Ensure that the Kinesis stream exists and is ACTIVE. "
273286 )
274287
275288 elif ":firehose:" in destination_arn :
276- client = connect_to ().firehose
289+ client = factory .firehose .request_metadata (
290+ source_arn = log_group_arn , service_principal = ServicePrincipal .logs
291+ )
277292 firehose_name = arns .firehose_name (destination_arn )
278293 try :
279294 client .describe_delivery_stream (DeliveryStreamName = firehose_name )
280295 except Exception :
281296 raise InvalidParameterException (
282- "Could not deliver test message to specified Firehose stream. "
283- "Check if the given Firehose stream is in ACTIVE state ."
297+ "Could not deliver message to specified Firehose stream. "
298+ "Ensure that the Firehose stream exists and is ACTIVE ."
284299 )
285300
286301 else :
287- service = arns .extract_service_from_arn (destination_arn )
288302 raise InvalidParameterException (
289- f"PutSubscriptionFilter operation cannot work with destinationArn for vendor { service } "
303+ f"PutSubscriptionFilter operation cannot work with destinationArn for vendor { arn_data [ ' service' ] } "
290304 )
291305
292306 if filter_pattern :
@@ -329,7 +343,7 @@ def moto_put_log_events(self: "MotoLogStream", log_events):
329343
330344 data = {
331345 "messageType" : "DATA_MESSAGE" ,
332- "owner" : get_aws_account_id (),
346+ "owner" : self . account_id , # AWS Account ID of the originating log data
333347 "logGroup" : self .log_group .name ,
334348 "logStream" : self .log_stream_name ,
335349 "subscriptionFilters" : [subscription_filter .name ],
@@ -342,20 +356,39 @@ def moto_put_log_events(self: "MotoLogStream", log_events):
342356 payload_gz_encoded = output .getvalue ()
343357 event = {"awslogs" : {"data" : base64 .b64encode (output .getvalue ()).decode ("utf-8" )}}
344358
359+ log_group_arn = arns .log_group_arn (self .log_group .name , self .account_id , self .region )
360+ arn_data = arns .parse_arn (destination_arn )
361+
362+ if subscription_filter .role_arn :
363+ factory = connect_to .with_assumed_role (
364+ role_arn = subscription_filter .role_arn , service_principal = ServicePrincipal .logs
365+ )
366+ else :
367+ factory = connect_to (
368+ aws_access_key_id = arn_data ["account" ], region_name = arn_data ["region" ]
369+ )
370+
345371 if ":lambda:" in destination_arn :
346- client = connect_to (region_name = extract_region_from_arn (destination_arn )).lambda_
347- lambda_name = arns .lambda_function_name (destination_arn )
348- client .invoke (FunctionName = lambda_name , Payload = json .dumps (event ))
372+ client = factory .lambda_ .request_metadata (
373+ source_arn = log_group_arn , service_principal = ServicePrincipal .logs
374+ )
375+ client .invoke (FunctionName = destination_arn , Payload = json .dumps (event ))
376+
349377 if ":kinesis:" in destination_arn :
350- client = connect_to ().kinesis
378+ client = factory .kinesis .request_metadata (
379+ source_arn = log_group_arn , service_principal = ServicePrincipal .logs
380+ )
351381 stream_name = arns .kinesis_stream_name (destination_arn )
352382 client .put_record (
353383 StreamName = stream_name ,
354384 Data = payload_gz_encoded ,
355385 PartitionKey = self .log_group .name ,
356386 )
387+
357388 if ":firehose:" in destination_arn :
358- client = connect_to ().firehose
389+ client = factory .firehose .request_metadata (
390+ source_arn = log_group_arn , service_principal = ServicePrincipal .logs
391+ )
359392 firehose_name = arns .firehose_name (destination_arn )
360393 client .put_record (
361394 DeliveryStreamName = firehose_name ,
0 commit comments