Skip to content

Commit dcc4a7b

Browse files
authored
refactor kinesis connector (#9644)
1 parent 4399dbe commit dcc4a7b

File tree

12 files changed

+471
-832
lines changed

12 files changed

+471
-832
lines changed

localstack/services/cloudformation/resource_provider.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
"AWS::KMS::Key": "ResourceProvider",
106106
"AWS::Kinesis::Stream": "ResourceProvider",
107107
"AWS::Kinesis::StreamConsumer": "ResourceProvider",
108+
"AWS::KinesisAnalytics::Application": "ResourceProvider",
108109
"AWS::KinesisFirehose::DeliveryStream": "ResourceProvider",
109110
"AWS::Lambda::Alias": "ResourceProvider",
110111
"AWS::Logs::LogGroup": "ResourceProvider",

localstack/services/firehose/provider.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -305,13 +305,16 @@ def create_delivery_stream(
305305
def _startup():
306306
stream["DeliveryStreamStatus"] = DeliveryStreamStatus.CREATING
307307
try:
308+
listener_function = functools.partial(
309+
self._process_records,
310+
context.account_id,
311+
context.region,
312+
delivery_stream_name,
313+
)
308314
process = kinesis_connector.listen_to_kinesis(
309315
stream_name=kinesis_stream_name,
310316
region_name=context.region,
311-
fh_d_stream=delivery_stream_name,
312-
listener_func=functools.partial(
313-
self._process_records, context.account_id, context.region
314-
),
317+
listener_func=listener_function,
315318
wait_until_started=True,
316319
ddb_lease_table_suffix="-firehose",
317320
)
@@ -514,9 +517,8 @@ def _process_records(
514517
self,
515518
account_id: str,
516519
region_name: str,
517-
records: List[Record],
518-
shard_id: str,
519520
fh_d_stream: str,
521+
records: List[Record],
520522
):
521523
"""Process the given records from the underlying Kinesis stream"""
522524
return self._put_records(account_id, region_name, fh_d_stream, records)

localstack/utils/aws/aws_stack.py

Lines changed: 1 addition & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
import json
21
import logging
3-
import os
42
import re
53
import socket
64
from functools import lru_cache
@@ -16,12 +14,10 @@
1614
APPLICATION_AMZ_JSON_1_1,
1715
APPLICATION_X_WWW_FORM_URLENCODED,
1816
AWS_REGION_US_EAST_1,
19-
ENV_DEV,
2017
HEADER_LOCALSTACK_ACCOUNT_ID,
2118
LOCALHOST,
22-
REGION_LOCAL,
2319
)
24-
from localstack.utils.strings import is_string, is_string_or_bytes, to_str
20+
from localstack.utils.strings import is_string_or_bytes, to_str
2521

2622
# set up logger
2723
LOG = logging.getLogger(__name__)
@@ -54,83 +50,6 @@ def get_valid_regions_for_service(service_name):
5450
return regions
5551

5652

57-
# TODO: Remove, this is super legacy functionality
58-
class Environment:
59-
def __init__(self, region=None, prefix=None):
60-
# target is the runtime environment to use, e.g.,
61-
# 'local' for local mode
62-
self.region = region or get_local_region()
63-
# prefix can be 'prod', 'stg', 'uat-1', etc.
64-
self.prefix = prefix
65-
66-
def apply_json(self, j):
67-
if isinstance(j, str):
68-
j = json.loads(j)
69-
self.__dict__.update(j)
70-
71-
@staticmethod
72-
def from_string(s):
73-
parts = s.split(":")
74-
if len(parts) == 1:
75-
if s in PREDEFINED_ENVIRONMENTS:
76-
return PREDEFINED_ENVIRONMENTS[s]
77-
parts = [get_local_region(), s]
78-
if len(parts) > 2:
79-
raise Exception('Invalid environment string "%s"' % s)
80-
region = parts[0]
81-
prefix = parts[1]
82-
return Environment(region=region, prefix=prefix)
83-
84-
@staticmethod
85-
def from_json(j):
86-
if not isinstance(j, dict):
87-
j = j.to_dict()
88-
result = Environment()
89-
result.apply_json(j)
90-
return result
91-
92-
def __str__(self):
93-
return "%s:%s" % (self.region, self.prefix)
94-
95-
96-
PREDEFINED_ENVIRONMENTS = {ENV_DEV: Environment(region=REGION_LOCAL, prefix=ENV_DEV)}
97-
98-
99-
# TODO: Remove
100-
def get_environment(env=None, region_name=None):
101-
"""
102-
Return an Environment object based on the input arguments.
103-
104-
Parameter `env` can be either of:
105-
* None (or empty), in which case the rules below are applied to (env = os.environ['ENV'] or ENV_DEV)
106-
* an Environment object (then this object is returned)
107-
* a string '<region>:<name>', which corresponds to Environment(region='<region>', prefix='<prefix>')
108-
* the predefined string 'dev' (ENV_DEV), which implies Environment(region='local', prefix='dev')
109-
* a string '<name>', which implies Environment(region=DEFAULT_REGION, prefix='<name>')
110-
111-
Additionally, parameter `region_name` can be used to override DEFAULT_REGION.
112-
"""
113-
if not env:
114-
if "ENV" in os.environ:
115-
env = os.environ["ENV"]
116-
else:
117-
env = ENV_DEV
118-
elif not is_string(env) and not isinstance(env, Environment):
119-
raise Exception("Invalid environment: %s" % env)
120-
121-
if is_string(env):
122-
env = Environment.from_string(env)
123-
if region_name:
124-
env.region = region_name
125-
if not env.region:
126-
raise Exception('Invalid region in environment: "%s"' % env)
127-
return env
128-
129-
130-
def is_local_env(env):
131-
return not env or env.region == REGION_LOCAL or env.prefix == ENV_DEV
132-
133-
13453
# NOTE: This method should not be used as it is not guaranteed to return the correct region
13554
# In the near future it will be deprecated and removed
13655
def get_region():
@@ -211,11 +130,6 @@ def inject_test_credentials_into_env(env):
211130
env["AWS_SECRET_ACCESS_KEY"] = "test"
212131

213132

214-
# TODO: remove
215-
def inject_region_into_env(env, region):
216-
env["AWS_REGION"] = region
217-
218-
219133
def extract_region_from_auth_header(headers: Dict[str, str], use_default=True) -> str:
220134
auth = headers.get("Authorization") or ""
221135
region = re.sub(r".*Credential=[^/]+/[^/]+/([^/]+)/.*", r"\1", auth)

localstack/utils/container_utils/docker_sdk_client.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -538,10 +538,12 @@ def wait_for_result(*_):
538538
# start listener thread
539539
start_worker_thread(wait_for_result)
540540
thread_started.wait()
541-
# start container
542-
container.start()
543-
# start awaiting container result
544-
start_waiting.set()
541+
try:
542+
# start container
543+
container.start()
544+
finally:
545+
# start awaiting container result
546+
start_waiting.set()
545547

546548
# handle container input/output
547549
# under windows, the socket has no __enter__ / cannot be used as context manager

localstack/utils/kinesis/kclipy_helper.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,7 @@ def get_kcl_app_command(java, multi_lang_daemon_class, properties, paths=None):
8080
paths = []
8181
logging_config = os.path.join(get_dir_of_file(__file__), "java", "logging.properties")
8282
sys_props = f'-Djava.util.logging.config.file="{logging_config}" -Daws.cborEnabled=false'
83-
return "{java} -cp {cp} {sys_props} {daemon} {props}".format(
84-
java=java,
85-
cp=get_kcl_classpath(properties, paths),
86-
daemon=multi_lang_daemon_class,
87-
# Just need the basename because the path is added to the classpath
88-
props=os.path.basename(properties),
89-
sys_props=sys_props,
90-
)
83+
return f"{java} -cp {get_kcl_classpath(properties, paths)} {sys_props} {multi_lang_daemon_class} {os.path.basename(properties)}"
9184

9285

9386
def create_config_file(

0 commit comments

Comments
 (0)