2424import java .io .StringWriter ;
2525import java .lang .management .ManagementFactory ;
2626import java .time .Duration ;
27+ import java .util .ArrayDeque ;
2728import java .util .List ;
29+ import java .util .Map ;
30+ import java .util .Optional ;
31+ import java .util .Queue ;
32+ import java .util .concurrent .ConcurrentHashMap ;
2833import java .util .concurrent .atomic .AtomicReference ;
2934import java .util .function .BiFunction ;
3035
3439 * (https://docs.bazel.build/versions/master/multiplex-worker.html).
3540 */
3641public class WorkRequestHandler implements AutoCloseable {
37-
3842 /** Contains the logic for reading {@link WorkRequest}s and writing {@link WorkResponse}s. */
3943 public interface WorkerMessageProcessor {
4044 /** Reads the next incoming request from this worker's stdin. */
41- public WorkRequest readWorkRequest () throws IOException ;
45+ WorkRequest readWorkRequest () throws IOException ;
4246
4347 /**
4448 * Writes the provided {@link WorkResponse} to this worker's stdout. This function is also
4549 * responsible for flushing the stdout.
4650 */
47- public void writeWorkResponse (WorkResponse workResponse ) throws IOException ;
51+ void writeWorkResponse (WorkResponse workResponse ) throws IOException ;
4852
4953 /** Clean up. */
50- public void close () throws IOException ;
54+ void close () throws IOException ;
55+ }
56+
57+ /** Holds information necessary to properly handle a request, especially for cancellation. */
58+ static class RequestInfo {
59+ /**
60+ * The builder for the response to this request. Since only one response must be sent per
61+ * request, this builder must be accessed through takeBuilder(), which zeroes this field and
62+ * returns the builder.
63+ */
64+ private WorkResponse .Builder responseBuilder = WorkResponse .newBuilder ();
65+
66+ /**
67+ * Returns the response builder. If called more than once on the same instance, subsequent calls
68+ * will return {@code null}.
69+ */
70+ synchronized Optional <WorkResponse .Builder > takeBuilder () {
71+ WorkResponse .Builder b = responseBuilder ;
72+ responseBuilder = null ;
73+ return Optional .ofNullable (b );
74+ }
5175 }
5276
77+ /** Requests that are currently being processed. Visible for testing. */
78+ final Map <Integer , RequestInfo > activeRequests = new ConcurrentHashMap <>();
79+
80+ /** WorkRequests that have been received but could not be processed yet. */
81+ private final Queue <WorkRequest > availableRequests = new ArrayDeque <>();
82+
5383 /** The function to be called after each {@link WorkRequest} is read. */
5484 private final BiFunction <List <String >, PrintWriter , Integer > callback ;
5585
@@ -58,6 +88,7 @@ public interface WorkerMessageProcessor {
5888
5989 final WorkerMessageProcessor messageProcessor ;
6090
91+
6192 private final CpuTimeBasedGcScheduler gcScheduler ;
6293
6394 /**
@@ -160,34 +191,61 @@ public void processRequests() throws IOException {
160191 if (request == null ) {
161192 break ;
162193 }
163- if (request .getRequestId () != 0 ) {
164- Thread t = createResponseThread (request );
165- t .start ();
166- } else {
167- respondToRequest (request );
194+ availableRequests .add (request );
195+ startRequestThreads ();
196+ }
197+ }
198+
199+ /**
200+ * Starts threads for as many outstanding requests as possible. This is the only method that adds
201+ * to {@code activeRequests}.
202+ */
203+ private synchronized void startRequestThreads () {
204+ while (!availableRequests .isEmpty ()) {
205+ // If there's a singleplex request in process, don't start more processes.
206+ if (activeRequests .containsKey (0 )) {
207+ return ;
168208 }
209+ WorkRequest request = availableRequests .peek ();
210+ // Don't start new singleplex requests if there are other requests running.
211+ if (request .getRequestId () == 0 && !activeRequests .isEmpty ()) {
212+ return ;
213+ }
214+ availableRequests .remove ();
215+ Thread t = createResponseThread (request );
216+ activeRequests .put (request .getRequestId (), new RequestInfo ());
217+ t .start ();
169218 }
170219 }
171220
172221 /** Creates a new {@link Thread} to process a multiplex request. */
173- public Thread createResponseThread (WorkRequest request ) {
222+ Thread createResponseThread (WorkRequest request ) {
174223 Thread currentThread = Thread .currentThread ();
224+ String threadName =
225+ request .getRequestId () > 0
226+ ? "multiplex-request-" + request .getRequestId ()
227+ : "singleplex-request" ;
175228 return new Thread (
176229 () -> {
230+ RequestInfo requestInfo = activeRequests .get (request .getRequestId ());
177231 try {
178- respondToRequest (request );
232+ respondToRequest (request , requestInfo );
179233 } catch (IOException e ) {
180234 e .printStackTrace (stderr );
181235 // In case of error, shut down the entire worker.
182236 currentThread .interrupt ();
237+ } finally {
238+ activeRequests .remove (request .getRequestId ());
239+ // A good time to start more requests, especially if we finished a singleplex request
240+ startRequestThreads ();
183241 }
184242 },
185- "multiplex-request-" + request . getRequestId () );
243+ threadName );
186244 }
187245
188246 /** Handles and responds to the given {@link WorkRequest}. */
189247 @ VisibleForTesting
190- void respondToRequest (WorkRequest request ) throws IOException {
248+ void respondToRequest (WorkRequest request , RequestInfo requestInfo ) throws IOException {
191249 try (StringWriter sw = new StringWriter ();
192250 PrintWriter pw = new PrintWriter (sw )) {
193251 int exitCode ;
@@ -198,14 +256,15 @@ void respondToRequest(WorkRequest request) throws IOException {
198256 exitCode = 1 ;
199257 }
200258 pw .flush ();
201- WorkResponse workResponse =
202- WorkResponse .newBuilder ()
203- .setOutput (sw .toString ())
204- .setExitCode (exitCode )
205- .setRequestId (request .getRequestId ())
206- .build ();
207- synchronized (this ) {
208- messageProcessor .writeWorkResponse (workResponse );
259+ Optional <WorkResponse .Builder > optBuilder = requestInfo .takeBuilder ();
260+ if (optBuilder .isPresent ()) {
261+ WorkResponse .Builder builder = optBuilder .get ();
262+ builder .setRequestId (request .getRequestId ());
263+ builder .setOutput (builder .getOutput () + sw .toString ()).setExitCode (exitCode );
264+ WorkResponse response = builder .build ();
265+ synchronized (this ) {
266+ messageProcessor .writeWorkResponse (response );
267+ }
209268 }
210269 gcScheduler .maybePerformGc ();
211270 }
0 commit comments