Skip to content

Commit de3195c

Browse files
authored
[3.8] bpo-36670, regrtest: Fix WindowsLoadTracker() for partial line (GH-16550) (GH-16560)
* bpo-36670, regrtest: Fix WindowsLoadTracker() for partial line (GH-16550) WindowsLoadTracker.read_output() now uses a short buffer for incomplete line. (cherry picked from commit 3e04cd2) * bpo-36670: Enhance regrtest WindowsLoadTracker (GH-16553) The last line is now passed to the parser even if it does not end with a newline, but only if it's a valid value. (cherry picked from commit c65119d) * bpo-36670: Enhance regrtest (GH-16556) * Add log() method: add timestamp and load average prefixes to main messages. * WindowsLoadTracker: * LOAD_FACTOR_1 is now computed using SAMPLING_INTERVAL * Initialize the load to the arithmetic mean of the first 5 values of the Processor Queue Length value (so over 5 seconds), rather than 0.0. * Handle BrokenPipeError and when typeperf exit. * format_duration(1.5) now returns '1.5 sec', rather than '1 sec 500 ms' (cherry picked from commit 098e256)
1 parent 183733d commit de3195c

File tree

5 files changed

+134
-74
lines changed

5 files changed

+134
-74
lines changed

Lib/test/libregrtest/main.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -139,16 +139,8 @@ def accumulate_result(self, result, rerun=False):
139139
print(xml_data, file=sys.__stderr__)
140140
raise
141141

142-
def display_progress(self, test_index, text):
143-
if self.ns.quiet:
144-
return
145-
146-
# "[ 51/405/1] test_tcl passed"
147-
line = f"{test_index:{self.test_count_width}}{self.test_count}"
148-
fails = len(self.bad) + len(self.environment_changed)
149-
if fails and not self.ns.pgo:
150-
line = f"{line}/{fails}"
151-
line = f"[{line}] {text}"
142+
def log(self, line=''):
143+
empty = not line
152144

153145
# add the system load prefix: "load avg: 1.80 "
154146
load_avg = self.getloadavg()
@@ -159,8 +151,23 @@ def display_progress(self, test_index, text):
159151
test_time = time.monotonic() - self.start_time
160152
test_time = datetime.timedelta(seconds=int(test_time))
161153
line = f"{test_time} {line}"
154+
155+
if empty:
156+
line = line[:-1]
157+
162158
print(line, flush=True)
163159

160+
def display_progress(self, test_index, text):
161+
if self.ns.quiet:
162+
return
163+
164+
# "[ 51/405/1] test_tcl passed"
165+
line = f"{test_index:{self.test_count_width}}{self.test_count}"
166+
fails = len(self.bad) + len(self.environment_changed)
167+
if fails and not self.ns.pgo:
168+
line = f"{line}/{fails}"
169+
self.log(f"[{line}] {text}")
170+
164171
def parse_args(self, kwargs):
165172
ns = _parse_args(sys.argv[1:], **kwargs)
166173

@@ -302,11 +309,11 @@ def rerun_failed_tests(self):
302309

303310
self.first_result = self.get_tests_result()
304311

305-
print()
306-
print("Re-running failed tests in verbose mode")
312+
self.log()
313+
self.log("Re-running failed tests in verbose mode")
307314
self.rerun = self.bad[:]
308315
for test_name in self.rerun:
309-
print(f"Re-running {test_name} in verbose mode", flush=True)
316+
self.log(f"Re-running {test_name} in verbose mode")
310317
self.ns.verbose = True
311318
result = runtest(self.ns, test_name)
312319

@@ -387,7 +394,7 @@ def run_tests_sequential(self):
387394

388395
save_modules = sys.modules.keys()
389396

390-
print("Run tests sequentially")
397+
self.log("Run tests sequentially")
391398

392399
previous_test = None
393400
for test_index, test_name in enumerate(self.tests, 1):

Lib/test/libregrtest/runtest_mp.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,14 @@ class ExitThread(Exception):
104104

105105

106106
class TestWorkerProcess(threading.Thread):
107-
def __init__(self, worker_id, pending, output, ns, timeout):
107+
def __init__(self, worker_id, runner):
108108
super().__init__()
109109
self.worker_id = worker_id
110-
self.pending = pending
111-
self.output = output
112-
self.ns = ns
113-
self.timeout = timeout
110+
self.pending = runner.pending
111+
self.output = runner.output
112+
self.ns = runner.ns
113+
self.timeout = runner.worker_timeout
114+
self.regrtest = runner.regrtest
114115
self.current_test_name = None
115116
self.start_time = None
116117
self._popen = None
@@ -294,7 +295,8 @@ def wait_stopped(self, start_time):
294295
if not self.is_alive():
295296
break
296297
dt = time.monotonic() - start_time
297-
print(f"Waiting for {self} thread for {format_duration(dt)}", flush=True)
298+
self.regrtest.log(f"Waiting for {self} thread "
299+
f"for {format_duration(dt)}")
298300
if dt > JOIN_TIMEOUT:
299301
print_warning(f"Failed to join {self} in {format_duration(dt)}")
300302
break
@@ -316,6 +318,7 @@ def get_running(workers):
316318
class MultiprocessTestRunner:
317319
def __init__(self, regrtest):
318320
self.regrtest = regrtest
321+
self.log = self.regrtest.log
319322
self.ns = regrtest.ns
320323
self.output = queue.Queue()
321324
self.pending = MultiprocessIterator(self.regrtest.tests)
@@ -326,11 +329,10 @@ def __init__(self, regrtest):
326329
self.workers = None
327330

328331
def start_workers(self):
329-
self.workers = [TestWorkerProcess(index, self.pending, self.output,
330-
self.ns, self.worker_timeout)
332+
self.workers = [TestWorkerProcess(index, self)
331333
for index in range(1, self.ns.use_mp + 1)]
332-
print("Run tests in parallel using %s child processes"
333-
% len(self.workers))
334+
self.log("Run tests in parallel using %s child processes"
335+
% len(self.workers))
334336
for worker in self.workers:
335337
worker.start()
336338

@@ -364,7 +366,7 @@ def _get_result(self):
364366
# display progress
365367
running = get_running(self.workers)
366368
if running and not self.ns.pgo:
367-
print('running: %s' % ', '.join(running), flush=True)
369+
self.log('running: %s' % ', '.join(running))
368370

369371
def display_result(self, mp_result):
370372
result = mp_result.result
@@ -384,8 +386,7 @@ def _process_result(self, item):
384386
if item[0]:
385387
# Thread got an exception
386388
format_exc = item[1]
387-
print(f"regrtest worker thread failed: {format_exc}",
388-
file=sys.stderr, flush=True)
389+
print_warning(f"regrtest worker thread failed: {format_exc}")
389390
return True
390391

391392
self.test_index += 1

Lib/test/libregrtest/utils.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@ def format_duration(seconds):
1616
if minutes:
1717
parts.append('%s min' % minutes)
1818
if seconds:
19-
parts.append('%s sec' % seconds)
20-
if ms:
21-
parts.append('%s ms' % ms)
19+
if parts:
20+
# 2 min 1 sec
21+
parts.append('%s sec' % seconds)
22+
else:
23+
# 1.0 sec
24+
parts.append('%.1f sec' % (seconds + ms / 1000))
2225
if not parts:
23-
return '0 ms'
26+
return '%s ms' % ms
2427

2528
parts = parts[:2]
2629
return ' '.join(parts)

Lib/test/libregrtest/win_utils.py

Lines changed: 84 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import _winapi
2+
import math
23
import msvcrt
34
import os
45
import subprocess
@@ -10,11 +11,14 @@
1011

1112
# Max size of asynchronous reads
1213
BUFSIZE = 8192
13-
# Exponential damping factor (see below)
14-
LOAD_FACTOR_1 = 0.9200444146293232478931553241
15-
1614
# Seconds per measurement
17-
SAMPLING_INTERVAL = 5
15+
SAMPLING_INTERVAL = 1
16+
# Exponential damping factor to compute exponentially weighted moving average
17+
# on 1 minute (60 seconds)
18+
LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
19+
# Initialize the load using the arithmetic mean of the first NVALUE values
20+
# of the Processor Queue Length
21+
NVALUE = 5
1822
# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
1923
# of typeperf are registered
2024
COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
@@ -30,9 +34,10 @@ class WindowsLoadTracker():
3034
"""
3135

3236
def __init__(self):
33-
self.load = 0.0
34-
self.counter_name = ''
35-
self.popen = None
37+
self._values = []
38+
self._load = None
39+
self._buffer = ''
40+
self._popen = None
3641
self.start()
3742

3843
def start(self):
@@ -64,7 +69,7 @@ def start(self):
6469
# Spawn off the load monitor
6570
counter_name = self._get_counter_name()
6671
command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
67-
self.popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD)
72+
self._popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD)
6873

6974
# Close our copy of the write end of the pipe
7075
os.close(command_stdout)
@@ -84,60 +89,102 @@ def _get_counter_name(self):
8489
process_queue_length = counters_dict['44']
8590
return f'"\\{system}\\{process_queue_length}"'
8691

87-
def close(self):
88-
if self.popen is None:
92+
def close(self, kill=True):
93+
if self._popen is None:
8994
return
90-
self.popen.kill()
91-
self.popen.wait()
92-
self.popen = None
95+
96+
self._load = None
97+
98+
if kill:
99+
self._popen.kill()
100+
self._popen.wait()
101+
self._popen = None
93102

94103
def __del__(self):
95104
self.close()
96105

97-
def read_output(self):
106+
def _parse_line(self, line):
107+
# typeperf outputs in a CSV format like this:
108+
# "07/19/2018 01:32:26.605","3.000000"
109+
# (date, process queue length)
110+
tokens = line.split(',')
111+
if len(tokens) != 2:
112+
raise ValueError
113+
114+
value = tokens[1]
115+
if not value.startswith('"') or not value.endswith('"'):
116+
raise ValueError
117+
value = value[1:-1]
118+
return float(value)
119+
120+
def _read_lines(self):
98121
overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
99122
bytes_read, res = overlapped.GetOverlappedResult(False)
100123
if res != 0:
101-
return
124+
return ()
102125

103126
output = overlapped.getbuffer()
104-
return output.decode('oem', 'replace')
127+
output = output.decode('oem', 'replace')
128+
output = self._buffer + output
129+
lines = output.splitlines(True)
130+
131+
# bpo-36670: typeperf only writes a newline *before* writing a value,
132+
# not after. Sometimes, the written line in incomplete (ex: only
133+
# timestamp, without the process queue length). Only pass the last line
134+
# to the parser if it's a valid value, otherwise store it in
135+
# self._buffer.
136+
try:
137+
self._parse_line(lines[-1])
138+
except ValueError:
139+
self._buffer = lines.pop(-1)
140+
else:
141+
self._buffer = ''
142+
143+
return lines
105144

106145
def getloadavg(self):
107-
typeperf_output = self.read_output()
108-
# Nothing to update, just return the current load
109-
if not typeperf_output:
110-
return self.load
146+
if self._popen is None:
147+
return None
148+
149+
returncode = self._popen.poll()
150+
if returncode is not None:
151+
self.close(kill=False)
152+
return None
153+
154+
try:
155+
lines = self._read_lines()
156+
except BrokenPipeError:
157+
self.close()
158+
return None
159+
160+
for line in lines:
161+
line = line.rstrip()
111162

112-
# Process the backlog of load values
113-
for line in typeperf_output.splitlines():
114163
# Ignore the initial header:
115164
# "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
116-
if '\\\\' in line:
165+
if 'PDH-CSV' in line:
117166
continue
118167

119168
# Ignore blank lines
120-
if not line.strip():
169+
if not line:
121170
continue
122171

123-
# typeperf outputs in a CSV format like this:
124-
# "07/19/2018 01:32:26.605","3.000000"
125-
# (date, process queue length)
126172
try:
127-
tokens = line.split(',')
128-
if len(tokens) != 2:
129-
raise ValueError
130-
131-
value = tokens[1].replace('"', '')
132-
load = float(value)
173+
processor_queue_length = self._parse_line(line)
133174
except ValueError:
134175
print_warning("Failed to parse typeperf output: %a" % line)
135176
continue
136177

137178
# We use an exponentially weighted moving average, imitating the
138179
# load calculation on Unix systems.
139180
# https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
140-
new_load = self.load * LOAD_FACTOR_1 + load * (1.0 - LOAD_FACTOR_1)
141-
self.load = new_load
142-
143-
return self.load
181+
# https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
182+
if self._load is not None:
183+
self._load = (self._load * LOAD_FACTOR_1
184+
+ processor_queue_length * (1.0 - LOAD_FACTOR_1))
185+
elif len(self._values) < NVALUE:
186+
self._values.append(processor_queue_length)
187+
else:
188+
self._load = sum(self._values) / len(self._values)
189+
190+
return self._load

Lib/test/test_regrtest.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
Py_DEBUG = hasattr(sys, 'gettotalrefcount')
2626
ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..')
2727
ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR))
28+
LOG_PREFIX = r'[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?'
2829

2930
TEST_INTERRUPTED = textwrap.dedent("""
3031
from signal import SIGINT, raise_signal
@@ -390,8 +391,8 @@ def check_line(self, output, regex):
390391
self.assertRegex(output, regex)
391392

392393
def parse_executed_tests(self, output):
393-
regex = (r'^[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?\[ *[0-9]+(?:/ *[0-9]+)*\] (%s)'
394-
% self.TESTNAME_REGEX)
394+
regex = (r'^%s\[ *[0-9]+(?:/ *[0-9]+)*\] (%s)'
395+
% (LOG_PREFIX, self.TESTNAME_REGEX))
395396
parser = re.finditer(regex, output, re.MULTILINE)
396397
return list(match.group(1) for match in parser)
397398

@@ -451,9 +452,10 @@ def list_regex(line_format, tests):
451452
if rerun:
452453
regex = list_regex('%s re-run test%s', rerun)
453454
self.check_line(output, regex)
454-
self.check_line(output, "Re-running failed tests in verbose mode")
455+
regex = LOG_PREFIX + r"Re-running failed tests in verbose mode"
456+
self.check_line(output, regex)
455457
for test_name in rerun:
456-
regex = f"Re-running {test_name} in verbose mode"
458+
regex = LOG_PREFIX + f"Re-running {test_name} in verbose mode"
457459
self.check_line(output, regex)
458460

459461
if no_test_ran:
@@ -1202,9 +1204,9 @@ def test_format_duration(self):
12021204
self.assertEqual(utils.format_duration(10e-3),
12031205
'10 ms')
12041206
self.assertEqual(utils.format_duration(1.5),
1205-
'1 sec 500 ms')
1207+
'1.5 sec')
12061208
self.assertEqual(utils.format_duration(1),
1207-
'1 sec')
1209+
'1.0 sec')
12081210
self.assertEqual(utils.format_duration(2 * 60),
12091211
'2 min')
12101212
self.assertEqual(utils.format_duration(2 * 60 + 1),

0 commit comments

Comments
 (0)