Skip to content

Commit 56f0d93

Browse files
committed
gh-138122: Make sampling profiler integration tests more resilient
The tests were flaky on slow machines because subprocesses could finish before enough samples were collected. This adds synchronization similar to test_external_inspection: test scripts now signal when they start working, and the profiler waits for this signal before sampling. Test scripts now run in infinite loops until killed rather than for fixed iterations, ensuring the profiler always has active work to sample regardless of machine speed.
1 parent d6d850d commit 56f0d93

File tree

2 files changed

+156
-33
lines changed

2 files changed

+156
-33
lines changed

Lib/test/test_profiling/test_sampling_profiler/helpers.py

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import socket
55
import subprocess
66
import sys
7+
import time
78
import unittest
89
from collections import namedtuple
910

@@ -14,6 +15,9 @@
1415

1516
PROCESS_VM_READV_SUPPORTED = False
1617

18+
# Maximum number of retry attempts for operations that may fail transiently
19+
MAX_TRIES = 10
20+
1721
try:
1822
from _remote_debugging import PROCESS_VM_READV_SUPPORTED # noqa: F401
1923
import _remote_debugging # noqa: F401
@@ -38,12 +42,66 @@
3842
SubprocessInfo = namedtuple("SubprocessInfo", ["process", "socket"])
3943

4044

45+
def _wait_for_signal(sock, expected_signals, timeout=SHORT_TIMEOUT):
    """
    Wait for expected signal(s) from a socket, bounded by an overall deadline.

    Received data is accumulated until every expected signal occurs somewhere
    in it (substring match, in any order).  ``timeout`` limits the *total*
    time spent waiting, not each individual ``recv()`` call, so a peer that
    keeps sending unrelated bytes cannot stall the caller indefinitely.

    Args:
        sock: Connected socket to read from
        expected_signals: Single bytes object or list of bytes objects to wait for
        timeout: Maximum total wait in seconds (None means wait forever)

    Returns:
        bytes: Complete accumulated response buffer

    Raises:
        RuntimeError: If connection closed before signal received or timeout
    """
    if isinstance(expected_signals, bytes):
        expected_signals = [expected_signals]

    # A monotonic deadline is immune to wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout
    buffer = b""

    try:
        while True:
            # Check if all expected signals are in buffer
            if all(sig in buffer for sig in expected_signals):
                return buffer

            if deadline is None:
                remaining = None
            else:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    # Never pass 0 to settimeout(): that would switch the
                    # socket to non-blocking mode instead of timing out.
                    raise RuntimeError(
                        f"Timeout waiting for signals. "
                        f"Expected: {expected_signals}, Got: {buffer[-200:]!r}"
                    )

            # Only allow this read to consume what is left of the budget.
            sock.settimeout(remaining)
            try:
                chunk = sock.recv(4096)
            except socket.timeout:
                # The low-level timeout adds nothing to the report below.
                raise RuntimeError(
                    f"Timeout waiting for signals. "
                    f"Expected: {expected_signals}, Got: {buffer[-200:]!r}"
                ) from None

            if not chunk:
                # EOF - connection closed
                raise RuntimeError(
                    f"Connection closed before receiving expected signals. "
                    f"Expected: {expected_signals}, Got: {buffer[-200:]!r}"
                )
            buffer += chunk
    finally:
        # Leave the socket configured with the caller-requested timeout
        # rather than with whatever partial budget the last read used.
        sock.settimeout(timeout)
85+
86+
87+
def _cleanup_sockets(*sockets):
    """Close each given socket on a best-effort basis.

    ``None`` entries are skipped.  An ``OSError`` raised by ``close()`` is
    suppressed so that the remaining sockets are still closed.
    """
    for candidate in filter(lambda item: item is not None, sockets):
        with contextlib.suppress(OSError):
            candidate.close()
95+
96+
4197
@contextlib.contextmanager
42-
def test_subprocess(script):
98+
def test_subprocess(script, wait_for_working=True):
4399
"""Context manager to create a test subprocess with socket synchronization.
44100
45101
Args:
46-
script: Python code to execute in the subprocess
102+
script: Python code to execute in the subprocess. Should send b"working"
103+
to signal when work has started.
104+
wait_for_working: If True, wait for both "ready" and "working" signals
47105
48106
Yields:
49107
SubprocessInfo: Named tuple with process and socket objects
@@ -80,19 +138,27 @@ def test_subprocess(script):
80138
# Wait for process to connect and send ready signal
81139
client_socket, _ = server_socket.accept()
82140
server_socket.close()
83-
response = client_socket.recv(1024)
84-
if response != b"ready":
85-
raise RuntimeError(
86-
f"Unexpected response from subprocess: {response!r}"
87-
)
141+
server_socket = None
142+
143+
# Wait for ready signal, and optionally working signal
144+
if wait_for_working:
145+
_wait_for_signal(client_socket, [b"ready", b"working"])
146+
else:
147+
_wait_for_signal(client_socket, b"ready")
88148

89149
yield SubprocessInfo(proc, client_socket)
90150
finally:
91-
if client_socket is not None:
92-
client_socket.close()
151+
_cleanup_sockets(client_socket, server_socket)
93152
if proc.poll() is None:
94-
proc.kill()
95-
proc.wait()
153+
proc.terminate()
154+
try:
155+
proc.wait(timeout=SHORT_TIMEOUT)
156+
except subprocess.TimeoutExpired:
157+
proc.kill()
158+
try:
159+
proc.wait(timeout=SHORT_TIMEOUT)
160+
except subprocess.TimeoutExpired:
161+
pass # Process refuses to die
96162

97163

98164
def close_and_unlink(file):

Lib/test/test_profiling/test_sampling_profiler/test_integration.py

Lines changed: 79 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,16 @@
3333
close_and_unlink,
3434
skip_if_not_supported,
3535
PROCESS_VM_READV_SUPPORTED,
36+
MAX_TRIES,
3637
)
3738
from .mocks import MockFrameInfo, MockThreadInfo, MockInterpreterInfo
3839

3940
# Duration for profiling tests - long enough for process to complete naturally
4041
PROFILING_TIMEOUT = str(int(SHORT_TIMEOUT))
4142

43+
# Duration for profiling in tests - short enough to complete quickly
44+
PROFILING_DURATION_SEC = 2
45+
4246

4347
@skip_if_not_supported
4448
@unittest.skipIf(
@@ -364,9 +368,44 @@ def total_occurrences(func):
364368
class TestSampleProfilerIntegration(unittest.TestCase):
365369
@classmethod
366370
def setUpClass(cls):
371+
# Test script that runs in an infinite loop until killed.
372+
# Sends "working" signal after starting work to synchronize with profiler.
373+
# Used with test_subprocess() which injects socket code.
367374
cls.test_script = '''
375+
def slow_fibonacci(n):
376+
"""Recursive fibonacci - should show up prominently in profiler."""
377+
if n <= 1:
378+
return n
379+
return slow_fibonacci(n-1) + slow_fibonacci(n-2)
380+
381+
def cpu_intensive_work():
382+
"""CPU intensive work that should show in profiler."""
383+
result = 0
384+
for i in range(10000):
385+
result += i * i
386+
if i % 100 == 0:
387+
result = result % 1000000
388+
return result
389+
390+
def main_loop():
391+
"""Main test loop - runs indefinitely until killed."""
392+
# Signal that we're starting work
393+
_test_sock.sendall(b"working")
394+
395+
iteration = 0
396+
while True:
397+
if iteration % 2 == 0:
398+
result = slow_fibonacci(15)
399+
else:
400+
result = cpu_intensive_work()
401+
iteration += 1
402+
403+
if __name__ == "__main__":
404+
main_loop()
405+
'''
406+
# CLI test script - runs for a fixed duration (used for CLI tests without socket sync)
407+
cls.cli_test_script = '''
368408
import time
369-
import os
370409
371410
def slow_fibonacci(n):
372411
"""Recursive fibonacci - should show up prominently in profiler."""
@@ -384,14 +423,15 @@ def cpu_intensive_work():
384423
return result
385424
386425
def main_loop():
387-
"""Main test loop."""
388-
max_iterations = 200
389-
390-
for iteration in range(max_iterations):
426+
"""Main test loop - runs for a fixed duration."""
427+
end_time = time.time() + 30 # Run for 30 seconds max
428+
iteration = 0
429+
while time.time() < end_time:
391430
if iteration % 2 == 0:
392431
result = slow_fibonacci(15)
393432
else:
394433
result = cpu_intensive_work()
434+
iteration += 1
395435
396436
if __name__ == "__main__":
397437
main_loop()
@@ -404,12 +444,11 @@ def test_sampling_basic_functionality(self):
404444
mock.patch("sys.stdout", captured_output),
405445
):
406446
try:
407-
# Sample for up to SHORT_TIMEOUT seconds, but process exits after fixed iterations
408447
collector = PstatsCollector(sample_interval_usec=1000, skip_idle=False)
409448
profiling.sampling.sample.sample(
410449
subproc.process.pid,
411450
collector,
412-
duration_sec=SHORT_TIMEOUT,
451+
duration_sec=PROFILING_DURATION_SEC,
413452
)
414453
collector.print_stats(show_summary=False)
415454
except PermissionError:
@@ -442,7 +481,7 @@ def test_sampling_with_pstats_export(self):
442481
profiling.sampling.sample.sample(
443482
subproc.process.pid,
444483
collector,
445-
duration_sec=1,
484+
duration_sec=PROFILING_DURATION_SEC,
446485
)
447486
collector.export(pstats_out.name)
448487
except PermissionError:
@@ -488,7 +527,7 @@ def test_sampling_with_collapsed_export(self):
488527
profiling.sampling.sample.sample(
489528
subproc.process.pid,
490529
collector,
491-
duration_sec=1,
530+
duration_sec=PROFILING_DURATION_SEC,
492531
)
493532
collector.export(collapsed_file.name)
494533
except PermissionError:
@@ -536,7 +575,7 @@ def test_sampling_all_threads(self):
536575
profiling.sampling.sample.sample(
537576
subproc.process.pid,
538577
collector,
539-
duration_sec=1,
578+
duration_sec=PROFILING_DURATION_SEC,
540579
all_threads=True,
541580
)
542581
collector.print_stats(show_summary=False)
@@ -548,12 +587,16 @@ def test_sampling_all_threads(self):
548587

549588
def test_sample_target_script(self):
550589
script_file = tempfile.NamedTemporaryFile(delete=False)
551-
script_file.write(self.test_script.encode("utf-8"))
590+
script_file.write(self.cli_test_script.encode("utf-8"))
552591
script_file.flush()
553592
self.addCleanup(close_and_unlink, script_file)
554593

555-
# Sample for up to SHORT_TIMEOUT seconds, but process exits after fixed iterations
556-
test_args = ["profiling.sampling.sample", "run", "-d", PROFILING_TIMEOUT, script_file.name]
594+
# Sample for PROFILING_DURATION_SEC seconds
595+
test_args = [
596+
"profiling.sampling.sample", "run",
597+
"-d", str(PROFILING_DURATION_SEC),
598+
script_file.name
599+
]
557600

558601
with (
559602
mock.patch("sys.argv", test_args),
@@ -583,13 +626,13 @@ def test_sample_target_module(self):
583626
module_path = os.path.join(tempdir.name, "test_module.py")
584627

585628
with open(module_path, "w") as f:
586-
f.write(self.test_script)
629+
f.write(self.cli_test_script)
587630

588631
test_args = [
589632
"profiling.sampling.cli",
590633
"run",
591634
"-d",
592-
PROFILING_TIMEOUT,
635+
str(PROFILING_DURATION_SEC),
593636
"-m",
594637
"test_module",
595638
]
@@ -630,8 +673,10 @@ def test_invalid_pid(self):
630673
profiling.sampling.sample.sample(-1, collector, duration_sec=1)
631674

632675
def test_process_dies_during_sampling(self):
676+
# Use wait_for_working=False since this simple script doesn't send "working"
633677
with test_subprocess(
634-
"import time; time.sleep(0.5); exit()"
678+
"import time; time.sleep(0.5); exit()",
679+
wait_for_working=False
635680
) as subproc:
636681
with (
637682
io.StringIO() as captured_output,
@@ -654,7 +699,11 @@ def test_process_dies_during_sampling(self):
654699
self.assertIn("Error rate", output)
655700

656701
def test_is_process_running(self):
657-
with test_subprocess("import time; time.sleep(1000)") as subproc:
702+
# Use wait_for_working=False since this simple script doesn't send "working"
703+
with test_subprocess(
704+
"import time; time.sleep(1000)",
705+
wait_for_working=False
706+
) as subproc:
658707
try:
659708
profiler = SampleProfiler(
660709
pid=subproc.process.pid,
@@ -681,7 +730,11 @@ def test_is_process_running(self):
681730

682731
@unittest.skipUnless(sys.platform == "linux", "Only valid on Linux")
683732
def test_esrch_signal_handling(self):
684-
with test_subprocess("import time; time.sleep(1000)") as subproc:
733+
# Use wait_for_working=False since this simple script doesn't send "working"
734+
with test_subprocess(
735+
"import time; time.sleep(1000)",
736+
wait_for_working=False
737+
) as subproc:
685738
try:
686739
unwinder = _remote_debugging.RemoteUnwinder(
687740
subproc.process.pid
@@ -793,25 +846,29 @@ class TestAsyncAwareProfilingIntegration(unittest.TestCase):
793846

794847
@classmethod
795848
def setUpClass(cls):
849+
# Async test script that runs indefinitely until killed.
850+
# Sends "working" signal after starting work to synchronize with profiler.
796851
cls.async_script = '''
797852
import asyncio
798853
799854
async def sleeping_leaf():
800855
"""Leaf task that just sleeps - visible in 'all' mode."""
801-
for _ in range(50):
856+
while True:
802857
await asyncio.sleep(0.02)
803858
804859
async def cpu_leaf():
805860
"""Leaf task that does CPU work - visible in both modes."""
806861
total = 0
807-
for _ in range(200):
862+
while True:
808863
for i in range(10000):
809864
total += i * i
810865
await asyncio.sleep(0)
811-
return total
812866
813867
async def supervisor():
814868
"""Middle layer that spawns leaf tasks."""
869+
# Signal that we're starting work
870+
_test_sock.sendall(b"working")
871+
815872
tasks = [
816873
asyncio.create_task(sleeping_leaf(), name="Sleeper-0"),
817874
asyncio.create_task(sleeping_leaf(), name="Sleeper-1"),
@@ -838,7 +895,7 @@ def _collect_async_samples(self, async_aware_mode):
838895
profiling.sampling.sample.sample(
839896
subproc.process.pid,
840897
collector,
841-
duration_sec=SHORT_TIMEOUT,
898+
duration_sec=PROFILING_DURATION_SEC,
842899
async_aware=async_aware_mode,
843900
)
844901
except PermissionError:

0 commit comments

Comments
 (0)