Skip to content

Commit 72c316d

Browse files
author
Beria
committed
Scheduler service to replace while loop in AppendOutputRunner
1 parent 599281f commit 72c316d

File tree

3 files changed

+81
-119
lines changed

3 files changed

+81
-119
lines changed

zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java

Lines changed: 53 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class AppendOutputRunner implements Runnable {
3939
LoggerFactory.getLogger(AppendOutputRunner.class);
4040
private static RemoteInterpreterProcessListener listener;
4141

42-
private static final Long BUFFER_TIME_MS = new Long(100);
42+
public static final Long BUFFER_TIME_MS = new Long(100);
4343
private static final Long SAFE_PROCESSING_TIME = new Long(10);
4444
private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000);
4545

@@ -49,74 +49,65 @@ public class AppendOutputRunner implements Runnable {
4949
@Override
5050
public void run() {
5151

52-
while (true) {
53-
Map<String, Map<String, StringBuilder> > noteMap =
54-
new HashMap<String, Map<String, StringBuilder> >();
55-
List<AppendOutputBuffer> list = new LinkedList<AppendOutputBuffer>();
56-
57-
/* "drainTo" method does not wait for any element
58-
* to be present in the queue, and thus this loop would
59-
* continuosly run (with period of BUFFER_TIME_MS). "take()" method
60-
* waits for the queue to become non-empty and then removes
61-
* one element from it. Rest elements from queue (if present) are
62-
* removed using "drainTo" method. Thus we save on some un-necessary
63-
* cpu-cycles.
64-
*/
65-
try {
66-
list.add(QUEUE.take());
67-
} catch (InterruptedException e) {
68-
logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage());
69-
break;
70-
}
71-
Long processingStartTime = System.currentTimeMillis();
72-
QUEUE.drainTo(list);
52+
Map<String, Map<String, StringBuilder> > noteMap =
53+
new HashMap<String, Map<String, StringBuilder> >();
54+
List<AppendOutputBuffer> list = new LinkedList<AppendOutputBuffer>();
7355

74-
for (AppendOutputBuffer buffer: list) {
75-
String noteId = buffer.getNoteId();
76-
String paragraphId = buffer.getParagraphId();
56+
/* "drainTo" method does not wait for any element
57+
* to be present in the queue, and thus this loop would
58+
* continuosly run (with period of BUFFER_TIME_MS). "take()" method
59+
* waits for the queue to become non-empty and then removes
60+
* one element from it. Rest elements from queue (if present) are
61+
* removed using "drainTo" method. Thus we save on some un-necessary
62+
* cpu-cycles.
63+
*/
64+
try {
65+
list.add(QUEUE.take());
66+
} catch (InterruptedException e) {
67+
logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage());
68+
}
69+
Long processingStartTime = System.currentTimeMillis();
70+
QUEUE.drainTo(list);
7771

78-
Map<String, StringBuilder> paragraphMap = (noteMap.containsKey(noteId)) ?
79-
noteMap.get(noteId) : new HashMap<String, StringBuilder>();
80-
StringBuilder builder = paragraphMap.containsKey(paragraphId) ?
81-
paragraphMap.get(paragraphId) : new StringBuilder();
72+
for (AppendOutputBuffer buffer: list) {
73+
String noteId = buffer.getNoteId();
74+
String paragraphId = buffer.getParagraphId();
8275

83-
builder.append(buffer.getData());
84-
paragraphMap.put(paragraphId, builder);
85-
noteMap.put(noteId, paragraphMap);
86-
}
87-
Long processingTime = System.currentTimeMillis() - processingStartTime;
88-
89-
if (processingTime > SAFE_PROCESSING_TIME) {
90-
logger.warn("Processing time for buffered append-output is high: " +
91-
processingTime + " milliseconds.");
92-
} else {
93-
logger.debug("Processing time for append-output took "
94-
+ processingTime + " milliseconds");
95-
}
76+
Map<String, StringBuilder> paragraphMap = (noteMap.containsKey(noteId)) ?
77+
noteMap.get(noteId) : new HashMap<String, StringBuilder>();
78+
StringBuilder builder = paragraphMap.containsKey(paragraphId) ?
79+
paragraphMap.get(paragraphId) : new StringBuilder();
9680

97-
Long sizeProcessed = new Long(0);
98-
for (String noteId: noteMap.keySet()) {
99-
for (String paragraphId: noteMap.get(noteId).keySet()) {
100-
String data = noteMap.get(noteId).get(paragraphId).toString();
101-
sizeProcessed += data.length();
102-
listener.onOutputAppend(noteId, paragraphId, data);
103-
}
104-
}
81+
builder.append(buffer.getData());
82+
paragraphMap.put(paragraphId, builder);
83+
noteMap.put(noteId, paragraphMap);
84+
}
85+
Long processingTime = System.currentTimeMillis() - processingStartTime;
86+
87+
if (processingTime > SAFE_PROCESSING_TIME) {
88+
logger.warn("Processing time for buffered append-output is high: " +
89+
processingTime + " milliseconds.");
90+
} else {
91+
logger.debug("Processing time for append-output took "
92+
+ processingTime + " milliseconds");
93+
}
10594

106-
if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) {
107-
logger.warn("Processing size for buffered append-output is high: " +
108-
sizeProcessed + " characters.");
109-
} else {
110-
logger.debug("Processing size for append-output is " +
111-
sizeProcessed + " characters");
112-
}
113-
try {
114-
Thread.sleep(BUFFER_TIME_MS);
115-
} catch (InterruptedException e) {
116-
logger.error("Append output thread interrupted: " + e.getMessage());
117-
break;
95+
Long sizeProcessed = new Long(0);
96+
for (String noteId: noteMap.keySet()) {
97+
for (String paragraphId: noteMap.get(noteId).keySet()) {
98+
String data = noteMap.get(noteId).get(paragraphId).toString();
99+
sizeProcessed += data.length();
100+
listener.onOutputAppend(noteId, paragraphId, data);
118101
}
119102
}
103+
104+
if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) {
105+
logger.warn("Processing size for buffered append-output is high: " +
106+
sizeProcessed + " characters.");
107+
} else {
108+
logger.debug("Processing size for append-output is " +
109+
sizeProcessed + " characters");
110+
}
120111
}
121112

122113
public static void appendBuffer(String noteId, String paragraphId, String outputToAppend) {

zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/CheckAppendOutputRunner.java

Lines changed: 9 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -29,62 +29,33 @@
2929
* and ensuring that AppendOutputRunner is up
3030
* and running.
3131
*/
32-
public class CheckAppendOutputRunner implements Runnable {
32+
public class CheckAppendOutputRunner {
3333

3434
private static final Logger logger =
3535
LoggerFactory.getLogger(CheckAppendOutputRunner.class);
36-
private static Thread thread = null;
3736
private static final Boolean SYNCHRONIZER = false;
3837
private static ScheduledExecutorService SCHEDULED_SERVICE = null;
3938
private static ScheduledFuture<?> futureObject = null;
40-
41-
/* Can only be initialized locally.*/
42-
private CheckAppendOutputRunner()
43-
{}
44-
45-
@Override
46-
public void run() {
47-
synchronized (SYNCHRONIZER) {
48-
if (thread == null || !thread.isAlive()) {
49-
logger.info("Starting a AppendOutputRunner thread to buffer"
50-
+ " and send paragraph append data.");
51-
thread = new Thread(new AppendOutputRunner());
52-
thread.start();
53-
}
54-
}
55-
}
39+
private static AppendOutputRunner runner = null;
5640

5741
public static void startScheduler() {
5842
synchronized (SYNCHRONIZER) {
5943
if (SCHEDULED_SERVICE == null) {
44+
runner = new AppendOutputRunner();
45+
logger.info("Starting a AppendOutputRunner thread to buffer"
46+
+ " and send paragraph append data.");
6047
SCHEDULED_SERVICE = Executors.newSingleThreadScheduledExecutor();
6148
futureObject = SCHEDULED_SERVICE.scheduleWithFixedDelay(
62-
new CheckAppendOutputRunner(), 0, 1, TimeUnit.SECONDS);
49+
runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
6350
}
6451
}
6552
}
6653

67-
/* These functions are only used by unit-tests. */
68-
public static void stopRunnerForUnitTests() {
69-
synchronized (SYNCHRONIZER) {
70-
thread.interrupt();
71-
}
72-
}
73-
74-
public static void startRunnerForUnitTests() {
75-
synchronized (SYNCHRONIZER) {
76-
thread = new Thread(new AppendOutputRunner());
77-
thread.start();
78-
}
79-
}
80-
81-
public static void stopSchedulerAndRunnerForUnitTests() {
54+
/* This function is only used by unit-tests. */
55+
public static void stopSchedulerForUnitTests() {
8256
synchronized (SYNCHRONIZER) {
8357
if (futureObject != null) {
84-
futureObject.cancel(false);
85-
}
86-
if (thread != null && thread.isAlive()) {
87-
thread.interrupt();
58+
futureObject.cancel(true);
8859
}
8960
SCHEDULED_SERVICE = null;
9061
}

zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,8 @@ public class AppendOutputRunnerTest {
5050

5151
@BeforeClass
5252
public static void beforeClass() {
53-
CheckAppendOutputRunner.stopSchedulerAndRunnerForUnitTests();
53+
CheckAppendOutputRunner.stopSchedulerForUnitTests();
5454
AppendOutputRunner.emptyQueueForUnitTests();
55-
try {
56-
Thread.sleep(1000);
57-
} catch (InterruptedException e)
58-
{}
5955
}
6056

6157
@Test
@@ -66,7 +62,7 @@ public void testSingleEvent() throws InterruptedException {
6662
loopForCompletingEvents(listener, 1);
6763
verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class));
6864
verify(listener, times(1)).onOutputAppend("note", "para", "data\n");
69-
CheckAppendOutputRunner.stopRunnerForUnitTests();
65+
CheckAppendOutputRunner.stopSchedulerForUnitTests();
7066
}
7167

7268
@Test
@@ -81,7 +77,7 @@ public void testMultipleEventsOfSameParagraph() throws InterruptedException {
8177
loopForCompletingEvents(listener, 1);
8278
verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class));
8379
verify(listener, times(1)).onOutputAppend(note1, para1, "data1\ndata2\ndata3\n");
84-
CheckAppendOutputRunner.stopRunnerForUnitTests();
80+
CheckAppendOutputRunner.stopSchedulerForUnitTests();
8581
}
8682

8783
@Test
@@ -102,14 +98,14 @@ public void testMultipleEventsOfDifferentParagraphs() throws InterruptedExceptio
10298
verify(listener, times(1)).onOutputAppend(note1, para2, "data2\n");
10399
verify(listener, times(1)).onOutputAppend(note2, para1, "data3\n");
104100
verify(listener, times(1)).onOutputAppend(note2, para2, "data4\n");
105-
CheckAppendOutputRunner.stopRunnerForUnitTests();
101+
CheckAppendOutputRunner.stopSchedulerForUnitTests();
106102
}
107103

108104
@Test
109105
public void testClubbedData() throws InterruptedException {
110106
RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
111107
AppendOutputRunner.setListener(listener);
112-
CheckAppendOutputRunner.startRunnerForUnitTests();
108+
CheckAppendOutputRunner.startScheduler();
113109
Thread thread = new Thread(new BombardEvents());
114110
thread.start();
115111
thread.join();
@@ -121,7 +117,7 @@ public void testClubbedData() throws InterruptedException {
121117
* the unit-test to a pessimistic 100 web-socket calls.
122118
*/
123119
verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), any(String.class));
124-
CheckAppendOutputRunner.stopRunnerForUnitTests();
120+
CheckAppendOutputRunner.stopSchedulerForUnitTests();
125121
}
126122

127123
@Test
@@ -140,20 +136,24 @@ public void testWarnLoggerForLargeData() throws InterruptedException {
140136
RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
141137
loopForCompletingEvents(listener, 1);
142138
List<LoggingEvent> log;
139+
140+
int warnLogCounter;
141+
LoggingEvent sizeWarnLogEntry = null;
143142
do {
143+
warnLogCounter = 0;
144144
log = appender.getLog();
145-
} while(log.size() != 2);
146-
LoggingEvent sizeWarnLogEntry = null;
147-
148-
for (LoggingEvent logEntry: log) {
149-
if (Level.WARN.equals(logEntry.getLevel())) {
150-
sizeWarnLogEntry = logEntry;
145+
for (LoggingEvent logEntry: log) {
146+
if (Level.WARN.equals(logEntry.getLevel())) {
147+
sizeWarnLogEntry = logEntry;
148+
warnLogCounter += 1;
149+
}
151150
}
152-
}
151+
} while(warnLogCounter != 2);
152+
153153
String loggerString = "Processing size for buffered append-output is high: " +
154154
(data.length() * numEvents) + " characters.";
155155
assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage()));
156-
CheckAppendOutputRunner.stopRunnerForUnitTests();
156+
CheckAppendOutputRunner.stopSchedulerForUnitTests();
157157
}
158158

159159
private class BombardEvents implements Runnable {
@@ -204,7 +204,7 @@ private void loopForCompletingEvents(RemoteInterpreterProcessListener listener,
204204
numInvocations = 0;
205205
prepareInvocationCounts(listener);
206206
AppendOutputRunner.setListener(listener);
207-
CheckAppendOutputRunner.startRunnerForUnitTests();
207+
CheckAppendOutputRunner.startScheduler();
208208
long startTimeMs = System.currentTimeMillis();
209209
while(numInvocations != numTimes) {
210210
if (System.currentTimeMillis() - startTimeMs > 2000) {

0 commit comments

Comments
 (0)