1+ import json
12import logging
23import os
4+ import re
35import time
46from random import random
57
810from localstack .aws .api .kinesis import (
911 ConsumerARN ,
1012 Data ,
13+ GetResourcePolicyOutput ,
1114 HashKey ,
1215 KinesisApi ,
1316 PartitionKey ,
17+ Policy ,
1418 ProvisionedThroughputExceededException ,
1519 PutRecordOutput ,
1620 PutRecordsOutput ,
1721 PutRecordsRequestEntryList ,
1822 PutRecordsResultEntry ,
23+ ResourceARN ,
24+ ResourceNotFoundException ,
1925 SequenceNumber ,
2026 ShardId ,
2127 StartingPosition ,
2430 SubscribeToShardEvent ,
2531 SubscribeToShardEventStream ,
2632 SubscribeToShardOutput ,
33+ ValidationException ,
2734)
2835from localstack .aws .connect import connect_to
2936from localstack .constants import LOCALHOST
3946MAX_SUBSCRIPTION_SECONDS = 300
4047SERVER_STARTUP_TIMEOUT = 120
4148
49+ DATA_STREAM_ARN_REGEX = re .compile (
50+ r"^arn:aws(?:-[a-z]+)*:kinesis:[a-z0-9-]+:\d{12}:stream\/[a-zA-Z0-9_.\-]+$"
51+ )
52+ CONSUMER_ARN_REGEX = re .compile (
53+ r"^arn:aws(?:-[a-z]+)*:kinesis:[a-z0-9-]+:\d{12}:stream\/[a-zA-Z0-9_.\-]+\/consumer\/[a-zA-Z0-9_.\-]+:\d+$"
54+ )
55+
4256
4357def find_stream_for_consumer (consumer_arn ):
4458 account_id = extract_account_id_from_arn (consumer_arn )
@@ -52,6 +66,11 @@ def find_stream_for_consumer(consumer_arn):
5266 raise Exception (f"Unable to find stream for stream consumer { consumer_arn } " )
5367
5468
69+ def is_valid_kinesis_arn (resource_arn : ResourceARN ) -> bool :
70+ """Check if the provided ARN is a valid Kinesis ARN."""
71+ return bool (CONSUMER_ARN_REGEX .match (resource_arn ) or DATA_STREAM_ARN_REGEX .match (resource_arn ))
72+
73+
5574class KinesisProvider (KinesisApi , ServiceLifecycleHook ):
5675 server_manager : KinesisServerManager
5776
@@ -81,6 +100,60 @@ def get_forward_url(self, account_id: str, region_name: str) -> str:
81100 def get_store (account_id : str , region_name : str ) -> KinesisStore :
82101 return kinesis_stores [account_id ][region_name ]
83102
103+ def put_resource_policy (
104+ self ,
105+ context : RequestContext ,
106+ resource_arn : ResourceARN ,
107+ policy : Policy ,
108+ ** kwargs ,
109+ ) -> None :
110+ if not is_valid_kinesis_arn (resource_arn ):
111+ raise ValidationException (f"invalid kinesis arn { resource_arn } " )
112+
113+ kinesis = connect_to ().kinesis
114+ try :
115+ kinesis .describe_stream_summary (StreamARN = resource_arn )
116+ except kinesis .exceptions .ResourceNotFoundException :
117+ raise ResourceNotFoundException (f"Stream with ARN { resource_arn } not found" )
118+
119+ store = self .get_store (context .account_id , context .region )
120+ store .resource_policies [resource_arn ] = policy
121+
122+ def get_resource_policy (
123+ self ,
124+ context : RequestContext ,
125+ resource_arn : ResourceARN ,
126+ ** kwargs ,
127+ ) -> GetResourcePolicyOutput :
128+ if not is_valid_kinesis_arn (resource_arn ):
129+ raise ValidationException (f"invalid kinesis arn { resource_arn } " )
130+
131+ kinesis = connect_to ().kinesis
132+ try :
133+ kinesis .describe_stream_summary (StreamARN = resource_arn )
134+ except kinesis .exceptions .ResourceNotFoundException :
135+ raise ResourceNotFoundException (f"Stream with ARN { resource_arn } not found" )
136+
137+ store = self .get_store (context .account_id , context .region )
138+ policy = store .resource_policies .get (resource_arn , json .dumps ({}))
139+ return GetResourcePolicyOutput (Policy = policy )
140+
141+ def delete_resource_policy (
142+ self ,
143+ context : RequestContext ,
144+ resource_arn : ResourceARN ,
145+ ** kwargs ,
146+ ) -> None :
147+ if not is_valid_kinesis_arn (resource_arn ):
148+ raise ValidationException (f"invalid kinesis arn { resource_arn } " )
149+
150+ store = self .get_store (context .account_id , context .region )
151+ if resource_arn not in store .resource_policies :
152+ raise ResourceNotFoundException (
153+ f"No resource policy found for resource ARN { resource_arn } "
154+ )
155+ del store .resource_policies [resource_arn ]
156+
84157 def subscribe_to_shard (
85158 self ,
86159 context : RequestContext ,
0 commit comments