Skip to content

Commit 2e30e3d

Browse files
committed
[ZEPPELIN-940] Allow zeppelin server to connect to already executing Remote Interpreter
1 parent 781a700 commit 2e30e3d

File tree

2 files changed

+100
-32
lines changed

2 files changed

+100
-32
lines changed

zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java

Lines changed: 68 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,18 @@
3131
import java.io.IOException;
3232
import java.util.Map;
3333
import java.util.concurrent.atomic.AtomicInteger;
34+
import java.util.Properties;
3435

3536
/**
3637
*
3738
*/
3839
public class RemoteInterpreterProcess implements ExecuteResultHandler {
3940
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
41+
private static final String ZEPPELIN_INTERPRETER_PORT = "zeppelin.interpreter.port";
42+
43+
private static final String ZEPPELIN_INTERPRETER_HOST = "zeppelin.interpreter.host";
44+
45+
public static final String ZEPPELIN_INTERPRETER_ISEXECUTING = "zeppelin.interpreter.isexecuting";
4046

4147
private final AtomicInteger referenceCount;
4248
private DefaultExecutor executor;
@@ -52,6 +58,8 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
5258
private final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
5359
private final InterpreterContextRunnerPool interpreterContextRunnerPool;
5460
private int connectTimeout;
61+
String host = "localhost";
62+
boolean isInterpreterAlreadyExecuting = false;
5563

5664
public RemoteInterpreterProcess(String intpRunner,
5765
String intpDir,
@@ -91,54 +99,82 @@ public int getPort() {
9199
public int reference(InterpreterGroup interpreterGroup) {
92100
synchronized (referenceCount) {
93101
if (executor == null) {
94-
// start server process
95-
try {
96-
port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
97-
} catch (IOException e1) {
98-
throw new InterpreterException(e1);
102+
Properties properties = interpreterGroup.getProperty();
103+
104+
if (properties.containsKey(ZEPPELIN_INTERPRETER_ISEXECUTING)) {
105+
isInterpreterAlreadyExecuting =
106+
Boolean.parseBoolean(properties.getProperty(ZEPPELIN_INTERPRETER_ISEXECUTING));
107+
if (isInterpreterAlreadyExecuting) {
108+
if (properties.containsKey(ZEPPELIN_INTERPRETER_HOST)) {
109+
host = properties.getProperty(ZEPPELIN_INTERPRETER_HOST);
110+
111+
} else {
112+
throw new InterpreterException("Can't find property " + ZEPPELIN_INTERPRETER_HOST
113+
+ ".Please specify the host on which interpreter is executing");
114+
}
115+
if (properties.containsKey(ZEPPELIN_INTERPRETER_PORT)) {
116+
port = Integer
117+
.parseInt(interpreterGroup.getProperty().getProperty(ZEPPELIN_INTERPRETER_PORT));
118+
} else {
119+
throw new InterpreterException("Can't find property " + ZEPPELIN_INTERPRETER_PORT
120+
+ ".Please specify the port on which interpreter is listening");
121+
}
122+
}
123+
running = true;
99124
}
100125

101-
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
102-
cmdLine.addArgument("-d", false);
103-
cmdLine.addArgument(interpreterDir, false);
104-
cmdLine.addArgument("-p", false);
105-
cmdLine.addArgument(Integer.toString(port), false);
106-
cmdLine.addArgument("-l", false);
107-
cmdLine.addArgument(localRepoDir, false);
108-
109-
executor = new DefaultExecutor();
110-
111-
watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
112-
executor.setWatchdog(watchdog);
126+
if (!isInterpreterAlreadyExecuting) {
127+
try {
128+
port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
129+
} catch (IOException e1) {
130+
throw new InterpreterException(e1);
131+
}
132+
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
133+
cmdLine.addArgument("-d", false);
134+
cmdLine.addArgument(interpreterDir, false);
135+
cmdLine.addArgument("-p", false);
136+
cmdLine.addArgument(Integer.toString(port), false);
137+
cmdLine.addArgument("-l", false);
138+
cmdLine.addArgument(localRepoDir, false);
139+
140+
executor = new DefaultExecutor();
141+
142+
watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
143+
executor.setWatchdog(watchdog);
144+
145+
running = true;
146+
try {
147+
Map procEnv = EnvironmentUtils.getProcEnvironment();
148+
procEnv.putAll(env);
149+
150+
logger.info("Run interpreter process {}", cmdLine);
151+
executor.execute(cmdLine, procEnv, this);
152+
153+
} catch (IOException e) {
154+
running = false;
155+
throw new InterpreterException(e);
156+
}
113157

114-
running = true;
115-
try {
116-
Map procEnv = EnvironmentUtils.getProcEnvironment();
117-
procEnv.putAll(env);
118-
119-
logger.info("Run interpreter process {}", cmdLine);
120-
executor.execute(cmdLine, procEnv, this);
121-
} catch (IOException e) {
122-
running = false;
123-
throw new InterpreterException(e);
158+
} else {
159+
logger.info(
160+
"Not starting interpreter as \"zeppelin.interpreter.isexecuting\" is set to true");
124161
}
125162

126-
127163
long startTime = System.currentTimeMillis();
128164
while (System.currentTimeMillis() - startTime < connectTimeout) {
129-
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
165+
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(host, port)) {
130166
break;
131167
} else {
132168
try {
133169
Thread.sleep(500);
134170
} catch (InterruptedException e) {
135-
logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
136-
"Thread.sleep", e);
171+
logger.error("Exception in RemoteInterpreterProcess while synchronized reference "
172+
+ "Thread.sleep", e);
137173
}
138174
}
139175
}
140176

141-
clientPool = new GenericObjectPool<Client>(new ClientFactory("localhost", port));
177+
clientPool = new GenericObjectPool<Client>(new ClientFactory(host, port));
142178

143179
remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup);
144180
remoteInterpreterEventPoller.setInterpreterProcess(this);

zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
import static org.mockito.Mockito.*;
2323

2424
import java.util.HashMap;
25+
import java.util.Properties;
2526

27+
import org.apache.thrift.TException;
28+
import org.apache.thrift.transport.TTransportException;
2629
import org.apache.zeppelin.interpreter.InterpreterGroup;
2730
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
2831
import org.junit.Test;
@@ -32,6 +35,7 @@ public class RemoteInterpreterProcessTest {
3235
System.getProperty("os.name").startsWith("Windows") ?
3336
"../bin/interpreter.cmd" :
3437
"../bin/interpreter.sh";
38+
private static final int DUMMY_PORT=3678;
3539

3640
@Test
3741
public void testStartStop() {
@@ -70,4 +74,32 @@ public void testClientFactory() throws Exception {
7074

7175
rip.dereference();
7276
}
77+
78+
@Test
79+
public void testStartStopRemoteInterpreter() throws TException, InterruptedException {
80+
RemoteInterpreterServer server = new RemoteInterpreterServer(3678);
81+
server.start();
82+
boolean running = false;
83+
long startTime = System.currentTimeMillis();
84+
while (System.currentTimeMillis() - startTime < 10 * 1000) {
85+
if (server.isRunning()) {
86+
running = true;
87+
break;
88+
} else {
89+
Thread.sleep(200);
90+
}
91+
}
92+
Properties properties = new Properties();
93+
properties.setProperty("zeppelin.interpreter.port", "3678");
94+
properties.setProperty("zeppelin.interpreter.host", "localhost");
95+
properties.setProperty("zeppelin.interpreter.isexecuting", "true");
96+
InterpreterGroup intpGroup = mock(InterpreterGroup.class);
97+
when(intpGroup.getProperty()).thenReturn(properties);
98+
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(INTERPRETER_SCRIPT, "nonexists",
99+
"fakeRepo", new HashMap<String, String>(), 10 * 1000, null);
100+
assertFalse(rip.isRunning());
101+
assertEquals(0, rip.referenceCount());
102+
assertEquals(1, rip.reference(intpGroup));
103+
assertEquals(true, rip.isRunning());
104+
}
73105
}

0 commit comments

Comments
 (0)