@@ -72,6 +72,8 @@ public class WorkerMultiplexer {
7272 * {@code WorkerMultiplexer} gets discarded as well and a new one gets created as needed.
7373 */
7474 private Subprocess process ;
75+ /** The implementation of the worker protocol (JSON or Proto). */
76+ private WorkerProtocolImpl workerProtocol ;
7577 /** InputStream from the worker process. */
7678 private RecordingInputStream recordingStream ;
7779 /** True if this multiplexer was explicitly destroyed. */
@@ -142,6 +144,18 @@ public synchronized void createProcess(Path workDir) throws IOException {
142144 processBuilder .setStderr (logFile .getPathFile ());
143145 processBuilder .setEnv (workerKey .getEnv ());
144146 this .process = processBuilder .start ();
147+ recordingStream = new RecordingInputStream (process .getInputStream ());
148+ recordingStream .startRecording (4096 );
149+ if (workerProtocol == null ) {
150+ switch (workerKey .getProtocolFormat ()) {
151+ case JSON :
152+ workerProtocol = new JsonWorkerProtocol (process .getOutputStream (), recordingStream );
153+ break ;
154+ case PROTO :
155+ workerProtocol = new ProtoWorkerProtocol (process .getOutputStream (), recordingStream );
156+ break ;
157+ }
158+ }
145159 String id = workerKey .getMnemonic () + "-" + workerKey .hashCode ();
146160 // TODO(larsrc): Consider moving sender/receiver threads into separate classes.
147161 this .requestSender =
@@ -277,8 +291,7 @@ private boolean sendRequest() {
277291 return false ;
278292 }
279293 try {
280- request .writeDelimitedTo (process .getOutputStream ());
281- process .getOutputStream ().flush ();
294+ workerProtocol .putRequest (request );
282295 } catch (IOException e ) {
283296 // We can't know how much of the request was sent, so we have to assume the worker's input
284297 // now contains garbage, and this request is lost.
@@ -303,11 +316,9 @@ private boolean sendRequest() {
303316 * execution cancellation, but only by a call to {@link #destroyProcess()}.
304317 */
305318 private boolean readResponse () {
306- recordingStream = new RecordingInputStream (process .getInputStream ());
307- recordingStream .startRecording (4096 );
308319 WorkResponse parsedResponse ;
309320 try {
310- parsedResponse = WorkResponse . parseDelimitedFrom ( recordingStream );
321+ parsedResponse = workerProtocol . getResponse ( );
311322 } catch (IOException e ) {
312323 if (!(e instanceof InterruptedIOException )) {
313324 report (
@@ -320,7 +331,8 @@ private boolean readResponse() {
320331 destroyProcess ();
321332 return false ;
322333 }
323- // A null parsedResponse can happen if the input stream is closed, in which case we
334+
335+ // A null parsedResponse can only happen if the input stream is closed, in which case we
324336 // drop everything.
325337 if (parsedResponse == null ) {
326338 report (
0 commit comments