22
33import errno
44import io
5+ import itertools
56import os
67import selectors
78import signal
2930__all__ = (
3031 'SelectorEventLoop' ,
3132 'AbstractChildWatcher' , 'SafeChildWatcher' ,
32- 'FastChildWatcher' , 'DefaultEventLoopPolicy' ,
33+ 'FastChildWatcher' ,
34+ 'MultiLoopChildWatcher' , 'ThreadedChildWatcher' ,
35+ 'DefaultEventLoopPolicy' ,
3336)
3437
3538
@@ -184,6 +187,13 @@ async def _make_subprocess_transport(self, protocol, args, shell,
184187 stdin , stdout , stderr , bufsize ,
185188 extra = None , ** kwargs ):
186189 with events .get_child_watcher () as watcher :
190+ if not watcher .is_active ():
191+ # Check early.
192+ # Raising exception before process creation
193+ # prevents subprocess execution if the watcher
194+ # is not ready to handle it.
195+ raise RuntimeError ("asyncio.get_child_watcher() is not activated, "
196+ "subproccess support is not installed." )
187197 waiter = self .create_future ()
188198 transp = _UnixSubprocessTransport (self , protocol , args , shell ,
189199 stdin , stdout , stderr , bufsize ,
@@ -838,6 +848,15 @@ def close(self):
838848 """
839849 raise NotImplementedError ()
840850
851+ def is_active (self ):
852+ """Watcher status.
853+
854+ Return True if the watcher is installed and ready to handle process exit
855+ notifications.
856+
857+ """
858+ raise NotImplementedError ()
859+
841860 def __enter__ (self ):
842861 """Enter the watcher's context and allow starting new processes
843862
@@ -849,6 +868,20 @@ def __exit__(self, a, b, c):
849868 raise NotImplementedError ()
850869
851870
871+ def _compute_returncode (status ):
872+ if os .WIFSIGNALED (status ):
873+ # The child process died because of a signal.
874+ return - os .WTERMSIG (status )
875+ elif os .WIFEXITED (status ):
876+ # The child process exited (e.g sys.exit()).
877+ return os .WEXITSTATUS (status )
878+ else :
879+ # The child exited, but we don't understand its status.
880+ # This shouldn't happen, but if it does, let's just
881+ # return that status; perhaps that helps debug it.
882+ return status
883+
884+
852885class BaseChildWatcher (AbstractChildWatcher ):
853886
854887 def __init__ (self ):
@@ -858,6 +891,9 @@ def __init__(self):
858891 def close (self ):
859892 self .attach_loop (None )
860893
894+ def is_active (self ):
895+ return self ._loop is not None and self ._loop .is_running ()
896+
861897 def _do_waitpid (self , expected_pid ):
862898 raise NotImplementedError ()
863899
@@ -898,19 +934,6 @@ def _sig_chld(self):
898934 'exception' : exc ,
899935 })
900936
901- def _compute_returncode (self , status ):
902- if os .WIFSIGNALED (status ):
903- # The child process died because of a signal.
904- return - os .WTERMSIG (status )
905- elif os .WIFEXITED (status ):
906- # The child process exited (e.g sys.exit()).
907- return os .WEXITSTATUS (status )
908- else :
909- # The child exited, but we don't understand its status.
910- # This shouldn't happen, but if it does, let's just
911- # return that status; perhaps that helps debug it.
912- return status
913-
914937
915938class SafeChildWatcher (BaseChildWatcher ):
916939 """'Safe' child watcher implementation.
@@ -934,11 +957,6 @@ def __exit__(self, a, b, c):
934957 pass
935958
936959 def add_child_handler (self , pid , callback , * args ):
937- if self ._loop is None :
938- raise RuntimeError (
939- "Cannot add child handler, "
940- "the child watcher does not have a loop attached" )
941-
942960 self ._callbacks [pid ] = (callback , args )
943961
944962 # Prevent a race condition in case the child is already terminated.
@@ -974,7 +992,7 @@ def _do_waitpid(self, expected_pid):
974992 # The child process is still alive.
975993 return
976994
977- returncode = self . _compute_returncode (status )
995+ returncode = _compute_returncode (status )
978996 if self ._loop .get_debug ():
979997 logger .debug ('process %s exited with returncode %s' ,
980998 expected_pid , returncode )
@@ -1035,11 +1053,6 @@ def __exit__(self, a, b, c):
10351053 def add_child_handler (self , pid , callback , * args ):
10361054 assert self ._forks , "Must use the context manager"
10371055
1038- if self ._loop is None :
1039- raise RuntimeError (
1040- "Cannot add child handler, "
1041- "the child watcher does not have a loop attached" )
1042-
10431056 with self ._lock :
10441057 try :
10451058 returncode = self ._zombies .pop (pid )
@@ -1072,7 +1085,7 @@ def _do_waitpid_all(self):
10721085 # A child process is still alive.
10731086 return
10741087
1075- returncode = self . _compute_returncode (status )
1088+ returncode = _compute_returncode (status )
10761089
10771090 with self ._lock :
10781091 try :
@@ -1101,6 +1114,177 @@ def _do_waitpid_all(self):
11011114 callback (pid , returncode , * args )
11021115
11031116
1117+ class MultiLoopChildWatcher (AbstractChildWatcher ):
1118+ # The class keeps compatibility with AbstractChildWatcher ABC
1119+ # To achieve this it has empty attach_loop() method
1120+ # and doesn't accept explicit loop argument
1121+ # for add_child_handler()/remove_child_handler()
1122+ # but retrieves the current loop by get_running_loop()
1123+
1124+ def __init__ (self ):
1125+ self ._callbacks = {}
1126+ self ._saved_sighandler = None
1127+
1128+ def is_active (self ):
1129+ return self ._saved_sighandler is not None
1130+
1131+ def close (self ):
1132+ self ._callbacks .clear ()
1133+ if self ._saved_sighandler is not None :
1134+ handler = signal .getsignal (signal .SIGCHLD )
1135+ if handler != self ._sig_chld :
1136+ logger .warning ("SIGCHLD handler was changed by outside code" )
1137+ else :
1138+ signal .signal (signal .SIGCHLD , self ._saved_sighandler )
1139+ self ._saved_sighandler = None
1140+
1141+ def __enter__ (self ):
1142+ return self
1143+
1144+ def __exit__ (self , exc_type , exc_val , exc_tb ):
1145+ pass
1146+
1147+ def add_child_handler (self , pid , callback , * args ):
1148+ loop = events .get_running_loop ()
1149+ self ._callbacks [pid ] = (loop , callback , args )
1150+
1151+ # Prevent a race condition in case the child is already terminated.
1152+ self ._do_waitpid (pid )
1153+
1154+ def remove_child_handler (self , pid ):
1155+ try :
1156+ del self ._callbacks [pid ]
1157+ return True
1158+ except KeyError :
1159+ return False
1160+
1161+ def attach_loop (self , loop ):
1162+ # Don't save the loop but initialize itself if called first time
1163+ # The reason to do it here is that attach_loop() is called from
1164+ # unix policy only for the main thread.
1165+ # Main thread is required for subscription on SIGCHLD signal
1166+ if self ._saved_sighandler is None :
1167+ self ._saved_sighandler = signal .signal (signal .SIGCHLD , self ._sig_chld )
1168+ if self ._saved_sighandler is None :
1169+ logger .warning ("Previous SIGCHLD handler was set by non-Python code, "
1170+ "restore to default handler on watcher close." )
1171+ self ._saved_sighandler = signal .SIG_DFL
1172+
1173+ # Set SA_RESTART to limit EINTR occurrences.
1174+ signal .siginterrupt (signal .SIGCHLD , False )
1175+
1176+ def _do_waitpid_all (self ):
1177+ for pid in list (self ._callbacks ):
1178+ self ._do_waitpid (pid )
1179+
1180+ def _do_waitpid (self , expected_pid ):
1181+ assert expected_pid > 0
1182+
1183+ try :
1184+ pid , status = os .waitpid (expected_pid , os .WNOHANG )
1185+ except ChildProcessError :
1186+ # The child process is already reaped
1187+ # (may happen if waitpid() is called elsewhere).
1188+ pid = expected_pid
1189+ returncode = 255
1190+ logger .warning (
1191+ "Unknown child process pid %d, will report returncode 255" ,
1192+ pid )
1193+ debug_log = False
1194+ else :
1195+ if pid == 0 :
1196+ # The child process is still alive.
1197+ return
1198+
1199+ returncode = _compute_returncode (status )
1200+ debug_log = True
1201+ try :
1202+ loop , callback , args = self ._callbacks .pop (pid )
1203+ except KeyError : # pragma: no cover
1204+ # May happen if .remove_child_handler() is called
1205+ # after os.waitpid() returns.
1206+ logger .warning ("Child watcher got an unexpected pid: %r" ,
1207+ pid , exc_info = True )
1208+ else :
1209+ if loop .is_closed ():
1210+ logger .warning ("Loop %r that handles pid %r is closed" , loop , pid )
1211+ else :
1212+ if debug_log and loop .get_debug ():
1213+ logger .debug ('process %s exited with returncode %s' ,
1214+ expected_pid , returncode )
1215+ loop .call_soon_threadsafe (callback , pid , returncode , * args )
1216+
1217+ def _sig_chld (self , signum , frame ):
1218+ try :
1219+ self ._do_waitpid_all ()
1220+ except (SystemExit , KeyboardInterrupt ):
1221+ raise
1222+ except BaseException :
1223+ logger .warning ('Unknown exception in SIGCHLD handler' , exc_info = True )
1224+
1225+
1226+ class ThreadedChildWatcher (AbstractChildWatcher ):
1227+ # The watcher uses a thread per process
1228+ # for waiting for the process finish.
1229+ # It doesn't require subscription on POSIX signal
1230+
1231+ def __init__ (self ):
1232+ self ._pid_counter = itertools .count (0 )
1233+
1234+ def is_active (self ):
1235+ return True
1236+
1237+ def close (self ):
1238+ pass
1239+
1240+ def __enter__ (self ):
1241+ return self
1242+
1243+ def __exit__ (self , exc_type , exc_val , exc_tb ):
1244+ pass
1245+
1246+ def add_child_handler (self , pid , callback , * args ):
1247+ loop = events .get_running_loop ()
1248+ thread = threading .Thread (target = self ._do_waitpid ,
1249+ name = f"waitpid-{ next (self ._pid_counter )} " ,
1250+ args = (loop , pid , callback , args ),
1251+ daemon = True )
1252+ thread .start ()
1253+
1254+ def remove_child_handler (self , pid ):
1255+ # asyncio never calls remove_child_handler() !!!
1256+ # The method is no-op but is implemented because
1257+ # abstract base classe requires it
1258+ return True
1259+
1260+ def attach_loop (self , loop ):
1261+ pass
1262+
1263+ def _do_waitpid (self , loop , expected_pid , callback , args ):
1264+ assert expected_pid > 0
1265+
1266+ try :
1267+ pid , status = os .waitpid (expected_pid , 0 )
1268+ except ChildProcessError :
1269+ # The child process is already reaped
1270+ # (may happen if waitpid() is called elsewhere).
1271+ pid = expected_pid
1272+ returncode = 255
1273+ logger .warning (
1274+ "Unknown child process pid %d, will report returncode 255" ,
1275+ pid )
1276+ else :
1277+ returncode = _compute_returncode (status )
1278+ if loop .get_debug ():
1279+ logger .debug ('process %s exited with returncode %s' ,
1280+ expected_pid , returncode )
1281+
1282+ if loop .is_closed ():
1283+ logger .warning ("Loop %r that handles pid %r is closed" , loop , pid )
1284+ else :
1285+ loop .call_soon_threadsafe (callback , pid , returncode , * args )
1286+
1287+
11041288class _UnixDefaultEventLoopPolicy (events .BaseDefaultEventLoopPolicy ):
11051289 """UNIX event loop policy with a watcher for child processes."""
11061290 _loop_factory = _UnixSelectorEventLoop
@@ -1112,7 +1296,7 @@ def __init__(self):
11121296 def _init_watcher (self ):
11131297 with events ._lock :
11141298 if self ._watcher is None : # pragma: no branch
1115- self ._watcher = SafeChildWatcher ()
1299+ self ._watcher = ThreadedChildWatcher ()
11161300 if isinstance (threading .current_thread (),
11171301 threading ._MainThread ):
11181302 self ._watcher .attach_loop (self ._local ._loop )
@@ -1134,7 +1318,7 @@ def set_event_loop(self, loop):
11341318 def get_child_watcher (self ):
11351319 """Get the watcher for child processes.
11361320
1137- If not yet set, a SafeChildWatcher object is automatically created.
1321+ If not yet set, a ThreadedChildWatcher object is automatically created.
11381322 """
11391323 if self ._watcher is None :
11401324 self ._init_watcher ()
0 commit comments