1313// limitations under the License.
1414package com .google .devtools .build .lib .worker ;
1515
16-
1716import com .google .common .annotations .VisibleForTesting ;
1817import com .google .devtools .build .lib .worker .WorkerProtocol .WorkRequest ;
1918import com .google .devtools .build .lib .worker .WorkerProtocol .WorkResponse ;
2423import java .io .StringWriter ;
2524import java .lang .management .ManagementFactory ;
2625import java .time .Duration ;
27- import java .util .ArrayDeque ;
2826import java .util .List ;
29- import java .util .Map ;
3027import java .util .Optional ;
31- import java .util .Queue ;
3228import java .util .concurrent .ConcurrentHashMap ;
29+ import java .util .concurrent .ConcurrentMap ;
3330import java .util .concurrent .atomic .AtomicReference ;
31+ import java .util .function .BiConsumer ;
3432import java .util .function .BiFunction ;
3533
3634/**
@@ -56,13 +54,31 @@ public interface WorkerMessageProcessor {
5654
5755 /** Holds information necessary to properly handle a request, especially for cancellation. */
5856 static class RequestInfo {
/** The thread handling the request. */
final Thread thread;

/**
 * If true, we have received a cancel request for this request. Declared {@code volatile}
 * because the flag is set while a cancel request is being handled and is read from the thread
 * that was started for the request; without it the handling thread is not guaranteed to see
 * the update.
 */
private volatile boolean cancelled;

/**
 * The builder for the response to this request. Since only one response must be sent per
 * request, this builder must be accessed through takeBuilder(), which zeroes this field and
 * returns the builder.
 */
private WorkResponse.Builder responseBuilder = WorkResponse.newBuilder();

/**
 * Creates the bookkeeping record for one request.
 *
 * @param thread the thread that handles the request
 */
RequestInfo(Thread thread) {
  this.thread = thread;
}

/** Marks this request as cancelled. The flag is never reset. */
void setCancelled() {
  cancelled = true;
}

/** Returns true if this request has been cancelled. */
boolean isCancelled() {
  return cancelled;
}
81+
6682 /**
6783 * Returns the response builder. If called more than once on the same instance, subsequent calls
6884 * will return {@code null}.
@@ -72,13 +88,22 @@ synchronized Optional<WorkResponse.Builder> takeBuilder() {
7288 responseBuilder = null ;
7389 return Optional .ofNullable (b );
7490 }
91+
92+ /**
93+ * Adds {@code s} as output to when the response eventually gets built. Does nothing if the
94+ * response has already been taken. There is no guarantee that the response hasn't already been
95+ * taken, making this call a no-op. This may be called multiple times. No delimiters are added
96+ * between strings from multiple calls.
97+ */
98+ synchronized void addOutput (String s ) {
99+ if (responseBuilder != null ) {
100+ responseBuilder .setOutput (responseBuilder .getOutput () + s );
101+ }
102+ }
75103 }
76104
77105 /** Requests that are currently being processed. Visible for testing. */
78- final Map <Integer , RequestInfo > activeRequests = new ConcurrentHashMap <>();
79-
80- /** WorkRequests that have been received but could not be processed yet. */
81- private final Queue <WorkRequest > availableRequests = new ArrayDeque <>();
106+ final ConcurrentMap <Integer , RequestInfo > activeRequests = new ConcurrentHashMap <>();
82107
83108 /** The function to be called after each {@link WorkRequest} is read. */
84109 private final BiFunction <List <String >, PrintWriter , Integer > callback ;
@@ -88,6 +113,7 @@ synchronized Optional<WorkResponse.Builder> takeBuilder() {
88113
89114 final WorkerMessageProcessor messageProcessor ;
90115
116+ private final BiConsumer <Integer , Thread > cancelCallback ;
91117
92118 private final CpuTimeBasedGcScheduler gcScheduler ;
93119
@@ -107,7 +133,7 @@ public WorkRequestHandler(
107133 BiFunction <List <String >, PrintWriter , Integer > callback ,
108134 PrintStream stderr ,
109135 WorkerMessageProcessor messageProcessor ) {
110- this (callback , stderr , messageProcessor , Duration .ZERO );
136+ this (callback , stderr , messageProcessor , Duration .ZERO , null );
111137 }
112138
113139 /**
@@ -131,10 +157,24 @@ public WorkRequestHandler(
131157 PrintStream stderr ,
132158 WorkerMessageProcessor messageProcessor ,
133159 Duration cpuUsageBeforeGc ) {
160+ this (callback , stderr , messageProcessor , cpuUsageBeforeGc , null );
161+ }
162+
/**
 * Creates a {@code WorkRequestHandler} that will call {@code callback} for each WorkRequest
 * received. The public constructors delegate to this one with a {@code null} cancel callback,
 * and the Builder passes whatever cancel callback it was given.
 *
 * @param callback the function to be called after each WorkRequest is read
 * @param stderr the stream that stack traces of I/O failures are printed to
 * @param messageProcessor used to write responses
 * @param cpuUsageBeforeGc passed to the {@code CpuTimeBasedGcScheduler} created here
 * @param cancelCallback called with the request ID and the handling thread when a cancel request
 *     is received; may be {@code null}
 */
private WorkRequestHandler(
    BiFunction<List<String>, PrintWriter, Integer> callback,
    PrintStream stderr,
    WorkerMessageProcessor messageProcessor,
    Duration cpuUsageBeforeGc,
    BiConsumer<Integer, Thread> cancelCallback) {
  this.messageProcessor = messageProcessor;
  this.stderr = stderr;
  this.callback = callback;
  this.cancelCallback = cancelCallback;
  this.gcScheduler = new CpuTimeBasedGcScheduler(cpuUsageBeforeGc);
}
139179
140180 /** Builder class for WorkRequestHandler. Required parameters are passed to the constructor. */
@@ -143,6 +183,7 @@ public static class WorkRequestHandlerBuilder {
143183 private final PrintStream stderr ;
144184 private final WorkerMessageProcessor messageProcessor ;
145185 private Duration cpuUsageBeforeGc = Duration .ZERO ;
186+ private BiConsumer <Integer , Thread > cancelCallback ;
146187
147188 /**
148189 * Creates a {@code WorkRequestHandlerBuilder}.
@@ -173,9 +214,19 @@ public WorkRequestHandlerBuilder setCpuUsageBeforeGc(Duration cpuUsageBeforeGc)
173214 return this ;
174215 }
175216
217+ /**
218+ * Sets a callback that will be called when a cancellation message has been received. The
219+ * callback will be called with the request ID and the thread executing the request.
220+ */
221+ public WorkRequestHandlerBuilder setCancelCallback (BiConsumer <Integer , Thread > cancelCallback ) {
222+ this .cancelCallback = cancelCallback ;
223+ return this ;
224+ }
225+
176226 /** Returns a WorkRequestHandler instance with the values in this Builder. */
177227 public WorkRequestHandler build () {
178- return new WorkRequestHandler (callback , stderr , messageProcessor , cpuUsageBeforeGc );
228+ return new WorkRequestHandler (
229+ callback , stderr , messageProcessor , cpuUsageBeforeGc , cancelCallback );
179230 }
180231 }
181232
@@ -191,56 +242,42 @@ public void processRequests() throws IOException {
191242 if (request == null ) {
192243 break ;
193244 }
194- availableRequests .add (request );
195- startRequestThreads ();
196- }
197- }
198-
199- /**
200- * Starts threads for as many outstanding requests as possible. This is the only method that adds
201- * to {@code activeRequests}.
202- */
203- private synchronized void startRequestThreads () {
204- while (!availableRequests .isEmpty ()) {
205- // If there's a singleplex request in process, don't start more processes.
206- if (activeRequests .containsKey (0 )) {
207- return ;
245+ if (request .getCancel ()) {
246+ respondToCancelRequest (request );
247+ } else {
248+ startResponseThread (request );
208249 }
209- WorkRequest request = availableRequests .peek ();
210- // Don't start new singleplex requests if there are other requests running.
211- if (request .getRequestId () == 0 && !activeRequests .isEmpty ()) {
212- return ;
213- }
214- availableRequests .remove ();
215- Thread t = createResponseThread (request );
216- activeRequests .put (request .getRequestId (), new RequestInfo ());
217- t .start ();
218250 }
219251 }
220252
221- /** Creates a new {@link Thread} to process a multiplex request. */
222- Thread createResponseThread (WorkRequest request ) {
253+ /** Starts a thread for the given request. */
254+ void startResponseThread (WorkRequest request ) {
223255 Thread currentThread = Thread .currentThread ();
224256 String threadName =
225257 request .getRequestId () > 0
226258 ? "multiplex-request-" + request .getRequestId ()
227259 : "singleplex-request" ;
228- return new Thread (
229- () -> {
230- RequestInfo requestInfo = activeRequests .get (request .getRequestId ());
231- try {
232- respondToRequest (request , requestInfo );
233- } catch (IOException e ) {
234- e .printStackTrace (stderr );
235- // In case of error, shut down the entire worker.
236- currentThread .interrupt ();
237- } finally {
238- activeRequests .remove (request .getRequestId ());
239- // A good time to start more requests, especially if we finished a singleplex request
240- startRequestThreads ();
241- }
242- },
243- threadName );
260+ Thread t =
261+ new Thread (
262+ () -> {
263+ RequestInfo requestInfo = activeRequests .get (request .getRequestId ());
264+ if (requestInfo == null ) {
265+ // Already cancelled
266+ return ;
267+ }
268+ try {
269+ respondToRequest (request , requestInfo );
270+ } catch (IOException e ) {
271+ e .printStackTrace (stderr );
272+ // In case of error, shut down the entire worker.
273+ currentThread .interrupt ();
274+ } finally {
275+ activeRequests .remove (request .getRequestId ());
276+ }
277+ },
278+ threadName );
279+ activeRequests .put (request .getRequestId (), new RequestInfo (t ));
280+ t .start ();
244281 }
245282
246283 /** Handles and responds to the given {@link WorkRequest}. */
@@ -260,7 +297,11 @@ void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOExc
260297 if (optBuilder .isPresent ()) {
261298 WorkResponse .Builder builder = optBuilder .get ();
262299 builder .setRequestId (request .getRequestId ());
263- builder .setOutput (builder .getOutput () + sw .toString ()).setExitCode (exitCode );
300+ if (requestInfo .isCancelled ()) {
301+ builder .setWasCancelled (true );
302+ } else {
303+ builder .setOutput (builder .getOutput () + sw ).setExitCode (exitCode );
304+ }
264305 WorkResponse response = builder .build ();
265306 synchronized (this ) {
266307 messageProcessor .writeWorkResponse (response );
@@ -270,6 +311,45 @@ void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOExc
270311 }
271312 }
272313
314+ /**
315+ * Handles cancelling an existing request, including sending a response if that is not done by the
316+ * time {@code cancelCallback.accept} returns.
317+ */
318+ void respondToCancelRequest (WorkRequest request ) throws IOException {
319+ // Theoretically, we could have gotten two singleplex requests, and we can't tell those apart.
320+ // However, that's a violation of the protocol, so we don't try to handle it (not least because
321+ // handling it would be quite error-prone).
322+ RequestInfo ri = activeRequests .remove (request .getRequestId ());
323+
324+ if (ri == null ) {
325+ return ;
326+ }
327+ if (cancelCallback == null ) {
328+ ri .setCancelled ();
329+ // This is either an error on the server side or a version mismatch between the server setup
330+ // and the binary. It's better to wait for the regular work to finish instead of breaking the
331+ // build, but we should inform the user about the bad setup.
332+ ri .addOutput (
333+ String .format (
334+ "Cancellation request received for worker request %d, but this worker does not"
335+ + " support cancellation.\n " ,
336+ request .getRequestId ()));
337+ } else {
338+ if (ri .thread .isAlive () && !ri .isCancelled ()) {
339+ ri .setCancelled ();
340+ cancelCallback .accept (request .getRequestId (), ri .thread );
341+ Optional <WorkResponse .Builder > builder = ri .takeBuilder ();
342+ if (builder .isPresent ()) {
343+ WorkResponse response =
344+ builder .get ().setWasCancelled (true ).setRequestId (request .getRequestId ()).build ();
345+ synchronized (this ) {
346+ messageProcessor .writeWorkResponse (response );
347+ }
348+ }
349+ }
350+ }
351+ }
352+
273353 @ Override
274354 public void close () throws IOException {
275355 messageProcessor .close ();
0 commit comments