|
1 | 1 | # standard imports |
2 | 2 | import argparse |
3 | 3 | import os |
4 | | - |
5 | 4 | import select |
6 | 5 | import shutil |
7 | 6 | import subprocess |
8 | 7 | import sys |
9 | | -from typing import Optional, Mapping |
| 8 | +from typing import AnyStr, IO, Optional, Mapping |
10 | 9 |
|
11 | 10 | # lib imports |
12 | 11 | from dotenv import load_dotenv |
@@ -40,47 +39,86 @@ def _parse_args(args_list: list) -> argparse.Namespace: |
40 | 39 | return parser.parse_args(args_list) |
41 | 40 |
|
42 | 41 |
|
43 | | -def _run_subprocess( |
44 | | - args_list: list, |
45 | | - cwd: Optional[str] = None, |
46 | | - env: Optional[Mapping] = None, |
47 | | - ignore_error: bool = False, |
48 | | -) -> bool: |
49 | | - global ERROR |
50 | | - if cwd: |
51 | | - os.chdir(cwd) # hack for unit testing on windows |
52 | | - process = subprocess.Popen( |
53 | | - args=args_list, |
54 | | - stdout=subprocess.PIPE, |
55 | | - stderr=subprocess.PIPE, |
56 | | - cwd=cwd, |
57 | | - env=env, |
58 | | - ) |
| 42 | +def _read_and_print_output( |
| 43 | + pipe: Optional[IO[AnyStr]], |
| 44 | + encoding: str = 'utf-8', |
| 45 | +): |
| 46 | + """Read a line from a pipe and print it if not empty.""" |
| 47 | + line = pipe.readline() |
| 48 | + if line: |
| 49 | + print(line.decode(encoding), end='') |
59 | 50 |
|
60 | | - if cwd: |
61 | | - os.chdir(og_dir) |
62 | 51 |
|
63 | | - # Print stdout and stderr in real-time |
| 52 | +def _handle_realtime_output(process: subprocess.Popen): |
| 53 | + """Handle real-time output from process using select.""" |
64 | 54 | while True: |
65 | 55 | reads = [process.stdout.fileno(), process.stderr.fileno()] |
66 | 56 | ret = select.select(reads, [], []) |
67 | 57 |
|
68 | 58 | for fd in ret[0]: |
69 | 59 | if fd == process.stdout.fileno(): |
70 | | - read = process.stdout.readline() |
71 | | - print(read.decode('utf-8'), end='') |
72 | | - if fd == process.stderr.fileno(): |
73 | | - read = process.stderr.readline() |
74 | | - print(read.decode('utf-8'), end='') |
| 60 | + _read_and_print_output(pipe=process.stdout) |
| 61 | + elif fd == process.stderr.fileno(): |
| 62 | + _read_and_print_output(pipe=process.stderr) |
75 | 63 |
|
76 | 64 | if process.poll() is not None: |
77 | 65 | break |
78 | 66 |
|
79 | | - # close the file descriptors |
| 67 | + |
| 68 | +def _drain_remaining_output(process: subprocess.Popen): |
| 69 | + """Read and print any remaining output after process completion.""" |
| 70 | + # Drain stdout |
| 71 | + while True: |
| 72 | + stdout_line = process.stdout.readline() |
| 73 | + if not stdout_line: |
| 74 | + break |
| 75 | + print(stdout_line.decode('utf-8'), end='') |
| 76 | + |
| 77 | + # Drain stderr |
| 78 | + while True: |
| 79 | + stderr_line = process.stderr.readline() |
| 80 | + if not stderr_line: |
| 81 | + break |
| 82 | + print(stderr_line.decode('utf-8'), end='') |
| 83 | + |
| 84 | + |
| 85 | +def _cleanup_process(process: subprocess.Popen) -> int: |
| 86 | + """Close file descriptors and get exit code.""" |
80 | 87 | process.stdout.close() |
81 | 88 | process.stderr.close() |
| 89 | + return process.wait() |
| 90 | + |
| 91 | + |
| 92 | +def _setup_process(args_list: list, cwd: Optional[str], env: Optional[Mapping]) -> subprocess.Popen: |
| 93 | + """Set up and start the subprocess with proper working directory handling.""" |
| 94 | + if cwd: |
| 95 | + os.chdir(cwd) # hack for unit testing on windows |
| 96 | + |
| 97 | + process = subprocess.Popen( |
| 98 | + args=args_list, |
| 99 | + stdout=subprocess.PIPE, |
| 100 | + stderr=subprocess.PIPE, |
| 101 | + cwd=cwd, |
| 102 | + env=env, |
| 103 | + ) |
82 | 104 |
|
83 | | - exit_code = process.wait() |
| 105 | + if cwd: |
| 106 | + os.chdir(og_dir) |
| 107 | + |
| 108 | + return process |
| 109 | + |
| 110 | + |
| 111 | +def _run_subprocess( |
| 112 | + args_list: list, |
| 113 | + cwd: Optional[str] = None, |
| 114 | + env: Optional[Mapping] = None, |
| 115 | + ignore_error: bool = False, |
| 116 | +) -> bool: |
| 117 | + global ERROR |
| 118 | + process = _setup_process(args_list=args_list, cwd=cwd, env=env) |
| 119 | + _handle_realtime_output(process=process) |
| 120 | + _drain_remaining_output(process=process) |
| 121 | + exit_code = _cleanup_process(process=process) |
84 | 122 |
|
85 | 123 | if exit_code == 0: |
86 | 124 | return True |
|
0 commit comments