Skip to content

Commit 479b836

Browse files
committed
Add test
1 parent c01df62 commit 479b836

File tree

7 files changed

+334
-40
lines changed

7 files changed

+334
-40
lines changed

zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ public class InterpreterOutput extends OutputStream {
3838
private final List<Object> outList = new LinkedList<Object>();
3939
private InterpreterOutputChangeWatcher watcher;
4040
private final InterpreterOutputListener flushListener;
41+
private InterpreterResult.Type type = InterpreterResult.Type.TEXT;
42+
private boolean firstWrite = true;
4143

4244
public InterpreterOutput(InterpreterOutputListener flushListener) {
4345
this.flushListener = flushListener;
@@ -52,33 +54,77 @@ public InterpreterOutput(InterpreterOutputListener flushListener,
5254
watcher.start();
5355
}
5456

57+
public InterpreterResult.Type getType() {
58+
return type;
59+
}
60+
61+
public void setType(InterpreterResult.Type type) {
62+
if (this.type != type) {
63+
clear();
64+
flushListener.onUpdate(this, new byte[]{});
65+
this.type = type;
66+
}
67+
}
68+
5569
public void clear() {
5670
synchronized (outList) {
71+
type = InterpreterResult.Type.TEXT;
5772
buffer.reset();
5873
outList.clear();
5974
if (watcher != null) {
6075
watcher.clear();
6176
}
6277
}
6378
}
79+
6480
@Override
6581
public void write(int b) throws IOException {
6682
synchronized (outList) {
6783
buffer.write(b);
6884
if (b == NEW_LINE_CHAR) {
69-
if (outList.isEmpty()) {
85+
// first time use of this outputstream.
86+
if (firstWrite) {
87+
// clear the output on gui
7088
flushListener.onUpdate(this, new byte[]{});
89+
firstWrite = false;
7190
}
7291

7392
buffer.flush();
7493
byte[] byteArray = buffer.toByteArray();
75-
outList.add(byteArray);
76-
flushListener.onAppend(this, byteArray);
94+
95+
96+
// check output type directive
97+
byteArray = detectTypeFromLine(byteArray);
98+
99+
if (byteArray != null) {
100+
outList.add(byteArray);
101+
102+
if (type == InterpreterResult.Type.TEXT) {
103+
flushListener.onAppend(this, byteArray);
104+
}
105+
}
77106
buffer.reset();
78107
}
79108
}
80109
}
81110

111+
private byte [] detectTypeFromLine(byte [] byteArray) {
112+
// check output type directive
113+
String line = new String(byteArray);
114+
for (InterpreterResult.Type t : InterpreterResult.Type.values()) {
115+
String typeString = '%' + t.name().toLowerCase();
116+
if ((typeString + "\n").equals(line)) {
117+
setType(t);
118+
byteArray = null;
119+
} else if (line.startsWith(typeString + " ")) {
120+
setType(t);
121+
byteArray = line.substring(typeString.length() + 1).getBytes();
122+
}
123+
}
124+
125+
return byteArray;
126+
}
127+
82128
@Override
83129
public void write(byte [] b) throws IOException {
84130
write(b, 0, b.length);
@@ -204,8 +250,13 @@ public void close() throws IOException {
204250
synchronized (outList) {
205251
buffer.flush();
206252
byte[] bytes = buffer.toByteArray();
207-
outList.add(bytes);
208-
flushListener.onAppend(this, bytes);
253+
bytes = detectTypeFromLine(bytes);
254+
if (bytes != null) {
255+
outList.add(bytes);
256+
if (type == InterpreterResult.Type.TEXT) {
257+
flushListener.onAppend(this, bytes);
258+
}
259+
}
209260
buffer.close();
210261
}
211262

zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -294,24 +294,21 @@ protected Object jobRun() throws Throwable {
294294
InterpreterContext.set(context);
295295
InterpreterResult result = interpreter.interpret(script, context);
296296

297-
// prepend context.out to result for now
298-
// later, context.out should be streamed to the front-end
299-
String output = null;
297+
// data from context.out is prepended to InterpreterResult if both defined
298+
String message = "";
300299

301300
byte[] interpreterOutput = context.out.toByteArray(true);
302-
if (interpreterOutput != null) {
303-
output = new String(interpreterOutput);
301+
if (interpreterOutput != null && interpreterOutput.length > 0) {
302+
message = new String(interpreterOutput);
304303
}
305304

306305
if (result.message() != null) {
307-
if (output == null || output.length() == 0) {
308-
output = result.toString();
309-
} else {
310-
output += result.message();
311-
}
306+
message += result.message();
307+
return new InterpreterResult(result.code(), message);
308+
} else {
309+
return new InterpreterResult(result.code(), context.out
310+
.getType(), message);
312311
}
313-
314-
return new InterpreterResult(result.code(), output);
315312
} finally {
316313
InterpreterContext.remove();
317314
}

zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@
2727

2828
public class InterpreterOutputTest implements InterpreterOutputListener {
2929
private InterpreterOutput out;
30-
int numNewLineDetected;
30+
int numAppendEvent;
31+
int numUpdateEvent;
3132

3233
@Before
3334
public void setUp() {
3435
out = new InterpreterOutput(this);
35-
numNewLineDetected = 0;
36+
numAppendEvent = 0;
37+
numUpdateEvent = 0;
3638
}
3739

3840
@After
@@ -43,32 +45,54 @@ public void tearDown() throws IOException {
4345
@Test
4446
public void testDetectNewline() throws IOException {
4547
out.write("hello\nworld");
46-
assertEquals("hello\n", new String(out.toByteArray()));
47-
assertEquals(1, numNewLineDetected);
48+
assertEquals("hello\nworld", new String(out.toByteArray()));
49+
assertEquals(1, numAppendEvent);
50+
assertEquals(1, numUpdateEvent);
4851

4952
out.write("\n");
5053
assertEquals("hello\nworld\n", new String(out.toByteArray()));
51-
assertEquals(2, numNewLineDetected);
54+
assertEquals(2, numAppendEvent);
55+
assertEquals(1, numUpdateEvent);
5256
}
5357

5458
@Test
55-
public void testFlushInternalBufferOnClose() throws IOException {
56-
out.write("hello\nworld");
57-
assertEquals("hello\n", new String(out.toByteArray()));
58-
assertEquals(1, numNewLineDetected);
59+
public void testType() throws IOException {
60+
// default output stream type is TEXT
61+
out.write("Text\n");
62+
assertEquals(InterpreterResult.Type.TEXT, out.getType());
63+
assertEquals("Text\n", new String(out.toByteArray()));
64+
assertEquals(1, numAppendEvent);
65+
assertEquals(1, numUpdateEvent);
5966

60-
out.close();
61-
assertEquals("hello\nworld", new String(out.toByteArray()));
62-
assertEquals(2, numNewLineDetected);
67+
// change type
68+
out.write("%html\n");
69+
assertEquals(InterpreterResult.Type.HTML, out.getType());
70+
assertEquals("", new String(out.toByteArray()));
71+
assertEquals(1, numAppendEvent);
72+
assertEquals(2, numUpdateEvent);
73+
74+
// none TEXT type output stream does not generate append event
75+
out.write("<div>html</div>\n");
76+
assertEquals(InterpreterResult.Type.HTML, out.getType());
77+
assertEquals(1, numAppendEvent);
78+
assertEquals(2, numUpdateEvent);
79+
assertEquals("<div>html</div>\n", new String(out.toByteArray()));
80+
81+
// change type to text again
82+
out.write("%text hello\n");
83+
assertEquals(InterpreterResult.Type.TEXT, out.getType());
84+
assertEquals(2, numAppendEvent);
85+
assertEquals(3, numUpdateEvent);
86+
assertEquals("hello\n", new String(out.toByteArray()));
6387
}
6488

6589
@Override
6690
public void onAppend(InterpreterOutput out, byte[] line) {
67-
numNewLineDetected++;
91+
numAppendEvent++;
6892
}
6993

7094
@Override
7195
public void onUpdate(InterpreterOutput out, byte[] output) {
72-
96+
numUpdateEvent++;
7397
}
7498
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.zeppelin.interpreter.remote;
19+
20+
import org.apache.zeppelin.display.AngularObjectRegistry;
21+
import org.apache.zeppelin.display.GUI;
22+
import org.apache.zeppelin.interpreter.*;
23+
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream;
24+
import org.junit.After;
25+
import org.junit.Before;
26+
import org.junit.Test;
27+
28+
import java.io.File;
29+
import java.util.HashMap;
30+
import java.util.LinkedList;
31+
import java.util.Properties;
32+
33+
import static org.junit.Assert.assertEquals;
34+
35+
36+
/**
37+
* Test for remote interpreter output stream
38+
*/
39+
public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener {
40+
private InterpreterGroup intpGroup;
41+
private HashMap<String, String> env;
42+
43+
@Before
44+
public void setUp() throws Exception {
45+
intpGroup = new InterpreterGroup();
46+
env = new HashMap<String, String>();
47+
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
48+
}
49+
50+
@After
51+
public void tearDown() throws Exception {
52+
intpGroup.close();
53+
intpGroup.destroy();
54+
}
55+
56+
private RemoteInterpreter createMockInterpreter() {
57+
RemoteInterpreter intp = new RemoteInterpreter(
58+
new Properties(),
59+
MockInterpreterOutputStream.class.getName(),
60+
new File("../bin/interpreter.sh").getAbsolutePath(),
61+
"fake",
62+
env,
63+
10 * 1000,
64+
this);
65+
66+
67+
intpGroup.add(intp);
68+
intp.setInterpreterGroup(intpGroup);
69+
return intp;
70+
}
71+
72+
private InterpreterContext createInterpreterContext() {
73+
return new InterpreterContext(
74+
"noteId",
75+
"id",
76+
"title",
77+
"text",
78+
new HashMap<String, Object>(),
79+
new GUI(),
80+
new AngularObjectRegistry(intpGroup.getId(), null),
81+
new LinkedList<InterpreterContextRunner>(), null);
82+
}
83+
84+
@Test
85+
public void testInterpreterResultOnly() {
86+
RemoteInterpreter intp = createMockInterpreter();
87+
InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext());
88+
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
89+
assertEquals("staticresult", ret.message());
90+
91+
ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext());
92+
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
93+
assertEquals("staticresult2", ret.message());
94+
95+
ret = intp.interpret("ERROR::staticresult3", createInterpreterContext());
96+
assertEquals(InterpreterResult.Code.ERROR, ret.code());
97+
assertEquals("staticresult3", ret.message());
98+
}
99+
100+
@Test
101+
public void testInterpreterOutputStreamOnly() {
102+
RemoteInterpreter intp = createMockInterpreter();
103+
InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext());
104+
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
105+
assertEquals("streamresult", ret.message());
106+
107+
ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext());
108+
assertEquals(InterpreterResult.Code.ERROR, ret.code());
109+
assertEquals("streamresult2", ret.message());
110+
}
111+
112+
@Test
113+
public void testInterpreterResultOutputStreamMixed() {
114+
RemoteInterpreter intp = createMockInterpreter();
115+
InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext());
116+
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
117+
assertEquals("streamstatic", ret.message());
118+
}
119+
@Override
120+
public void onOutputAppend(String noteId, String paragraphId, String output) {
121+
122+
}
123+
124+
@Override
125+
public void onOutputUpdated(String noteId, String paragraphId, String output) {
126+
127+
}
128+
}

0 commit comments

Comments
 (0)