Skip to content

Commit f130b06

Browse files
authored
[3.14] gh-142206: multiprocessing.resource_tracker: Decode messages using older protocol (GH-142215) (GH-142285)
(cherry picked from commit 4172644) Difference from the original commit: the default in 3.14 is to use the simpler original protocol (except for filenames with newlines). Co-authored-by: Petr Viktorin <[email protected]>
1 parent 654e3c7 commit f130b06

File tree

3 files changed

+76
-21
lines changed

3 files changed

+76
-21
lines changed

Lib/multiprocessing/resource_tracker.py

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ def __init__(self):
6868
self._exitcode = None
6969
self._reentrant_messages = deque()
7070

71+
# True to use colon-separated lines, rather than JSON lines,
72+
# for internal communication. (Mainly for testing).
73+
# Filenames not supported by the simple format will always be sent
74+
# using JSON.
75+
# The reader should understand all formats.
76+
self._use_simple_format = True
77+
7178
def _reentrant_call_error(self):
7279
# gh-109629: this happens if an explicit call to the ResourceTracker
7380
# gets interrupted by a garbage collection, invoking a finalizer (*)
@@ -200,7 +207,9 @@ def _launch(self):
200207
os.close(r)
201208

202209
def _make_probe_message(self):
203-
"""Return a JSON-encoded probe message."""
210+
"""Return a probe message."""
211+
if self._use_simple_format:
212+
return b'PROBE:0:noop\n'
204213
return (
205214
json.dumps(
206215
{"cmd": "PROBE", "rtype": "noop"},
@@ -267,6 +276,15 @@ def _write(self, msg):
267276
assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
268277

269278
def _send(self, cmd, name, rtype):
279+
if self._use_simple_format and '\n' not in name:
280+
msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
281+
if len(msg) > 512:
282+
# posix guarantees that writes to a pipe of less than PIPE_BUF
283+
# bytes are atomic, and that PIPE_BUF >= 512
284+
raise ValueError('msg too long')
285+
self._ensure_running_and_write(msg)
286+
return
287+
270288
# POSIX guarantees that writes to a pipe of less than PIPE_BUF (512 on Linux)
271289
# bytes are atomic. Therefore, we want the message to be shorter than 512 bytes.
272290
# POSIX shm_open() and sem_open() require the name, including its leading slash,
@@ -286,6 +304,7 @@ def _send(self, cmd, name, rtype):
286304

287305
# The entire JSON message is guaranteed < PIPE_BUF (512 bytes) by construction.
288306
assert len(msg) <= 512, f"internal error: message too long ({len(msg)} bytes)"
307+
assert msg.startswith(b'{')
289308

290309
self._ensure_running_and_write(msg)
291310

@@ -296,6 +315,30 @@ def _send(self, cmd, name, rtype):
296315
getfd = _resource_tracker.getfd
297316

298317

318+
def _decode_message(line):
319+
if line.startswith(b'{'):
320+
try:
321+
obj = json.loads(line.decode('ascii'))
322+
except Exception as e:
323+
raise ValueError("malformed resource_tracker message: %r" % (line,)) from e
324+
325+
cmd = obj["cmd"]
326+
rtype = obj["rtype"]
327+
b64 = obj.get("base64_name", "")
328+
329+
if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str):
330+
raise ValueError("malformed resource_tracker fields: %r" % (obj,))
331+
332+
try:
333+
name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape')
334+
except ValueError as e:
335+
raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e
336+
else:
337+
cmd, rest = line.strip().decode('ascii').split(':', maxsplit=1)
338+
name, rtype = rest.rsplit(':', maxsplit=1)
339+
return cmd, rtype, name
340+
341+
299342
def main(fd):
300343
'''Run resource tracker.'''
301344
# protect the process from ^C and "killall python" etc
@@ -318,23 +361,7 @@ def main(fd):
318361
with open(fd, 'rb') as f:
319362
for line in f:
320363
try:
321-
try:
322-
obj = json.loads(line.decode('ascii'))
323-
except Exception as e:
324-
raise ValueError("malformed resource_tracker message: %r" % (line,)) from e
325-
326-
cmd = obj["cmd"]
327-
rtype = obj["rtype"]
328-
b64 = obj.get("base64_name", "")
329-
330-
if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str):
331-
raise ValueError("malformed resource_tracker fields: %r" % (obj,))
332-
333-
try:
334-
name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape')
335-
except ValueError as e:
336-
raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e
337-
364+
cmd, rtype, name = _decode_message(line)
338365
cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
339366
if cleanup_func is None:
340367
raise ValueError(

Lib/test/_test_multiprocessing.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
from test.support import socket_helper
4040
from test.support import threading_helper
4141
from test.support import warnings_helper
42-
42+
from test.support import subTests
4343

4444
# Skip tests if _multiprocessing wasn't built.
4545
_multiprocessing = import_helper.import_module('_multiprocessing')
@@ -4284,6 +4284,19 @@ def test_copy(self):
42844284
self.assertEqual(bar.z, 2 ** 33)
42854285

42864286

4287+
def resource_tracker_format_subtests(func):
4288+
"""Run given test using both resource tracker communication formats"""
4289+
def _inner(self, *args, **kwargs):
4290+
tracker = resource_tracker._resource_tracker
4291+
for use_simple_format in False, True:
4292+
with (
4293+
self.subTest(use_simple_format=use_simple_format),
4294+
unittest.mock.patch.object(
4295+
tracker, '_use_simple_format', use_simple_format)
4296+
):
4297+
func(self, *args, **kwargs)
4298+
return _inner
4299+
42874300
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
42884301
@hashlib_helper.requires_hashdigest('sha256')
42894302
class _TestSharedMemory(BaseTestCase):
@@ -4561,6 +4574,7 @@ def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
45614574
smm.shutdown()
45624575

45634576
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
4577+
@resource_tracker_format_subtests
45644578
def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
45654579
# bpo-36867: test that a SharedMemoryManager uses the
45664580
# same resource_tracker process as its parent.
@@ -4811,6 +4825,7 @@ def test_shared_memory_cleaned_after_process_termination(self):
48114825
"shared_memory objects to clean up at shutdown", err)
48124826

48134827
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
4828+
@resource_tracker_format_subtests
48144829
def test_shared_memory_untracking(self):
48154830
# gh-82300: When a separate Python process accesses shared memory
48164831
# with track=False, it must not cause the memory to be deleted
@@ -4838,6 +4853,7 @@ def test_shared_memory_untracking(self):
48384853
mem.close()
48394854

48404855
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
4856+
@resource_tracker_format_subtests
48414857
def test_shared_memory_tracking(self):
48424858
# gh-82300: When a separate Python process accesses shared memory
48434859
# with track=True, it must cause the memory to be deleted when
@@ -7149,13 +7165,18 @@ class SemLock(_multiprocessing.SemLock):
71497165

71507166
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
71517167
class TestSharedMemoryNames(unittest.TestCase):
7152-
def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(self):
7168+
@subTests('use_simple_format', (True, False))
7169+
def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(
7170+
self, use_simple_format):
71537171
# Test script that creates and cleans up shared memory with colon in name
71547172
test_script = textwrap.dedent("""
71557173
import sys
71567174
from multiprocessing import shared_memory
7175+
from multiprocessing import resource_tracker
71577176
import time
71587177
7178+
resource_tracker._resource_tracker._use_simple_format = %s
7179+
71597180
# Test various patterns of colons in names
71607181
test_names = [
71617182
"a:b",
@@ -7183,7 +7204,7 @@ def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(self
71837204
sys.exit(1)
71847205
71857206
print("SUCCESS")
7186-
""")
7207+
""" % use_simple_format)
71877208

71887209
rc, out, err = script_helper.assert_python_ok("-c", test_script)
71897210
self.assertIn(b"SUCCESS", out)
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
The resource tracker in the :mod:`multiprocessing` module now uses the
2+
original communication protocol, as in Python 3.14.0 and below,
3+
by default.
4+
This avoids issues with upgrading Python while it is running.
5+
(Note that such 'in-place' upgrades are not tested.)
6+
The tracker remains compatible with subprocesses that use new protocol
7+
(that is, subprocesses using Python 3.13.10, 3.14.1 and 3.15).

0 commit comments

Comments
 (0)