Skip to content

Commit 2eae38e

Browse files
author
Beria
committed
Make AppendOutputRunner non-static
1 parent 72c316d commit 2eae38e

File tree

4 files changed

+61
-78
lines changed

4 files changed

+61
-78
lines changed

zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,17 @@ public class AppendOutputRunner implements Runnable {
3737

3838
private static final Logger logger =
3939
LoggerFactory.getLogger(AppendOutputRunner.class);
40-
private static RemoteInterpreterProcessListener listener;
41-
4240
public static final Long BUFFER_TIME_MS = new Long(100);
4341
private static final Long SAFE_PROCESSING_TIME = new Long(10);
4442
private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000);
4543

46-
private static final BlockingQueue<AppendOutputBuffer> QUEUE =
44+
private final BlockingQueue<AppendOutputBuffer> queue =
4745
new LinkedBlockingQueue<AppendOutputBuffer>();
46+
private final RemoteInterpreterProcessListener listener;
47+
48+
public AppendOutputRunner(RemoteInterpreterProcessListener listener) {
49+
this.listener = listener;
50+
}
4851

4952
@Override
5053
public void run() {
@@ -62,12 +65,12 @@ public void run() {
6265
* cpu-cycles.
6366
*/
6467
try {
65-
list.add(QUEUE.take());
68+
list.add(queue.take());
6669
} catch (InterruptedException e) {
6770
logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage());
6871
}
6972
Long processingStartTime = System.currentTimeMillis();
70-
QUEUE.drainTo(list);
73+
queue.drainTo(list);
7174

7275
for (AppendOutputBuffer buffer: list) {
7376
String noteId = buffer.getNoteId();
@@ -110,17 +113,8 @@ public void run() {
110113
}
111114
}
112115

113-
public static void appendBuffer(String noteId, String paragraphId, String outputToAppend) {
114-
QUEUE.offer(new AppendOutputBuffer(noteId, paragraphId, outputToAppend));
116+
public void appendBuffer(String noteId, String paragraphId, String outputToAppend) {
117+
queue.offer(new AppendOutputBuffer(noteId, paragraphId, outputToAppend));
115118
}
116119

117-
public static void setListener(RemoteInterpreterProcessListener listener) {
118-
AppendOutputRunner.listener = listener;
119-
}
120-
121-
/* This function is only used by unit-tests*/
122-
public static void emptyQueueForUnitTests() {
123-
List<AppendOutputBuffer> list = new LinkedList<AppendOutputBuffer>();
124-
QUEUE.drainTo(list);
125-
}
126120
}

zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,39 +25,24 @@
2525
import org.slf4j.Logger;
2626
import org.slf4j.LoggerFactory;
2727

28-
/** This class is responsible for initializing
29-
* and ensuring that AppendOutputRunner is up
30-
* and running.
28+
/** This class periodically calls AppendOutputRunner
29+
* to send append-output events.
3130
*/
3231
public class CheckAppendOutputRunner {
3332

3433
private static final Logger logger =
3534
LoggerFactory.getLogger(CheckAppendOutputRunner.class);
36-
private static final Boolean SYNCHRONIZER = false;
37-
private static ScheduledExecutorService SCHEDULED_SERVICE = null;
35+
private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
3836
private static ScheduledFuture<?> futureObject = null;
39-
private static AppendOutputRunner runner = null;
4037

41-
public static void startScheduler() {
42-
synchronized (SYNCHRONIZER) {
43-
if (SCHEDULED_SERVICE == null) {
44-
runner = new AppendOutputRunner();
45-
logger.info("Starting a AppendOutputRunner thread to buffer"
46-
+ " and send paragraph append data.");
47-
SCHEDULED_SERVICE = Executors.newSingleThreadScheduledExecutor();
48-
futureObject = SCHEDULED_SERVICE.scheduleWithFixedDelay(
49-
runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
50-
}
38+
public synchronized static void startScheduler(
39+
RemoteInterpreterProcessListener listener, AppendOutputRunner runner) {
40+
if (futureObject != null) {
41+
futureObject.cancel(true);
5142
}
52-
}
53-
54-
/* This function is only used by unit-tests. */
55-
public static void stopSchedulerForUnitTests() {
56-
synchronized (SYNCHRONIZER) {
57-
if (futureObject != null) {
58-
futureObject.cancel(true);
59-
}
60-
SCHEDULED_SERVICE = null;
43+
logger.info("Starting a AppendOutputRunner thread to buffer"
44+
+ " and send paragraph append data.");
45+
futureObject = service.scheduleWithFixedDelay(
46+
runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
6147
}
62-
}
6348
}

zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
7272
@Override
7373
public void run() {
7474
Client client = null;
75-
AppendOutputRunner.setListener(listener);
76-
CheckAppendOutputRunner.startScheduler();
75+
AppendOutputRunner runner = new AppendOutputRunner(listener);
76+
CheckAppendOutputRunner.startScheduler(listener, runner);
7777

7878
while (!shutdown) {
7979
// wait and retry
@@ -159,7 +159,7 @@ public void run() {
159159
String appId = outputAppend.get("appId");
160160

161161
if (appId == null) {
162-
AppendOutputRunner.appendBuffer(noteId, paragraphId, outputToAppend);
162+
runner.appendBuffer(noteId, paragraphId, outputToAppend);
163163
} else {
164164
appListener.onOutputAppend(noteId, paragraphId, appId, outputToAppend);
165165
}

zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.log4j.Level;
3434
import org.apache.log4j.Logger;
3535
import org.apache.log4j.spi.LoggingEvent;
36-
import org.junit.BeforeClass;
3736
import org.junit.Test;
3837
import org.mockito.invocation.InvocationOnMock;
3938
import org.mockito.stubbing.Answer;
@@ -48,36 +47,30 @@ public class AppendOutputRunnerTest {
4847
*/
4948
private volatile static int numInvocations = 0;
5049

51-
@BeforeClass
52-
public static void beforeClass() {
53-
CheckAppendOutputRunner.stopSchedulerForUnitTests();
54-
AppendOutputRunner.emptyQueueForUnitTests();
55-
}
56-
5750
@Test
5851
public void testSingleEvent() throws InterruptedException {
5952
RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
60-
AppendOutputRunner.appendBuffer("note", "para", "data\n");
53+
String[][] buffer = {{"note", "para", "data\n"}};
6154

62-
loopForCompletingEvents(listener, 1);
55+
loopForCompletingEvents(listener, 1, buffer);
6356
verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class));
6457
verify(listener, times(1)).onOutputAppend("note", "para", "data\n");
65-
CheckAppendOutputRunner.stopSchedulerForUnitTests();
6658
}
6759

6860
@Test
6961
public void testMultipleEventsOfSameParagraph() throws InterruptedException {
7062
RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
7163
String note1 = "note1";
7264
String para1 = "para1";
73-
AppendOutputRunner.appendBuffer(note1, para1, "data1\n");
74-
AppendOutputRunner.appendBuffer(note1, para1, "data2\n");
75-
AppendOutputRunner.appendBuffer(note1, para1, "data3\n");
65+
String[][] buffer = {
66+
{note1, para1, "data1\n"},
67+
{note1, para1, "data2\n"},
68+
{note1, para1, "data3\n"}
69+
};
7670

77-
loopForCompletingEvents(listener, 1);
71+
loopForCompletingEvents(listener, 1, buffer);
7872
verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class));
7973
verify(listener, times(1)).onOutputAppend(note1, para1, "data1\ndata2\ndata3\n");
80-
CheckAppendOutputRunner.stopSchedulerForUnitTests();
8174
}
8275

8376
@Test
@@ -87,26 +80,27 @@ public void testMultipleEventsOfDifferentParagraphs() throws InterruptedExceptio
8780
String note2 = "note2";
8881
String para1 = "para1";
8982
String para2 = "para2";
90-
AppendOutputRunner.appendBuffer(note1, para1, "data1\n");
91-
AppendOutputRunner.appendBuffer(note1, para2, "data2\n");
92-
AppendOutputRunner.appendBuffer(note2, para1, "data3\n");
93-
AppendOutputRunner.appendBuffer(note2, para2, "data4\n");
94-
loopForCompletingEvents(listener, 4);
83+
String[][] buffer = {
84+
{note1, para1, "data1\n"},
85+
{note1, para2, "data2\n"},
86+
{note2, para1, "data3\n"},
87+
{note2, para2, "data4\n"}
88+
};
89+
loopForCompletingEvents(listener, 4, buffer);
9590

9691
verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), any(String.class));
9792
verify(listener, times(1)).onOutputAppend(note1, para1, "data1\n");
9893
verify(listener, times(1)).onOutputAppend(note1, para2, "data2\n");
9994
verify(listener, times(1)).onOutputAppend(note2, para1, "data3\n");
10095
verify(listener, times(1)).onOutputAppend(note2, para2, "data4\n");
101-
CheckAppendOutputRunner.stopSchedulerForUnitTests();
10296
}
10397

10498
@Test
10599
public void testClubbedData() throws InterruptedException {
106100
RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
107-
AppendOutputRunner.setListener(listener);
108-
CheckAppendOutputRunner.startScheduler();
109-
Thread thread = new Thread(new BombardEvents());
101+
AppendOutputRunner runner = new AppendOutputRunner(listener);
102+
CheckAppendOutputRunner.startScheduler(listener, runner);
103+
Thread thread = new Thread(new BombardEvents(runner));
110104
thread.start();
111105
thread.join();
112106
Thread.sleep(1000);
@@ -117,24 +111,25 @@ public void testClubbedData() throws InterruptedException {
117111
* the unit-test to a pessimistic 100 web-socket calls.
118112
*/
119113
verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), any(String.class));
120-
CheckAppendOutputRunner.stopSchedulerForUnitTests();
121114
}
122115

123116
@Test
124117
public void testWarnLoggerForLargeData() throws InterruptedException {
118+
RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
119+
AppendOutputRunner runner = new AppendOutputRunner(listener);
125120
String data = "data\n";
126121
int numEvents = 100000;
122+
127123
for (int i=0; i<numEvents; i++) {
128-
AppendOutputRunner.appendBuffer("noteId", "paraId", data);
124+
runner.appendBuffer("noteId", "paraId", data);
129125
}
130126

131127
TestAppender appender = new TestAppender();
132128
Logger logger = Logger.getRootLogger();
133129
logger.addAppender(appender);
134130
Logger.getLogger(RemoteInterpreterEventPoller.class);
135131

136-
RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
137-
loopForCompletingEvents(listener, 1);
132+
runner.run();
138133
List<LoggingEvent> log;
139134

140135
int warnLogCounter;
@@ -153,17 +148,22 @@ public void testWarnLoggerForLargeData() throws InterruptedException {
153148
String loggerString = "Processing size for buffered append-output is high: " +
154149
(data.length() * numEvents) + " characters.";
155150
assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage()));
156-
CheckAppendOutputRunner.stopSchedulerForUnitTests();
157151
}
158152

159153
private class BombardEvents implements Runnable {
160154

155+
private final AppendOutputRunner runner;
156+
157+
private BombardEvents(AppendOutputRunner runner) {
158+
this.runner = runner;
159+
}
160+
161161
@Override
162162
public void run() {
163163
String noteId = "noteId";
164164
String paraId = "paraId";
165165
for (int i=0; i<NUM_EVENTS; i++) {
166-
AppendOutputRunner.appendBuffer(noteId, paraId, "data\n");
166+
runner.appendBuffer(noteId, paraId, "data\n");
167167
}
168168
}
169169
}
@@ -200,11 +200,15 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
200200
}).when(listener).onOutputAppend(any(String.class), any(String.class), any(String.class));
201201
}
202202

203-
private void loopForCompletingEvents(RemoteInterpreterProcessListener listener, int numTimes) {
203+
private void loopForCompletingEvents(RemoteInterpreterProcessListener listener,
204+
int numTimes, String[][] buffer) {
204205
numInvocations = 0;
205206
prepareInvocationCounts(listener);
206-
AppendOutputRunner.setListener(listener);
207-
CheckAppendOutputRunner.startScheduler();
207+
AppendOutputRunner runner = new AppendOutputRunner(listener);
208+
for (String[] bufferElement: buffer) {
209+
runner.appendBuffer(bufferElement[0], bufferElement[1], bufferElement[2]);
210+
}
211+
CheckAppendOutputRunner.startScheduler(listener, runner);
208212
long startTimeMs = System.currentTimeMillis();
209213
while(numInvocations != numTimes) {
210214
if (System.currentTimeMillis() - startTimeMs > 2000) {

0 commit comments

Comments
 (0)