@@ -46,12 +46,11 @@ public abstract class BaseLivyInterprereter extends Interpreter {
4646 protected static final Logger LOGGER = LoggerFactory .getLogger (BaseLivyInterprereter .class );
4747 private static Gson gson = new GsonBuilder ().setPrettyPrinting ().disableHtmlEscaping ().create ();
4848
49- protected SessionInfo sessionInfo ;
49+ protected volatile SessionInfo sessionInfo ;
5050 private String livyURL ;
5151 private long sessionCreationTimeout ;
5252 protected boolean displayAppInfo ;
53- private boolean sessionExpired ;
54- private AtomicBoolean isInitingSession = new AtomicBoolean (false );
53+ private AtomicBoolean sessionExpired = new AtomicBoolean (false );
5554
5655 public BaseLivyInterprereter (Properties property ) {
5756 super (property );
@@ -90,16 +89,17 @@ protected void initLivySession() throws LivyException {
9089 // livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it
9190 // explicitly by ourselves.
9291 sessionInfo .appId = extractStatementResult (
93- interpret ("sc.applicationId" , false ).message ()
92+ interpret ("sc.applicationId" , false , false ).message ()
9493 .get (0 ).getData ());
9594 }
9695
9796 interpret (
98- "val webui=sc.getClass.getMethod(\" ui\" ).invoke(sc).asInstanceOf[Some[_]].get" , false );
97+ "val webui=sc.getClass.getMethod(\" ui\" ).invoke(sc).asInstanceOf[Some[_]].get" ,
98+ false , false );
9999 if (StringUtils .isEmpty (sessionInfo .appInfo .get ("sparkUiUrl" ))) {
100100 sessionInfo .webUIAddress = extractStatementResult (
101101 interpret (
102- "webui.getClass.getMethod(\" appUIAddress\" ).invoke(webui)" , false )
102+ "webui.getClass.getMethod(\" appUIAddress\" ).invoke(webui)" , false , false )
103103 .message ().get (0 ).getData ());
104104 } else {
105105 sessionInfo .webUIAddress = sessionInfo .appInfo .get ("sparkUiUrl" );
@@ -120,7 +120,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
120120 }
121121
122122 try {
123- return interpret (st , this .displayAppInfo );
123+ return interpret (st , this .displayAppInfo , true );
124124 } catch (LivyException e ) {
125125 LOGGER .error ("Fail to interpret:" + st , e );
126126 return new InterpreterResult (InterpreterResult .Code .ERROR ,
@@ -206,20 +206,22 @@ private SessionInfo getSessionInfo(int sessionId) throws LivyException {
206206 return SessionInfo .fromJson (callRestAPI ("/sessions/" + sessionId , "GET" ));
207207 }
208208
209- public InterpreterResult interpret (String code , boolean displayAppInfo )
209+ public InterpreterResult interpret (String code , boolean displayAppInfo ,
210+ boolean appendSessionExpired )
210211 throws LivyException {
211212 StatementInfo stmtInfo = null ;
213+ boolean sessionExpired = false ;
212214 try {
213215 stmtInfo = executeStatement (new ExecuteRequest (code ));
214216 } catch (SessionNotFoundException e ) {
215217 LOGGER .warn ("Livy session {} is expired, new session will be created." , sessionInfo .id );
216- this . sessionExpired = true ;
218+ sessionExpired = true ;
217219 // we don't want to create multiple sessions because it is possible to have multiple thread
218- // to call this method, like LivySparkSQLInterpreter which use ParallelScheduler
220+ // to call this method, like LivySparkSQLInterpreter which use ParallelScheduler. So we need
221+ // to check session status again in this sync block
219222 synchronized (this ) {
220- if (isInitingSession . compareAndSet ( false , true )) {
223+ if (isSessionExpired ( )) {
221224 initLivySession ();
222- isInitingSession .set (false );
223225 }
224226 }
225227 stmtInfo = executeStatement (new ExecuteRequest (code ));
@@ -234,19 +236,33 @@ public InterpreterResult interpret(String code, boolean displayAppInfo)
234236 }
235237 stmtInfo = getStatementInfo (stmtInfo .id );
236238 }
237- return appendSessionExpire (getResultFromStatementInfo (stmtInfo , displayAppInfo ));
239+ if (appendSessionExpired ) {
240+ return appendSessionExpire (getResultFromStatementInfo (stmtInfo , displayAppInfo ),
241+ sessionExpired );
242+ } else {
243+ return getResultFromStatementInfo (stmtInfo , displayAppInfo );
244+ }
245+ }
238246
247+ private boolean isSessionExpired () throws LivyException {
248+ try {
249+ getSessionInfo (sessionInfo .id );
250+ return false ;
251+ } catch (SessionNotFoundException e ) {
252+ return true ;
253+ } catch (LivyException e ) {
254+ throw e ;
255+ }
239256 }
240257
241- private InterpreterResult appendSessionExpire (InterpreterResult result ) {
258+ private InterpreterResult appendSessionExpire (InterpreterResult result , boolean sessionExpired ) {
242259 if (sessionExpired ) {
243260 InterpreterResult result2 = new InterpreterResult (result .code ());
261+ result2 .add (InterpreterResult .Type .HTML ,
262+ "<font color=\" red\" >Previous session is expired, new session is created.</font>" );
244263 for (InterpreterResultMessage message : result .message ()) {
245264 result2 .add (message .getType (), message .getData ());
246265 }
247- result2 .add (InterpreterResult .Type .HTML ,
248- "<hr/><font color=\" red\" >Previous session is expired, new session is created.</font>" );
249- sessionExpired = false ;
250266 return result2 ;
251267 } else {
252268 return result ;
0 commit comments