3131import java .io .IOException ;
3232import java .util .Map ;
3333import java .util .concurrent .atomic .AtomicInteger ;
34+ import java .util .Properties ;
3435
3536/**
3637 *
3738 */
3839public class RemoteInterpreterProcess implements ExecuteResultHandler {
3940 private static final Logger logger = LoggerFactory .getLogger (RemoteInterpreterProcess .class );
41+ private static final String ZEPPELIN_INTERPRETER_PORT = "zeppelin.interpreter.port" ;
42+
43+ private static final String ZEPPELIN_INTERPRETER_HOST = "zeppelin.interpreter.host" ;
44+
45+ public static final String ZEPPELIN_INTERPRETER_ISEXECUTING = "zeppelin.interpreter.isexecuting" ;
4046
4147 private final AtomicInteger referenceCount ;
4248 private DefaultExecutor executor ;
@@ -52,6 +58,8 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
5258 private final RemoteInterpreterEventPoller remoteInterpreterEventPoller ;
5359 private final InterpreterContextRunnerPool interpreterContextRunnerPool ;
5460 private int connectTimeout ;
61+ String host = "localhost" ;
62+ boolean isInterpreterAlreadyExecuting = false ;
5563
5664 public RemoteInterpreterProcess (String intpRunner ,
5765 String intpDir ,
@@ -91,54 +99,82 @@ public int getPort() {
9199 public int reference (InterpreterGroup interpreterGroup ) {
92100 synchronized (referenceCount ) {
93101 if (executor == null ) {
94- // start server process
95- try {
96- port = RemoteInterpreterUtils .findRandomAvailablePortOnAllLocalInterfaces ();
97- } catch (IOException e1 ) {
98- throw new InterpreterException (e1 );
102+ Properties properties = interpreterGroup .getProperty ();
103+
104+ if (properties .containsKey (ZEPPELIN_INTERPRETER_ISEXECUTING )) {
105+ isInterpreterAlreadyExecuting =
106+ Boolean .parseBoolean (properties .getProperty (ZEPPELIN_INTERPRETER_ISEXECUTING ));
107+ if (isInterpreterAlreadyExecuting ) {
108+ if (properties .containsKey (ZEPPELIN_INTERPRETER_HOST )) {
109+ host = properties .getProperty (ZEPPELIN_INTERPRETER_HOST );
110+
111+ } else {
112+ throw new InterpreterException ("Can't find property " + ZEPPELIN_INTERPRETER_HOST
113+ + ".Please specify the host on which interpreter is executing" );
114+ }
115+ if (properties .containsKey (ZEPPELIN_INTERPRETER_PORT )) {
116+ port = Integer
117+ .parseInt (interpreterGroup .getProperty ().getProperty (ZEPPELIN_INTERPRETER_PORT ));
118+ } else {
119+ throw new InterpreterException ("Can't find property " + ZEPPELIN_INTERPRETER_PORT
120+ + ".Please specify the port on which interpreter is listening" );
121+ }
122+ }
123+ running = true ;
99124 }
100125
101- CommandLine cmdLine = CommandLine .parse (interpreterRunner );
102- cmdLine .addArgument ("-d" , false );
103- cmdLine .addArgument (interpreterDir , false );
104- cmdLine .addArgument ("-p" , false );
105- cmdLine .addArgument (Integer .toString (port ), false );
106- cmdLine .addArgument ("-l" , false );
107- cmdLine .addArgument (localRepoDir , false );
108-
109- executor = new DefaultExecutor ();
110-
111- watchdog = new ExecuteWatchdog (ExecuteWatchdog .INFINITE_TIMEOUT );
112- executor .setWatchdog (watchdog );
126+ if (!isInterpreterAlreadyExecuting ) {
127+ try {
128+ port = RemoteInterpreterUtils .findRandomAvailablePortOnAllLocalInterfaces ();
129+ } catch (IOException e1 ) {
130+ throw new InterpreterException (e1 );
131+ }
132+ CommandLine cmdLine = CommandLine .parse (interpreterRunner );
133+ cmdLine .addArgument ("-d" , false );
134+ cmdLine .addArgument (interpreterDir , false );
135+ cmdLine .addArgument ("-p" , false );
136+ cmdLine .addArgument (Integer .toString (port ), false );
137+ cmdLine .addArgument ("-l" , false );
138+ cmdLine .addArgument (localRepoDir , false );
139+
140+ executor = new DefaultExecutor ();
141+
142+ watchdog = new ExecuteWatchdog (ExecuteWatchdog .INFINITE_TIMEOUT );
143+ executor .setWatchdog (watchdog );
144+
145+ running = true ;
146+ try {
147+ Map procEnv = EnvironmentUtils .getProcEnvironment ();
148+ procEnv .putAll (env );
149+
150+ logger .info ("Run interpreter process {}" , cmdLine );
151+ executor .execute (cmdLine , procEnv , this );
152+
153+ } catch (IOException e ) {
154+ running = false ;
155+ throw new InterpreterException (e );
156+ }
113157
114- running = true ;
115- try {
116- Map procEnv = EnvironmentUtils .getProcEnvironment ();
117- procEnv .putAll (env );
118-
119- logger .info ("Run interpreter process {}" , cmdLine );
120- executor .execute (cmdLine , procEnv , this );
121- } catch (IOException e ) {
122- running = false ;
123- throw new InterpreterException (e );
158+ } else {
159+ logger .info (
160+ "Not starting interpreter as \" zeppelin.interpreter.isexecuting\" is set to true" );
124161 }
125162
126-
127163 long startTime = System .currentTimeMillis ();
128164 while (System .currentTimeMillis () - startTime < connectTimeout ) {
129- if (RemoteInterpreterUtils .checkIfRemoteEndpointAccessible ("localhost" , port )) {
165+ if (RemoteInterpreterUtils .checkIfRemoteEndpointAccessible (host , port )) {
130166 break ;
131167 } else {
132168 try {
133169 Thread .sleep (500 );
134170 } catch (InterruptedException e ) {
135- logger .error ("Exception in RemoteInterpreterProcess while synchronized reference " +
136- "Thread.sleep" , e );
171+ logger .error ("Exception in RemoteInterpreterProcess while synchronized reference "
172+ + "Thread.sleep" , e );
137173 }
138174 }
139175 }
140176
141- clientPool = new GenericObjectPool <Client >(new ClientFactory ("localhost" , port ));
177+ clientPool = new GenericObjectPool <Client >(new ClientFactory (host , port ));
142178
143179 remoteInterpreterEventPoller .setInterpreterGroup (interpreterGroup );
144180 remoteInterpreterEventPoller .setInterpreterProcess (this );
0 commit comments