3434import java .util .concurrent .ConcurrentHashMap ;
3535import java .util .concurrent .ConcurrentMap ;
3636import java .util .concurrent .Semaphore ;
37+ import javax .annotation .Nullable ;
3738
3839/**
3940 * An intermediate worker that sends requests and receives responses from the worker processes.
@@ -48,29 +49,23 @@ public class WorkerMultiplexer extends Thread {
4849 * A map of {@code WorkResponse}s received from the worker process. They are stored in this map
4950 * keyed by the request id until the corresponding {@code WorkerProxy} picks them up.
5051 */
51- private final ConcurrentMap <Integer , WorkResponse > workerProcessResponse ;
52+ private final ConcurrentMap <Integer , WorkResponse > workerProcessResponse =
53+ new ConcurrentHashMap <>();
5254 /**
5355 * A map of semaphores corresponding to {@code WorkRequest}s. After sending the {@code
5456 * WorkRequest}, {@code WorkerProxy} will wait on a semaphore to be released. {@code
5557 * WorkerMultiplexer} is responsible for releasing the corresponding semaphore in order to signal
 5759 * {@code WorkerProxy} that the {@code WorkResponse} has been received.
5759 */
58- private final ConcurrentMap <Integer , Semaphore > responseChecker ;
59- /** The worker process that this WorkerMultiplexer should be talking to. */
60- private Subprocess process ;
60+ private final ConcurrentMap <Integer , Semaphore > responseChecker = new ConcurrentHashMap <>();
6161 /**
62- * Set to true if one of the worker processes returns an unparseable response, or for other
63- * reasons we can't properly handle the remaining responses. We then discard all the responses
64- * from other work requests and abort .
62+ * The worker process that this WorkerMultiplexer should be talking to. This should only be set
63+ * once, when creating a new process. If the process dies or its stdio streams get corrupted, the
64+ * {@code WorkerMultiplexer} gets discarded as well and a new one gets created as needed .
6565 */
66- private boolean isWorkerStreamCorrupted ;
66+ private Subprocess process ;
6767 /** InputStream from the worker process. */
6868 private RecordingInputStream recordingStream ;
69- /**
70- * True if we have received EOF on the stream from the worker process. We then stop processing,
71- * and all workers still waiting for responses will fail.
72- */
73- private boolean isWorkerStreamClosed ;
7469 /** True if this multiplexer was explicitly destroyed. */
7570 private boolean wasDestroyed ;
7671 /**
@@ -89,25 +84,20 @@ public class WorkerMultiplexer extends Thread {
8984 * The active Reporter object, non-null if {@code --worker_verbose} is set. This must be cleared
9085 * at the end of a command execution.
9186 */
92- public EventHandler reporter ;
87+ private EventHandler reporter ;
9388
9489 WorkerMultiplexer (Path logFile , WorkerKey workerKey ) {
9590 this .logFile = logFile ;
9691 this .workerKey = workerKey ;
97- responseChecker = new ConcurrentHashMap <>();
98- workerProcessResponse = new ConcurrentHashMap <>();
99- isWorkerStreamCorrupted = false ;
100- isWorkerStreamClosed = false ;
101- wasDestroyed = false ;
10292 }
10393
10494 /** Sets or clears the reporter for outputting verbose info. */
105- void setReporter (EventHandler reporter ) {
95+ synchronized void setReporter (@ Nullable EventHandler reporter ) {
10696 this .reporter = reporter ;
10797 }
10898
10999 /** Reports a string to the user if reporting is enabled. */
110- private void report (String s ) {
100+ private synchronized void report (String s ) {
111101 EventHandler r = this .reporter ; // Protect against race condition with setReporter().
112102 if (r != null && s != null ) {
113103 r .handle (Event .info (s ));
@@ -119,17 +109,17 @@ private void report(String s) {
119109 * exist. Also makes sure this {@code WorkerMultiplexer} runs as a separate thread.
120110 */
121111 public synchronized void createProcess (Path workDir ) throws IOException {
122- // The process may have died in the meanwhile (e.g. between builds).
123- if (this .process == null || !this .process .isAlive ()) {
112+ if (this .process == null ) {
113+ if (this .wasDestroyed ) {
114+ throw new IOException ("Multiplexer destroyed before created process" );
115+ }
124116 ImmutableList <String > args = workerKey .getArgs ();
125117 File executable = new File (args .get (0 ));
126118 if (!executable .isAbsolute () && executable .getParent () != null ) {
127119 List <String > newArgs = new ArrayList <>(args );
128120 newArgs .set (0 , new File (workDir .getPathFile (), newArgs .get (0 )).getAbsolutePath ());
129121 args = ImmutableList .copyOf (newArgs );
130122 }
131- isWorkerStreamCorrupted = false ;
132- isWorkerStreamClosed = false ;
133123 SubprocessBuilder processBuilder =
134124 subprocessFactory != null
135125 ? new SubprocessBuilder (subprocessFactory )
@@ -139,6 +129,8 @@ public synchronized void createProcess(Path workDir) throws IOException {
139129 processBuilder .setStderr (logFile .getPathFile ());
140130 processBuilder .setEnv (workerKey .getEnv ());
141131 this .process = processBuilder .start ();
132+ } else if (!this .process .isAlive ()) {
133+ throw new IOException ("Process is dead" );
142134 }
143135 if (!this .isAlive ()) {
144136 this .start ();
@@ -155,24 +147,24 @@ public Path getLogFile() {
155147
156148 /**
157149 * Signals this object to destroy itself, including the worker process. The object might not be
158- * fully destroyed at the end of this call, but will terminate soon.
150+ * fully destroyed at the end of this call, but will terminate soon. This is considered a
151+ * deliberate destruction.
159152 */
160153 public synchronized void destroyMultiplexer () {
161154 if (this .process != null ) {
162- destroyProcess (this .process );
163- this .process = null ;
155+ destroyProcess ();
164156 }
165157 wasDestroyed = true ;
166158 }
167159
168160 /** Destroys the worker subprocess. This might block forever if the subprocess refuses to die. */
169- private void destroyProcess (Subprocess process ) {
161+ private synchronized void destroyProcess () {
170162 boolean wasInterrupted = false ;
171163 try {
172- process .destroy ();
164+ this . process .destroy ();
173165 while (true ) {
174166 try {
175- process .waitFor ();
167+ this . process .waitFor ();
176168 return ;
177169 } catch (InterruptedException ie ) {
178170 wasInterrupted = true ;
@@ -183,7 +175,6 @@ private void destroyProcess(Subprocess process) {
183175 if (wasInterrupted ) {
184176 Thread .currentThread ().interrupt (); // preserve interrupted status
185177 }
186- isWorkerStreamClosed = true ;
187178 }
188179 }
189180
@@ -200,10 +191,6 @@ public synchronized void putRequest(WorkRequest request) throws IOException {
200191 // We can't know how much of the request was sent, so we have to assume the worker's input
201192 // now contains garbage.
202193 // TODO(b/151767359): Avoid causing garbage! Maybe by sending in a separate thread?
203- isWorkerStreamCorrupted = true ;
204- if (e instanceof InterruptedIOException ) {
205- Thread .currentThread ().interrupt ();
206- }
207194 responseChecker .remove (request .getRequestId ());
208195 throw e ;
209196 }
@@ -228,10 +215,8 @@ public WorkResponse getResponse(Integer requestId) throws InterruptedException {
228215 // Wait for the multiplexer to get our response and release this semaphore. The semaphore will
229216 // throw {@code InterruptedException} when the multiplexer is terminated.
230217 waitForResponse .acquire ();
231- report ("Acquired response semaphore for " + requestId );
232218
233219 WorkResponse workResponse = workerProcessResponse .get (requestId );
234- report ("Response for " + requestId + " is " + workResponse );
235220 return workResponse ;
236221 } finally {
237222 responseChecker .remove (requestId );
@@ -247,25 +232,25 @@ public WorkResponse getResponse(Integer requestId) throws InterruptedException {
247232 * execution cancellation.
248233 */
249234 private void waitResponse () throws InterruptedException , IOException {
250- Subprocess p = this .process ;
251- if (p == null || !p .isAlive ()) {
252- // Avoid busy-wait for a new process.
235+ recordingStream = new RecordingInputStream (this .process .getInputStream ());
236+ recordingStream .startRecording (4096 );
237+ // TODO(larsrc): Turn this into a loop that also sends requests.
 238+ // Allow interrupts while waiting for responses, without conflating them with I/O errors.
239+ while (recordingStream .available () == 0 ) {
240+ if (!this .process .isAlive ()) {
241+ throw new IOException (
242+ String .format ("Multiplexer process for %s is dead" , workerKey .getMnemonic ()));
243+ }
253244 Thread .sleep (1 );
254- return ;
255245 }
256- recordingStream = new RecordingInputStream (p .getInputStream ());
257- recordingStream .startRecording (4096 );
258246 WorkResponse parsedResponse = WorkResponse .parseDelimitedFrom (recordingStream );
259247
260248 // A null parsedResponse can only happen if the input stream is closed, in which case we
261249 // drop everything.
262250 if (parsedResponse == null ) {
263- isWorkerStreamClosed = true ;
264- report (
251+ throw new IOException (
265252 String .format (
266- "Multiplexer process for %s has closed its output, aborting multiplexer" ,
267- workerKey .getMnemonic ()));
268- return ;
253+ "Multiplexer process for %s died while reading response" , workerKey .getMnemonic ()));
269254 }
270255
271256 int requestId = parsedResponse .getRequestId ();
@@ -287,13 +272,15 @@ private void waitResponse() throws InterruptedException, IOException {
287272 /** The multiplexer thread that listens to the WorkResponse from worker process. */
288273 @ Override
289274 public void run () {
290- while (! isWorkerStreamClosed && ! isWorkerStreamCorrupted ) {
275+ while (this . process . isAlive () ) {
291276 try {
292277 waitResponse ();
293278 } catch (IOException e ) {
294279 // We got this exception while reading from the worker's stdout. We can't trust the
295280 // output any more at that point.
296- isWorkerStreamCorrupted = true ;
281+ if (this .process .isAlive ()) {
282+ destroyProcess ();
283+ }
297284 if (e instanceof InterruptedIOException ) {
298285 report (
299286 String .format (
@@ -315,17 +302,12 @@ public void run() {
315302 // will let fall on the floor, but we still want to leave the process running for the next
316303 // build.
317304 // TODO(b/151767359): Cancel all outstanding requests when cancellation is implemented.
318- releaseAllSemaphores ();
305+ for (Semaphore semaphore : responseChecker .values ()) {
306+ semaphore .release ();
307+ }
319308 }
320309 }
321- // If we get here, the worker process is either dead or corrupted. We could attempt to restart
322- // it, but the outstanding requests will have failed already. Until we have a way to signal
323- // transient failures, we have to just reject all further requests and make sure the process
324- // is really dead
325310 synchronized (this ) {
326- if (process != null && process .isAlive ()) {
327- destroyMultiplexer ();
328- }
329311 releaseAllSemaphores ();
330312 }
331313 }
@@ -350,14 +332,14 @@ String getRecordingStreamMessage() {
350332
 351333 /** Returns true if this process has died for reasons other than a call to {@code #destroyMultiplexer()}. */
352334 boolean diedUnexpectedly () {
353- Subprocess p = this .process ; // Protects against this.process getting null.
354- return p != null && !p .isAlive () && !wasDestroyed ;
335+ return this .process != null && !this .process .isAlive () && !wasDestroyed ;
355336 }
356337
 357339 /** Returns the exit value of the multiplexer's process, if it has exited. */
358339 Optional <Integer > getExitValue () {
359- Subprocess p = this .process ; // Protects against this.process getting null.
360- return p != null && !p .isAlive () ? Optional .of (p .exitValue ()) : Optional .empty ();
340+ return this .process != null && !this .process .isAlive ()
341+ ? Optional .of (this .process .exitValue ())
342+ : Optional .empty ();
361343 }
362344
363345 /** For testing only, to verify that maps are cleared after responses are reaped. */
0 commit comments