1414
1515package com .google .devtools .build .lib .worker ;
1616
17+
1718import com .google .common .annotations .VisibleForTesting ;
1819import com .google .common .collect .ImmutableList ;
19- import com .google .common .flogger .GoogleLogger ;
2020import com .google .devtools .build .lib .events .Event ;
2121import com .google .devtools .build .lib .events .EventHandler ;
2222import com .google .devtools .build .lib .shell .Subprocess ;
3131import java .util .ArrayList ;
3232import java .util .List ;
3333import java .util .Optional ;
34+ import java .util .concurrent .BlockingQueue ;
3435import java .util .concurrent .ConcurrentHashMap ;
3536import java .util .concurrent .ConcurrentMap ;
37+ import java .util .concurrent .LinkedBlockingQueue ;
3638import java .util .concurrent .Semaphore ;
3739import javax .annotation .Nullable ;
3840
4345 * into them to send requests. When a worker process returns a {@code WorkResponse}, {@code
4446 * WorkerMultiplexer} wakes up the relevant {@code WorkerProxy} to retrieve the response.
4547 */
46- public class WorkerMultiplexer extends Thread {
47- private static final GoogleLogger logger = GoogleLogger .forEnclosingClass ();
48+ public class WorkerMultiplexer {
49+ /**
50+ * A queue of {@link WorkRequest} instances that need to be sent to the worker. {@link
51+ * WorkerProxy} instances add to this queue, while the requestSender subthread remove requests and
52+ * send them to the worker. This prevents dynamic execution interrupts from corrupting the {@code
53+ * stdin} of the worker process.
54+ */
55+ private final BlockingQueue <WorkRequest > pendingRequests = new LinkedBlockingQueue <>();
4856 /**
4957 * A map of {@code WorkResponse}s received from the worker process. They are stored in this map
5058 * keyed by the request id until the corresponding {@code WorkerProxy} picks them up.
@@ -80,6 +88,12 @@ public class WorkerMultiplexer extends Thread {
8088 /** For testing only, allow a way to fake subprocesses. */
8189 private SubprocessFactory subprocessFactory ;
8290
91+ /** A separate thread that sends requests. */
92+ private Thread requestSender ;
93+
94+ /** A separate thread that receives responses. */
95+ private Thread responseReceiver ;
96+
8397 /**
8498 * The active Reporter object, non-null if {@code --worker_verbose} is set. This must be cleared
8599 * at the end of a command execution.
@@ -97,16 +111,15 @@ synchronized void setReporter(@Nullable EventHandler reporter) {
97111 }
98112
99113 /** Reports a string to the user if reporting is enabled. */
100- private synchronized void report (String s ) {
101- EventHandler r = this .reporter ; // Protect against race condition with setReporter().
102- if (r != null && s != null ) {
103- r .handle (Event .info (s ));
114+ private synchronized void report (@ Nullable String s ) {
115+ if (this .reporter != null && s != null ) {
116+ this .reporter .handle (Event .info (s ));
104117 }
105118 }
106119
107120 /**
108121 * Creates a worker process corresponding to this {@code WorkerMultiplexer}, if it doesn't already
109- * exist. Also makes sure this {@code WorkerMultiplexer} runs as a separate thread .
122+ * exist. Also starts up the subthreads handling reading and writing requests and responses .
110123 */
111124 public synchronized void createProcess (Path workDir ) throws IOException {
112125 if (this .process == null ) {
@@ -129,12 +142,25 @@ public synchronized void createProcess(Path workDir) throws IOException {
129142 processBuilder .setStderr (logFile .getPathFile ());
130143 processBuilder .setEnv (workerKey .getEnv ());
131144 this .process = processBuilder .start ();
145+ String id = workerKey .getMnemonic () + "-" + workerKey .hashCode ();
146+ // TODO(larsrc): Consider moving sender/receiver threads into separate classes.
147+ this .requestSender =
148+ new Thread (
149+ () -> {
150+ while (process .isAlive () && sendRequest ()) {}
151+ });
152+ this .requestSender .setName ("multiplexer-request-sender-" + id );
153+ this .requestSender .start ();
154+ this .responseReceiver =
155+ new Thread (
156+ () -> {
157+ while (process .isAlive () && readResponse ()) {}
158+ });
159+ this .responseReceiver .setName ("multiplexer-response-receiver-" + id );
160+ this .responseReceiver .start ();
132161 } else if (!this .process .isAlive ()) {
133162 throw new IOException ("Process is dead" );
134163 }
135- if (!this .isAlive ()) {
136- this .start ();
137- }
138164 }
139165
140166 /**
@@ -157,7 +183,10 @@ public synchronized void destroyMultiplexer() {
157183 wasDestroyed = true ;
158184 }
159185
160- /** Destroys the worker subprocess. This might block forever if the subprocess refuses to die. */
186+ /**
187+ * Destroys the worker subprocess. This might block forever if the subprocess refuses to die. It
188+ * is safe to call this multiple times.
189+ */
161190 private synchronized void destroyProcess () {
162191 boolean wasInterrupted = false ;
163192 try {
@@ -171,6 +200,17 @@ private synchronized void destroyProcess() {
171200 }
172201 }
173202 } finally {
203+ // Stop the subthreads only when the process is dead, or their loops will go on.
204+ if (this .requestSender != null ) {
205+ this .requestSender .interrupt ();
206+ }
207+ if (this .responseReceiver != null ) {
208+ this .responseReceiver .interrupt ();
209+ }
210+ // Might as well release any waiting workers
211+ for (Semaphore semaphore : responseChecker .values ()) {
212+ semaphore .release ();
213+ }
174214 // Read this for detailed explanation: http://www.ibm.com/developerworks/library/j-jtp05236/
175215 if (wasInterrupted ) {
176216 Thread .currentThread ().interrupt (); // preserve interrupted status
@@ -183,17 +223,12 @@ private synchronized void destroyProcess() {
183223 * WorkerProxy}, and so is subject to interrupts by dynamic execution.
184224 */
185225 public synchronized void putRequest (WorkRequest request ) throws IOException {
186- responseChecker .put (request .getRequestId (), new Semaphore (0 ));
187- try {
188- request .writeDelimitedTo (process .getOutputStream ());
189- process .getOutputStream ().flush ();
190- } catch (IOException e ) {
191- // We can't know how much of the request was sent, so we have to assume the worker's input
192- // now contains garbage.
193- // TODO(b/151767359): Avoid causing garbage! Maybe by sending in a separate thread?
194- responseChecker .remove (request .getRequestId ());
195- throw e ;
226+ if (!process .isAlive ()) {
227+ throw new IOException (
228+ "Attempting to send request " + request .getRequestId () + " to dead process" );
196229 }
230+ responseChecker .put (request .getRequestId (), new Semaphore (0 ));
231+ pendingRequests .add (request );
197232 }
198233
199234 /**
@@ -203,58 +238,99 @@ public synchronized void putRequest(WorkRequest request) throws IOException {
203238 */
204239 public WorkResponse getResponse (Integer requestId ) throws InterruptedException {
205240 try {
241+ if (!process .isAlive ()) {
242+ // If the process has died, all we can do is return what may already have been returned.
243+ return workerProcessResponse .get (requestId );
244+ }
245+
206246 Semaphore waitForResponse = responseChecker .get (requestId );
207247
208248 if (waitForResponse == null ) {
209249 report ("Null response semaphore for " + requestId );
210- // If the multiplexer is interrupted when a {@code WorkerProxy} is trying to send a request,
211- // the request is not sent, so there is no need to wait for a response .
212- return null ;
250+ // If there is no semaphore for this request, it probably failed to send, so we just return
251+ // what we got, probably nothing .
252+ return workerProcessResponse . get ( requestId ) ;
213253 }
214254
215255 // Wait for the multiplexer to get our response and release this semaphore. The semaphore will
216256 // throw {@code InterruptedException} when the multiplexer is terminated.
217257 waitForResponse .acquire ();
218258
219- WorkResponse workResponse = workerProcessResponse .get (requestId );
220- return workResponse ;
259+ return workerProcessResponse .get (requestId );
221260 } finally {
222261 responseChecker .remove (requestId );
223262 workerProcessResponse .remove (requestId );
224263 }
225264 }
226265
227266 /**
228- * Waits to read a {@code WorkResponse} from worker process, put that {@code WorkResponse} in
229- * {@code workerProcessResponse} and release the semaphore for the {@code WorkerProxy}.
267+ * Sends a single pending request, if there are any. Blocks until a request is available.
230268 *
231- * <p>This is only called on the WorkerMultiplexer thread and so cannot be interrupted by dynamic
232- * execution cancellation.
269+ * <p>This is only called by the {@code requestSender} thread and so cannot be interrupted by
270+ * dynamic execution cancellation, but only by a call to {@link #destroyProcess()} .
233271 */
234- private void waitResponse () throws InterruptedException , IOException {
235- recordingStream = new RecordingInputStream (this .process .getInputStream ());
236- recordingStream .startRecording (4096 );
237- // TODO(larsrc): Turn this into a loop that also sends requests.
238- // Allow interrupts while waiting for responses, without conflating it with I/O errors.
239- while (recordingStream .available () == 0 ) {
240- if (!this .process .isAlive ()) {
241- throw new IOException (
242- String .format ("Multiplexer process for %s is dead" , workerKey .getMnemonic ()));
272+ private boolean sendRequest () {
273+ WorkRequest request ;
274+ try {
275+ request = pendingRequests .take ();
276+ } catch (InterruptedException e ) {
277+ return false ;
278+ }
279+ try {
280+ request .writeDelimitedTo (process .getOutputStream ());
281+ process .getOutputStream ().flush ();
282+ } catch (IOException e ) {
283+ // We can't know how much of the request was sent, so we have to assume the worker's input
284+ // now contains garbage, and this request is lost.
285+ // TODO(b/177637516): Signal that this action failed for presumably transient reasons.
286+ report ("Failed to send request " + request .getRequestId ());
287+ Semaphore s = responseChecker .remove (request .getRequestId ());
288+ if (s != null ) {
289+ s .release ();
243290 }
244- Thread .sleep (1 );
291+ // TODO(b/177637516): Leave process in a moribound state so pending responses can be returned.
292+ destroyProcess ();
293+ return false ;
245294 }
246- WorkResponse parsedResponse = WorkResponse .parseDelimitedFrom (recordingStream );
295+ return true ;
296+ }
247297
248- // A null parsedResponse can only happen if the input stream is closed, in which case we
298+ /**
299+ * Reads a {@code WorkResponse} from worker process, puts that {@code WorkResponse} in {@code
300+ * workerProcessResponse}, and releases the semaphore for the {@code WorkerProxy}.
301+ *
302+ * <p>This is only called on the readResponses subthread and so cannot be interrupted by dynamic
303+ * execution cancellation, but only by a call to {@link #destroyProcess()}.
304+ */
305+ private boolean readResponse () {
306+ recordingStream = new RecordingInputStream (process .getInputStream ());
307+ recordingStream .startRecording (4096 );
308+ WorkResponse parsedResponse ;
309+ try {
310+ parsedResponse = WorkResponse .parseDelimitedFrom (recordingStream );
311+ } catch (IOException e ) {
312+ if (!(e instanceof InterruptedIOException )) {
313+ report (
314+ String .format (
315+ "Error while reading response from multiplexer process for %s: %s" ,
316+ workerKey .getMnemonic (), e ));
317+ }
318+ // We can't know how much of the response was read, so we have to assume the worker's output
319+ // now contains garbage, and we can't reliably read any further responses.
320+ destroyProcess ();
321+ return false ;
322+ }
323+ // A null parsedResponse can happen if the input stream is closed, in which case we
249324 // drop everything.
250325 if (parsedResponse == null ) {
251- throw new IOException (
326+ report (
252327 String .format (
253- "Multiplexer process for %s died while reading response" , workerKey .getMnemonic ()));
328+ "Multiplexer process for %s has closed its output stream" , workerKey .getMnemonic ()));
329+ destroyProcess ();
330+ return false ;
254331 }
255332
256333 int requestId = parsedResponse .getRequestId ();
257-
258334 workerProcessResponse .put (requestId , parsedResponse );
259335
260336 // TODO(b/151767359): When allowing cancellation, just remove responses that have no matching
@@ -267,61 +343,7 @@ private void waitResponse() throws InterruptedException, IOException {
267343 report (String .format ("Multiplexer for %s found no semaphore" , workerKey .getMnemonic ()));
268344 workerProcessResponse .remove (requestId );
269345 }
270- }
271-
272- /** The multiplexer thread that listens to the WorkResponse from worker process. */
273- @ Override
274- public void run () {
275- while (this .process .isAlive ()) {
276- try {
277- waitResponse ();
278- } catch (IOException e ) {
279- // We got this exception while reading from the worker's stdout. We can't trust the
280- // output any more at that point.
281- if (this .process .isAlive ()) {
282- destroyProcess ();
283- }
284- if (e instanceof InterruptedIOException ) {
285- report (
286- String .format (
287- "Multiplexer process for %s was interrupted during I/O, aborting multiplexer" ,
288- workerKey .getMnemonic ()));
289- } else {
290- // TODO(larsrc): Output the recorded message.
291- report (
292- String .format (
293- "Multiplexer for %s got IOException reading a response, aborting multiplexer" ,
294- workerKey .getMnemonic ()));
295- logger .atWarning ().withCause (e ).log (
296- "Caught IOException while waiting for worker response. "
297- + "It could be because the worker returned an unparseable response." );
298- }
299- } catch (InterruptedException e ) {
300- // This should only happen when the Blaze build has been aborted (failed or cancelled). In
301- // that case, there may still be some outstanding requests in the worker process, which we
302- // will let fall on the floor, but we still want to leave the process running for the next
303- // build.
304- // TODO(b/151767359): Cancel all outstanding requests when cancellation is implemented.
305- for (Semaphore semaphore : responseChecker .values ()) {
306- semaphore .release ();
307- }
308- }
309- }
310- synchronized (this ) {
311- releaseAllSemaphores ();
312- }
313- }
314-
315- /**
316- * Release all the semaphores and clear the related maps. Must only be called when we are shutting
317- * down the multiplexer.
318- */
319- private void releaseAllSemaphores () {
320- for (Semaphore semaphore : responseChecker .values ()) {
321- semaphore .release ();
322- }
323- responseChecker .clear ();
324- workerProcessResponse .clear ();
346+ return true ;
325347 }
326348
327349 String getRecordingStreamMessage () {
0 commit comments