Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.

Commit b66207d

Browse files
committed
Admin: Add typehints to utils/strings and utils/threads
1 parent 44d1022 commit b66207d

File tree

6 files changed

+127
-68
lines changed

6 files changed

+127
-68
lines changed

localstack-core/localstack/utils/run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ def is_command_available(cmd: str) -> bool:
170170
return False
171171

172172

173-
def kill_process_tree(parent_pid):
173+
def kill_process_tree(parent_pid: int) -> None:
174174
# Note: Do NOT import "psutil" at the root scope
175175
import psutil
176176

localstack-core/localstack/utils/strings.py

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,13 @@
77
import string
88
import uuid
99
import zlib
10+
from typing import TYPE_CHECKING, Any
1011

1112
from localstack.config import DEFAULT_ENCODING
1213

14+
if TYPE_CHECKING:
15+
from localstack.utils.objects import ComplexType
16+
1317
_unprintables = (
1418
range(0x00, 0x09),
1519
range(0x0A, 0x0A),
@@ -27,13 +31,13 @@
2731
)
2832

2933

30-
def to_str(obj: str | bytes, encoding: str = DEFAULT_ENCODING, errors="strict") -> str:
34+
def to_str(obj: str | bytes, encoding: str = DEFAULT_ENCODING, errors: str = "strict") -> str:
3135
"""If ``obj`` is an instance of ``binary_type``, return
3236
``obj.decode(encoding, errors)``, otherwise return ``obj``"""
3337
return obj.decode(encoding, errors) if isinstance(obj, bytes) else obj
3438

3539

36-
def to_bytes(obj: str | bytes, encoding: str = DEFAULT_ENCODING, errors="strict") -> bytes:
40+
def to_bytes(obj: str | bytes, encoding: str = DEFAULT_ENCODING, errors: str = "strict") -> bytes:
3741
"""If ``obj`` is an instance of ``text_type``, return
3842
``obj.encode(encoding, errors)``, otherwise return ``obj``"""
3943
return obj.encode(encoding, errors) if isinstance(obj, str) else obj
@@ -44,7 +48,7 @@ def truncate(data: str, max_length: int = 100) -> str:
4448
return (f"{data[:max_length]}...") if len(data) > max_length else data
4549

4650

47-
def is_string(s, include_unicode=True, exclude_binary=False):
51+
def is_string(s: Any, include_unicode: bool = True, exclude_binary: bool = False) -> bool:
4852
if isinstance(s, bytes) and exclude_binary:
4953
return False
5054
if isinstance(s, str):
@@ -54,13 +58,13 @@ def is_string(s, include_unicode=True, exclude_binary=False):
5458
return False
5559

5660

57-
def is_string_or_bytes(s):
61+
def is_string_or_bytes(s: Any) -> bool:
5862
return is_string(s) or isinstance(s, str) or isinstance(s, bytes)
5963

6064

61-
def is_base64(s):
65+
def is_base64(s: Any) -> bool:
6266
regex = r"^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$"
63-
return is_string(s) and re.match(regex, s)
67+
return is_string(s) and re.match(regex, s) is not None
6468

6569

6670
_re_camel_to_snake_case = re.compile("((?<=[a-z0-9])[A-Z]|(?!^)[A-Z](?=[a-z]))")
@@ -85,22 +89,21 @@ def canonicalize_bool_to_str(val: bool) -> str:
8589
return "true" if str(val).lower() == "true" else "false"
8690

8791

88-
def convert_to_printable_chars(value: list | dict | str) -> str:
92+
def convert_to_printable_chars(value: "str | ComplexType") -> "ComplexType":
8993
"""Removes all unprintable characters from the given string."""
9094
from localstack.utils.objects import recurse_object
9195

92-
if isinstance(value, (dict, list)):
96+
if isinstance(value, str):
97+
return REGEX_UNPRINTABLE_CHARS.sub("", value)
98+
else:
9399

94-
def _convert(obj, **kwargs):
100+
def _convert(obj: Any, **kwargs: Any) -> "ComplexType":
95101
if isinstance(obj, str):
96102
return convert_to_printable_chars(obj)
97103
return obj
98104

99105
return recurse_object(value, _convert)
100106

101-
result = REGEX_UNPRINTABLE_CHARS.sub("", value)
102-
return result
103-
104107

105108
def first_char_to_lower(s: str) -> str:
106109
return s and f"{s[0].lower()}{s[1:]}"
@@ -110,20 +113,20 @@ def first_char_to_upper(s: str) -> str:
110113
return s and f"{s[0].upper()}{s[1:]}"
111114

112115

113-
def str_to_bool(value):
116+
def str_to_bool(value: Any) -> Any:
114117
"""Return the boolean value of the given string, or the verbatim value if it is not a string"""
115118
if isinstance(value, str):
116119
true_strings = ["true", "True"]
117120
return value in true_strings
118121
return value
119122

120123

121-
def str_insert(string, index, content):
124+
def str_insert(string: str, index: int, content: str) -> str:
122125
"""Insert a substring into an existing string at a certain index."""
123126
return f"{string[:index]}{content}{string[index:]}"
124127

125128

126-
def str_remove(string, index, end_index=None):
129+
def str_remove(string: str, index: int, end_index: int | None = None) -> str:
127130
"""Remove a substring from an existing string at a certain from-to index range."""
128131
end_index = end_index or (index + 1)
129132
return f"{string[:index]}{string[end_index:]}"
@@ -159,7 +162,7 @@ def checksum_crc32(string: str | bytes) -> str:
159162
return base64.b64encode(checksum.to_bytes(4, "big")).decode()
160163

161164

162-
def checksum_crc32c(string: str | bytes):
165+
def checksum_crc32c(string: str | bytes) -> str:
163166
# import botocore locally here to avoid a dependency of the CLI to botocore
164167
from botocore.httpchecksum import CrtCrc32cChecksum
165168

@@ -168,7 +171,7 @@ def checksum_crc32c(string: str | bytes):
168171
return base64.b64encode(checksum.digest()).decode()
169172

170173

171-
def checksum_crc64nvme(string: str | bytes):
174+
def checksum_crc64nvme(string: str | bytes) -> str:
172175
# import botocore locally here to avoid a dependency of the CLI to botocore
173176
from botocore.httpchecksum import CrtCrc64NvmeChecksum
174177

@@ -223,7 +226,9 @@ def prepend_with_slash(input: str) -> str:
223226
return input
224227

225228

226-
def key_value_pairs_to_dict(pairs: str, delimiter: str = ",", separator: str = "=") -> dict:
229+
def key_value_pairs_to_dict(
230+
pairs: str, delimiter: str = ",", separator: str = "="
231+
) -> dict[str, str]:
227232
"""
228233
Converts a string of key-value pairs to a dictionary.
229234

localstack-core/localstack/utils/threads.py

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
11
import concurrent.futures
2-
import inspect
32
import logging
3+
import subprocess
44
import threading
55
import traceback
66
from collections.abc import Callable
77
from concurrent.futures import Future
88
from multiprocessing.dummy import Pool
9+
from typing import TYPE_CHECKING, Any, TypeVar
10+
11+
if TYPE_CHECKING:
12+
from typing_extensions import ParamSpec
13+
14+
P = ParamSpec("P")
15+
16+
T = TypeVar("T")
917

1018
LOG = logging.getLogger(__name__)
1119

1220
# arrays for temporary threads and resources
13-
TMP_THREADS = []
14-
TMP_PROCESSES = []
21+
TMP_THREADS: list["FuncThread"] = []
22+
TMP_PROCESSES: list[subprocess.Popen[Any]] = []
1523

1624
counter_lock = threading.Lock()
1725
counter = 0
@@ -22,12 +30,12 @@ class FuncThread(threading.Thread):
2230

2331
def __init__(
2432
self,
25-
func,
26-
params=None,
27-
quiet=False,
28-
on_stop: Callable[["FuncThread"], None] = None,
33+
func: "Callable[P, T]",
34+
params: Any = None,
35+
quiet: bool = False,
36+
on_stop: Callable[["FuncThread"], None] | None = None,
2937
name: str | None = None,
30-
daemon=True,
38+
daemon: bool = True,
3139
):
3240
global counter
3341
global counter_lock
@@ -46,17 +54,14 @@ def __init__(
4654
self.params = params
4755
self.func = func
4856
self.quiet = quiet
49-
self.result_future = Future()
57+
self.result_future: Future[T | Exception | None] = Future()
5058
self._stop_event = threading.Event()
5159
self.on_stop = on_stop
5260

53-
def run(self):
54-
result = None
61+
def run(self) -> None:
62+
result: Any = None
5563
try:
56-
kwargs = {}
57-
argspec = inspect.getfullargspec(self.func)
58-
if argspec.varkw or "_thread" in (argspec.args or []) + (argspec.kwonlyargs or []):
59-
kwargs["_thread"] = self
64+
kwargs = {} # type: ignore[var-annotated]
6065
result = self.func(self.params, **kwargs)
6166
except Exception as e:
6267
self.result_future.set_exception(e)
@@ -78,7 +83,7 @@ def run(self):
7883
LOG.debug(e)
7984

8085
@property
81-
def running(self):
86+
def running(self) -> bool:
8287
return not self._stop_event.is_set()
8388

8489
def stop(self, quiet: bool = False) -> None:
@@ -91,27 +96,33 @@ def stop(self, quiet: bool = False) -> None:
9196
LOG.warning("error while calling on_stop callback: %s", e)
9297

9398

94-
def start_thread(method, *args, **kwargs) -> FuncThread: # TODO: find all usages and add names...
99+
def start_thread(
100+
method: "Callable[P, T]",
101+
params: Any = None,
102+
quiet: bool = False,
103+
on_stop: Callable[["FuncThread"], None] | None = None,
104+
_shutdown_hook: bool = True,
105+
name: str | None = None,
106+
) -> FuncThread:
95107
"""Start the given method in a background thread, and add the thread to the TMP_THREADS shutdown hook"""
96-
_shutdown_hook = kwargs.pop("_shutdown_hook", True)
97-
if not kwargs.get("name"):
98-
LOG.debug(
99-
"start_thread called without providing a custom name"
100-
) # technically we should add a new level here for *internal* warnings
101-
kwargs.setdefault("name", method.__name__)
102-
thread = FuncThread(method, *args, **kwargs)
108+
if not name:
109+
# technically we should add a new level here for *internal* warnings
110+
LOG.debug("start_thread called without providing a custom name")
111+
name = name or method.__name__
112+
thread = FuncThread(method, params=params, quiet=quiet, name=name, on_stop=on_stop)
103113
thread.start()
104114
if _shutdown_hook:
105115
TMP_THREADS.append(thread)
106116
return thread
107117

108118

109-
def start_worker_thread(method, *args, **kwargs):
110-
kwargs.setdefault("name", "start_worker_thread")
111-
return start_thread(method, *args, _shutdown_hook=False, **kwargs)
119+
def start_worker_thread(
120+
method: "Callable[P, T]", params: Any = None, name: str | None = None
121+
) -> FuncThread:
122+
return start_thread(method, params, _shutdown_hook=False, name=name or "start_worker_thread")
112123

113124

114-
def cleanup_threads_and_processes(quiet=True):
125+
def cleanup_threads_and_processes(quiet: bool = True) -> None:
115126
from localstack.utils.run import kill_process_tree
116127

117128
for thread in TMP_THREADS:
@@ -153,7 +164,7 @@ def cleanup_threads_and_processes(quiet=True):
153164
TMP_PROCESSES.clear()
154165

155166

156-
def parallelize(func: Callable, arr: list, size: int = None):
167+
def parallelize(func: Callable, arr: list, size: int = None): # type: ignore
157168
if not size:
158169
size = len(arr)
159170
if size <= 0:

localstack-core/mypy.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[mypy]
22
explicit_package_bases = true
33
mypy_path=localstack-core
4-
files=localstack/aws/api/core.py,localstack/packages,localstack/services/transcribe,localstack/services/kinesis/packages.py
4+
files=localstack/aws/api/core.py,localstack/utils/threads.py,localstack/utils/strings.py,localstack/packages,localstack/services/transcribe,localstack/services/kinesis/packages.py
55
ignore_missing_imports = False
66
follow_imports = silent
77
ignore_errors = False

tests/unit/test_common.py

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import io
33
import itertools
44
import os
5-
import threading
65
import time
76
import zipfile
87
from datetime import UTC, date, datetime, timedelta
@@ -363,24 +362,6 @@ def test_format_number(self):
363362
assert fn(-12.521, decimals=4) == "-12.521"
364363
assert fn(-1.2234354123e3, decimals=4) == "-1223.4354"
365364

366-
def test_cleanup_threads_and_processes_calls_shutdown_hooks(self):
367-
# TODO: move all run/concurrency related tests into separate class
368-
369-
started = threading.Event()
370-
done = threading.Event()
371-
372-
def run_method(*args, **kwargs):
373-
started.set()
374-
func_thread = kwargs["_thread"]
375-
# thread waits until it is stopped
376-
func_thread._stop_event.wait()
377-
done.set()
378-
379-
common.start_thread(run_method)
380-
assert started.wait(timeout=2)
381-
common.cleanup_threads_and_processes()
382-
assert done.wait(timeout=2)
383-
384365
def test_proxy_map(self):
385366
old_http_proxy = config.OUTBOUND_HTTP_PROXY
386367
old_https_proxy = config.OUTBOUND_HTTPS_PROXY

tests/unit/utils/test_threads.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import threading
2+
3+
from localstack.utils.threads import (
4+
TMP_THREADS,
5+
FuncThread,
6+
cleanup_threads_and_processes,
7+
start_thread,
8+
start_worker_thread,
9+
)
10+
11+
12+
class TestThreads:
13+
class TestStartThread:
14+
def test_start_thread_returns_a_func_thread(self):
15+
def examplefunc(*args):
16+
pass
17+
18+
thread = start_thread(examplefunc)
19+
20+
assert isinstance(thread, FuncThread)
21+
assert thread.name.startswith("examplefunc-")
22+
assert thread in TMP_THREADS
23+
24+
def test_start_thread_with_custom_name(self):
25+
thread = start_thread(lambda: None, name="somefunc")
26+
27+
assert thread.name.startswith("somefunc-")
28+
29+
class TestStartWorkerThread:
30+
def test_start_worker_thread_returns_a_func_thread(self):
31+
thread = start_worker_thread(lambda: None)
32+
33+
assert isinstance(thread, FuncThread)
34+
assert thread.name.startswith("start_worker_thread-")
35+
assert thread not in TMP_THREADS
36+
37+
def test_start_worker_thread_with_custom_name(self):
38+
thread = start_worker_thread(lambda: None, name="somefunc")
39+
assert thread.name.startswith("somefunc-")
40+
41+
def test_cleanup_threads_and_processes_calls_shutdown_hooks(self):
42+
started = threading.Event()
43+
done = threading.Event()
44+
45+
# Note: we're extending FuncThread here to make sure we have access to `_stop_event`
46+
# Regular users would use `start_thread` instead
47+
class ThreadTest(FuncThread):
48+
def __init__(self) -> None:
49+
super().__init__(self.run_method)
50+
51+
def run_method(self, *args):
52+
started.set()
53+
# thread waits until it is stopped
54+
self._stop_event.wait()
55+
done.set()
56+
57+
test_thread = ThreadTest()
58+
TMP_THREADS.append(test_thread)
59+
test_thread.start()
60+
assert started.wait(timeout=2)
61+
cleanup_threads_and_processes()
62+
assert done.wait(timeout=2)

0 commit comments

Comments
 (0)