1515 runtest , INTERRUPTED , CHILD_ERROR , PROGRESS_MIN_TIME ,
1616 format_test_result , TestResult , is_failed , TIMEOUT )
1717from test .libregrtest .setup import setup_tests
18- from test .libregrtest .utils import format_duration
18+ from test .libregrtest .utils import format_duration , print_warning
1919
2020
2121# Display the running tests if nothing happened in the last N seconds
@@ -103,9 +103,10 @@ class ExitThread(Exception):
103103 pass
104104
105105
106- class MultiprocessThread (threading .Thread ):
107- def __init__ (self , pending , output , ns , timeout ):
106+ class TestWorkerProcess (threading .Thread ):
107+ def __init__ (self , worker_id , pending , output , ns , timeout ):
108108 super ().__init__ ()
109+ self .worker_id = worker_id
109110 self .pending = pending
110111 self .output = output
111112 self .ns = ns
@@ -114,12 +115,16 @@ def __init__(self, pending, output, ns, timeout):
114115 self .start_time = None
115116 self ._popen = None
116117 self ._killed = False
118+ self ._stopped = False
117119
118120 def __repr__ (self ):
119- info = ['MultiprocessThread' ]
120- test = self .current_test_name
121+ info = [f'TestWorkerProcess #{ self .worker_id } ' ]
121122 if self .is_alive ():
122- info .append ('alive' )
123+ dt = time .monotonic () - self .start_time
124+ info .append ("running for %s" % format_duration (dt ))
125+ else :
126+ info .append ('stopped' )
127+ test = self .current_test_name
123128 if test :
124129 info .append (f'test={ test } ' )
125130 popen = self ._popen
@@ -128,53 +133,24 @@ def __repr__(self):
128133 return '<%s>' % ' ' .join (info )
129134
130135 def _kill (self ):
131- dt = time .monotonic () - self .start_time
136+ if self ._killed :
137+ return
138+ self ._killed = True
132139
133140 popen = self ._popen
134- pid = popen .pid
135- print ("Kill worker process %s running for %.1f sec" % (pid , dt ),
136- file = sys .stderr , flush = True )
141+ if popen is None :
142+ return
137143
144+ print (f"Kill { self } " , file = sys .stderr , flush = True )
138145 try :
139146 popen .kill ()
140- return True
141147 except OSError as exc :
142- print ("WARNING: Failed to kill worker process %s: %r" % (pid , exc ),
143- file = sys .stderr , flush = True )
144- return False
145-
146- def _close_wait (self ):
147- popen = self ._popen
148-
149- # stdout and stderr must be closed to ensure that communicate()
150- # does not hang
151- popen .stdout .close ()
152- popen .stderr .close ()
153-
154- try :
155- popen .wait (JOIN_TIMEOUT )
156- except (subprocess .TimeoutExpired , OSError ) as exc :
157- print ("WARNING: Failed to wait for worker process %s "
158- "completion (timeout=%.1f sec): %r"
159- % (popen .pid , JOIN_TIMEOUT , exc ),
160- file = sys .stderr , flush = True )
161-
162- def kill (self ):
163- """
164- Kill the current process (if any).
165-
166- This method can be called by the thread running the process,
167- or by another thread.
168- """
169- self ._killed = True
170-
171- if self ._popen is None :
172- return
173-
174- if not self ._kill ():
175- return
148+ print_warning (f"Failed to kill { self } : { exc !r} " )
176149
177- self ._close_wait ()
150+ def stop (self ):
151+ # Method called from a different thread to stop this thread
152+ self ._stopped = True
153+ self ._kill ()
178154
179155 def mp_result_error (self , test_name , error_type , stdout = '' , stderr = '' ,
180156 err_msg = None ):
@@ -190,59 +166,69 @@ def _timedout(self, test_name):
190166 try :
191167 stdout , stderr = popen .communicate (timeout = JOIN_TIMEOUT )
192168 except (subprocess .TimeoutExpired , OSError ) as exc :
193- print ("WARNING: Failed to read worker process %s output "
194- "(timeout=%.1f sec): %r"
195- % (popen .pid , JOIN_TIMEOUT , exc ),
196- file = sys .stderr , flush = True )
197-
198- self ._close_wait ()
169+ print_warning (f"Failed to read { self } output "
170+ f"(timeout={ format_duration (JOIN_TIMEOUT )} ): "
171+ f"{ exc !r} " )
199172
200173 return self .mp_result_error (test_name , TIMEOUT , stdout , stderr )
201174
202- def _runtest (self , test_name ):
203- try :
204- self .start_time = time .monotonic ()
205- self .current_test_name = test_name
175+ def _run_process (self , test_name ):
176+ self .start_time = time .monotonic ()
206177
178+ self .current_test_name = test_name
179+ try :
180+ self ._killed = False
207181 self ._popen = run_test_in_subprocess (test_name , self .ns )
208182 popen = self ._popen
183+ except :
184+ self .current_test_name = None
185+ raise
186+
187+ try :
188+ if self ._stopped :
189+ # If stop() has been called before self._popen is set,
190+ # self._popen is still running. Call _kill() again
191+ # to ensure that the process is killed.
192+ self ._kill ()
193+ raise ExitThread
194+
209195 try :
210- try :
211- if self ._killed :
212- # If kill() has been called before self._popen is set,
213- # self._popen is still running. Call again kill()
214- # to ensure that the process is killed.
215- self .kill ()
216- raise ExitThread
217-
218- try :
219- stdout , stderr = popen .communicate (timeout = self .timeout )
220- except subprocess .TimeoutExpired :
221- if self ._killed :
222- # kill() has been called: communicate() fails
223- # on reading closed stdout/stderr
224- raise ExitThread
225-
226- return self ._timedout (test_name )
227- except OSError :
228- if self ._killed :
229- # kill() has been called: communicate() fails
230- # on reading closed stdout/stderr
231- raise ExitThread
232- raise
233- except :
234- self .kill ()
235- raise
236- finally :
237- self ._close_wait ()
196+ stdout , stderr = popen .communicate (timeout = self .timeout )
197+ except subprocess .TimeoutExpired :
198+ if self ._stopped :
199+ # kill() has been called: communicate() fails
200+ # on reading closed stdout/stderr
201+ raise ExitThread
202+
203+ return self ._timedout (test_name )
204+ except OSError :
205+ if self ._stopped :
206+ # kill() has been called: communicate() fails
207+ # on reading closed stdout/stderr
208+ raise ExitThread
209+ raise
238210
239211 retcode = popen .returncode
212+ stdout = stdout .strip ()
213+ stderr = stderr .rstrip ()
214+
215+ return (retcode , stdout , stderr )
216+ except :
217+ self ._kill ()
218+ raise
240219 finally :
241- self .current_test_name = None
220+ self ._wait_completed ()
242221 self ._popen = None
222+ self .current_test_name = None
223+
224+ def _runtest (self , test_name ):
225+ result = self ._run_process (test_name )
243226
244- stdout = stdout .strip ()
245- stderr = stderr .rstrip ()
227+ if isinstance (result , MultiprocessResult ):
228+ # _timedout() case
229+ return result
230+
231+ retcode , stdout , stderr = result
246232
247233 err_msg = None
248234 if retcode != 0 :
@@ -266,7 +252,7 @@ def _runtest(self, test_name):
266252 return MultiprocessResult (result , stdout , stderr , err_msg )
267253
268254 def run (self ):
269- while not self ._killed :
255+ while not self ._stopped :
270256 try :
271257 try :
272258 test_name = next (self .pending )
@@ -284,6 +270,33 @@ def run(self):
284270 self .output .put ((True , traceback .format_exc ()))
285271 break
286272
273+ def _wait_completed (self ):
274+ popen = self ._popen
275+
276+ # stdout and stderr must be closed to ensure that communicate()
277+ # does not hang
278+ popen .stdout .close ()
279+ popen .stderr .close ()
280+
281+ try :
282+ popen .wait (JOIN_TIMEOUT )
283+ except (subprocess .TimeoutExpired , OSError ) as exc :
284+ print_warning (f"Failed to wait for { self } completion "
285+ f"(timeout={ format_duration (JOIN_TIMEOUT )} ): "
286+ f"{ exc !r} " )
287+
288+ def wait_stopped (self , start_time ):
289+ while True :
290+ # Write a message every second
291+ self .join (1.0 )
292+ if not self .is_alive ():
293+ break
294+ dt = time .monotonic () - start_time
295+ print (f"Waiting for { self } thread for { format_duration (dt )} " , flush = True )
296+ if dt > JOIN_TIMEOUT :
297+ print_warning (f"Failed to join { self } in { format_duration (dt )} " )
298+ break
299+
287300
288301def get_running (workers ):
289302 running = []
@@ -298,7 +311,7 @@ def get_running(workers):
298311 return running
299312
300313
301- class MultiprocessRunner :
314+ class MultiprocessTestRunner :
302315 def __init__ (self , regrtest ):
303316 self .regrtest = regrtest
304317 self .ns = regrtest .ns
@@ -311,30 +324,20 @@ def __init__(self, regrtest):
311324 self .workers = None
312325
313326 def start_workers (self ):
314- self .workers = [MultiprocessThread ( self .pending , self .output ,
315- self .ns , self .worker_timeout )
316- for _ in range (self .ns .use_mp )]
327+ self .workers = [TestWorkerProcess ( index , self .pending , self .output ,
328+ self .ns , self .worker_timeout )
329+ for index in range (1 , self .ns .use_mp + 1 )]
317330 print ("Run tests in parallel using %s child processes"
318331 % len (self .workers ))
319332 for worker in self .workers :
320333 worker .start ()
321334
322- def wait_workers (self ):
335+ def stop_workers (self ):
323336 start_time = time .monotonic ()
324337 for worker in self .workers :
325- worker .kill ()
338+ worker .stop ()
326339 for worker in self .workers :
327- while True :
328- worker .join (1.0 )
329- if not worker .is_alive ():
330- break
331- dt = time .monotonic () - start_time
332- print ("Wait for regrtest worker %r for %.1f sec" % (worker , dt ),
333- flush = True )
334- if dt > JOIN_TIMEOUT :
335- print ("Warning -- failed to join a regrtest worker %s"
336- % worker , flush = True )
337- break
340+ worker .wait_stopped (start_time )
338341
339342 def _get_result (self ):
340343 if not any (worker .is_alive () for worker in self .workers ):
@@ -418,10 +421,11 @@ def run_tests(self):
418421 if self .ns .timeout is not None :
419422 faulthandler .cancel_dump_traceback_later ()
420423
421- # a test failed (and --failfast is set) or all tests completed
422- self .pending .stop ()
423- self .wait_workers ()
424+ # Always ensure that all worker processes are no longer
425+ # running when we exit this function
426+ self .pending .stop ()
427+ self .stop_workers ()
424428
425429
426430def run_tests_multiprocess (regrtest ):
427- MultiprocessRunner (regrtest ).run_tests ()
431+ MultiprocessTestRunner (regrtest ).run_tests ()
0 commit comments