Skip to content

Commit 1c264ba

Browse files
[MSHARED-1072] Poll data from input stream
Input stream can be a System.in - all read will be blocked We need read data in no blocking mode
1 parent 37fa530 commit 1c264ba

File tree

4 files changed

+116
-117
lines changed

4 files changed

+116
-117
lines changed

src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -250,13 +250,13 @@ public void run() {
250250

251251
@Override
252252
public Integer call() throws CommandLineException {
253-
StreamFeeder inputFeeder = null;
253+
StreamPollFeeder inputFeeder = null;
254254
StreamPumper outputPumper = null;
255255
StreamPumper errorPumper = null;
256256
try {
257257
if (systemIn != null) {
258-
inputFeeder = new StreamFeeder(systemIn, p.getOutputStream());
259-
inputFeeder.setName("StreamFeeder-systemIn");
258+
inputFeeder = new StreamPollFeeder(systemIn, p.getOutputStream());
259+
inputFeeder.setName("StreamPollFeeder-systemIn");
260260
inputFeeder.start();
261261
}
262262

src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java src/main/java/org/apache/maven/shared/utils/cli/StreamPollFeeder.java

+37-23
Original file line numberDiff line numberDiff line change
@@ -24,51 +24,64 @@
2424
import java.util.Objects;
2525

2626
/**
27-
* Read from an InputStream and write the output to an OutputStream.
27+
* Poll InputStream for available data and write the output to an OutputStream.
2828
*
2929
* @author <a href="mailto:[email protected]">Trygve Laugst&oslash;l</a>
3030
*/
31-
class StreamFeeder extends Thread {
31+
class StreamPollFeeder extends Thread {
3232

33-
private final InputStream input;
33+
public static final int BUF_LEN = 80;
3434

35+
private final InputStream input;
3536
private final OutputStream output;
3637

3738
private Throwable exception;
38-
private boolean done;
3939

40+
private boolean done;
4041
private final Object lock = new Object();
4142

4243
/**
43-
* Create a new StreamFeeder
44+
* Create a new StreamPollFeeder
4445
*
4546
* @param input Stream to read from
4647
* @param output Stream to write to
4748
*/
48-
StreamFeeder(InputStream input, OutputStream output) {
49+
StreamPollFeeder(InputStream input, OutputStream output) {
4950
this.input = Objects.requireNonNull(input);
5051
this.output = Objects.requireNonNull(output);
5152
this.done = false;
5253
}
5354

5455
@Override
55-
@SuppressWarnings("checkstyle:innerassignment")
5656
public void run() {
57+
58+
byte[] buf = new byte[BUF_LEN];
59+
5760
try {
58-
for (int data; !isInterrupted() && (data = input.read()) != -1; ) {
59-
output.write(data);
61+
while (!done) {
62+
if (input.available() > 0) {
63+
int i = input.read(buf);
64+
if (i > 0) {
65+
output.write(buf, 0, i);
66+
output.flush();
67+
} else {
68+
done = true;
69+
}
70+
} else {
71+
synchronized (lock) {
72+
if (!done) {
73+
lock.wait(100);
74+
}
75+
}
76+
}
6077
}
61-
output.flush();
6278
} catch (IOException e) {
6379
exception = e;
80+
} catch (InterruptedException e) {
81+
Thread.currentThread().interrupt();
6482
} finally {
6583
close();
6684
}
67-
68-
synchronized (lock) {
69-
done = true;
70-
lock.notifyAll();
71-
}
7285
}
7386

7487
private void close() {
@@ -89,15 +102,16 @@ public Throwable getException() {
89102
}
90103

91104
public void waitUntilDone() {
92-
this.interrupt();
105+
93106
synchronized (lock) {
94-
while (!done) {
95-
try {
96-
lock.wait();
97-
} catch (InterruptedException e) {
98-
Thread.currentThread().interrupt();
99-
}
100-
}
107+
done = true;
108+
lock.notifyAll();
109+
}
110+
111+
try {
112+
join();
113+
} catch (InterruptedException e) {
114+
Thread.currentThread().interrupt();
101115
}
102116
}
103117
}

src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java

-91
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.maven.shared.utils.cli;
20+
21+
import java.io.ByteArrayInputStream;
22+
import java.io.ByteArrayOutputStream;
23+
import java.io.IOException;
24+
25+
import org.junit.Test;
26+
27+
import static org.junit.Assert.assertEquals;
28+
import static org.junit.Assert.assertNull;
29+
30+
public class StreamPollFeederTest {
31+
32+
@Test
33+
public void waitUntilFeederDoneOnInputStream() throws Exception {
34+
35+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
36+
StreamPollFeeder streamPollFeeder = new StreamPollFeeder(System.in, outputStream);
37+
38+
// start thread
39+
streamPollFeeder.start();
40+
41+
// wait a moment
42+
Thread.sleep(100);
43+
44+
// wait until process finish
45+
streamPollFeeder.waitUntilDone();
46+
assertNull(streamPollFeeder.getException());
47+
}
48+
49+
@Test
50+
public void dataShouldBeCopied() throws InterruptedException, IOException {
51+
52+
StringBuilder TEST_DATA = new StringBuilder();
53+
for (int i = 0; i < 100; i++) {
54+
TEST_DATA.append("TestData");
55+
}
56+
57+
ByteArrayInputStream inputStream =
58+
new ByteArrayInputStream(TEST_DATA.toString().getBytes());
59+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
60+
61+
StreamPollFeeder streamPollFeeder = new StreamPollFeeder(inputStream, outputStream);
62+
63+
streamPollFeeder.start();
64+
65+
// wait until all data from steam will be read
66+
while (outputStream.size() < TEST_DATA.length()) {
67+
Thread.sleep(100);
68+
}
69+
70+
// wait until process finish
71+
streamPollFeeder.waitUntilDone();
72+
assertNull(streamPollFeeder.getException());
73+
74+
assertEquals(TEST_DATA.toString(), outputStream.toString());
75+
}
76+
}

0 commit comments

Comments
 (0)