@@ -42,8 +42,6 @@ public class ZSession {
4242 // max number of retained statements, each statement represent one paragraph.
4343 private int maxStatement ;
4444
45- private String sessionId ;
46- private String noteId ;
4745 private SessionInfo sessionInfo ;
4846
4947 private ZeppelinWebSocketClient webSocketClient ;
@@ -64,11 +62,11 @@ public ZSession(ClientConfig clientConfig,
6462 }
6563
6664 private ZSession (ClientConfig clientConfig ,
67- String interpreter ,
68- String sessionId ) throws Exception {
65+ String interpreter ,
66+ String sessionId ) throws Exception {
6967 this .zeppelinClient = new ZeppelinClient (clientConfig );
7068 this .interpreter = interpreter ;
71- this .sessionId = sessionId ;
69+ this .sessionInfo = new SessionInfo ( sessionId ) ;
7270 }
7371
7472 /**
@@ -90,30 +88,29 @@ public void start() throws Exception {
9088 * @throws Exception
9189 */
9290 public void start (MessageHandler messageHandler ) throws Exception {
93- this .sessionId = zeppelinClient .newSession (interpreter );
91+ this .sessionInfo = zeppelinClient .newSession (interpreter );
9492
95- this .noteId = zeppelinClient .createNote ("/_ZSession/" + interpreter + "/" + sessionId , interpreter );
9693 // inline configuration
9794 StringBuilder builder = new StringBuilder ("%" + interpreter + ".conf\n " );
9895 if (intpProperties != null ) {
9996 for (Map .Entry <String , String > entry : intpProperties .entrySet ()) {
10097 builder .append (entry .getKey () + " " + entry .getValue () + "\n " );
10198 }
10299 }
103- String paragraphId = zeppelinClient .addParagraph (noteId , "Session Configuration" , builder .toString ());
104- ParagraphResult paragraphResult = zeppelinClient .executeParagraph (noteId , paragraphId , sessionId );
100+ String paragraphId = zeppelinClient .addParagraph (getNoteId () , "Session Configuration" , builder .toString ());
101+ ParagraphResult paragraphResult = zeppelinClient .executeParagraph (getNoteId () , paragraphId , getSessionId () );
105102 if (paragraphResult .getStatus () != Status .FINISHED ) {
106103 throw new Exception ("Fail to configure session, " + paragraphResult .getResultInText ());
107104 }
108105
109106 // start session
110107 // add local properties (init) to avoid skip empty paragraph.
111- paragraphId = zeppelinClient .addParagraph (noteId , "Session Init" , "%" + interpreter + "(init=true)" );
112- paragraphResult = zeppelinClient .executeParagraph (noteId , paragraphId , sessionId );
108+ paragraphId = zeppelinClient .addParagraph (getNoteId () , "Session Init" , "%" + interpreter + "(init=true)" );
109+ paragraphResult = zeppelinClient .executeParagraph (getNoteId () , paragraphId , getSessionId () );
113110 if (paragraphResult .getStatus () != Status .FINISHED ) {
114111 throw new Exception ("Fail to init session, " + paragraphResult .getResultInText ());
115112 }
116- this .sessionInfo = zeppelinClient .getSession (sessionId );
113+ this .sessionInfo = zeppelinClient .getSession (getSessionId () );
117114
118115 if (messageHandler != null ) {
119116 this .webSocketClient = new ZeppelinWebSocketClient (messageHandler );
@@ -122,7 +119,7 @@ public void start(MessageHandler messageHandler) throws Exception {
122119
123120 // call GET_NOTE to establish websocket connection between this session and zeppelin-server
124121 Message msg = new Message (Message .OP .GET_NOTE );
125- msg .put ("id" , this . noteId );
122+ msg .put ("id" , getNoteId () );
126123 this .webSocketClient .send (msg );
127124 }
128125 }
@@ -133,8 +130,8 @@ public void start(MessageHandler messageHandler) throws Exception {
133130 * @throws Exception
134131 */
135132 public void stop () throws Exception {
136- if (sessionId != null ) {
137- zeppelinClient .stopSession (sessionId );
133+ if (getSessionId () != null ) {
134+ zeppelinClient .stopSession (getSessionId () );
138135 }
139136 if (webSocketClient != null ) {
140137 webSocketClient .stop ();
@@ -169,10 +166,9 @@ public static ZSession createFromExistingSession(ClientConfig clientConfig,
169166 }
170167
171168 private void reconnect (MessageHandler messageHandler ) throws Exception {
172- this .sessionInfo = this .zeppelinClient .getSession (sessionId );
173- this .noteId = sessionInfo .getNoteId ();
169+ this .sessionInfo = this .zeppelinClient .getSession (getSessionId ());
174170 if (!sessionInfo .getState ().equalsIgnoreCase ("Running" )) {
175- throw new Exception ("Session " + sessionId + " is not running, state: " + sessionInfo .getState ());
171+ throw new Exception ("Session " + getSessionId () + " is not running, state: " + sessionInfo .getState ());
176172 }
177173
178174 if (messageHandler != null ) {
@@ -182,7 +178,7 @@ private void reconnect(MessageHandler messageHandler) throws Exception {
182178
183179 // call GET_NOTE to establish websocket connection between this session and zeppelin-server
184180 Message msg = new Message (Message .OP .GET_NOTE );
185- msg .put ("id" , this . noteId );
181+ msg .put ("id" , getNoteId () );
186182 this .webSocketClient .send (msg );
187183 }
188184 }
@@ -275,13 +271,13 @@ public ExecuteResult execute(String subInterpreter,
275271
276272 String text = builder .toString ();
277273
278- String nextParagraphId = zeppelinClient .nextSessionParagraph (noteId , maxStatement );
279- zeppelinClient .updateParagraph (noteId , nextParagraphId , "" , text );
274+ String nextParagraphId = zeppelinClient .nextSessionParagraph (getNoteId () , maxStatement );
275+ zeppelinClient .updateParagraph (getNoteId () , nextParagraphId , "" , text );
280276
281277 if (messageHandler != null ) {
282278 webSocketClient .addStatementMessageHandler (nextParagraphId , messageHandler );
283279 }
284- ParagraphResult paragraphResult = zeppelinClient .executeParagraph (noteId , nextParagraphId , sessionId );
280+ ParagraphResult paragraphResult = zeppelinClient .executeParagraph (getNoteId () , nextParagraphId , getSessionId () );
285281 return new ExecuteResult (paragraphResult );
286282 }
287283
@@ -366,12 +362,12 @@ public ExecuteResult submit(String subInterpreter,
366362 builder .append ("\n " + code );
367363
368364 String text = builder .toString ();
369- String nextParagraphId = zeppelinClient .nextSessionParagraph (noteId , maxStatement );
370- zeppelinClient .updateParagraph (noteId , nextParagraphId , "" , text );
365+ String nextParagraphId = zeppelinClient .nextSessionParagraph (getNoteId () , maxStatement );
366+ zeppelinClient .updateParagraph (getNoteId () , nextParagraphId , "" , text );
371367 if (messageHandler != null ) {
372368 webSocketClient .addStatementMessageHandler (nextParagraphId , messageHandler );
373369 }
374- ParagraphResult paragraphResult = zeppelinClient .submitParagraph (noteId , nextParagraphId , sessionId );
370+ ParagraphResult paragraphResult = zeppelinClient .submitParagraph (getNoteId () , nextParagraphId , getSessionId () );
375371 return new ExecuteResult (paragraphResult );
376372 }
377373
@@ -381,7 +377,7 @@ public ExecuteResult submit(String subInterpreter,
381377 * @throws Exception
382378 */
383379 public void cancel (String statementId ) throws Exception {
384- zeppelinClient .cancelParagraph (noteId , statementId );
380+ zeppelinClient .cancelParagraph (getNoteId () , statementId );
385381 }
386382
387383 /**
@@ -391,7 +387,7 @@ public void cancel(String statementId) throws Exception {
391387 * @throws Exception
392388 */
393389 public ExecuteResult queryStatement (String statementId ) throws Exception {
394- ParagraphResult paragraphResult = zeppelinClient .queryParagraphResult (noteId , statementId );
390+ ParagraphResult paragraphResult = zeppelinClient .queryParagraphResult (getNoteId () , statementId );
395391 return new ExecuteResult (paragraphResult );
396392 }
397393
@@ -402,7 +398,7 @@ public ExecuteResult queryStatement(String statementId) throws Exception {
402398 * @throws Exception
403399 */
404400 public ExecuteResult waitUntilFinished (String statementId ) throws Exception {
405- ParagraphResult paragraphResult = zeppelinClient .waitUtilParagraphFinish (noteId , statementId );
401+ ParagraphResult paragraphResult = zeppelinClient .waitUtilParagraphFinish (getNoteId () , statementId );
406402 return new ExecuteResult (paragraphResult );
407403 }
408404
@@ -413,20 +409,32 @@ public ExecuteResult waitUntilFinished(String statementId) throws Exception {
413409 * @throws Exception
414410 */
415411 public ExecuteResult waitUntilRunning (String statementId ) throws Exception {
416- ParagraphResult paragraphResult = zeppelinClient .waitUtilParagraphRunning (noteId , statementId );
412+ ParagraphResult paragraphResult = zeppelinClient .waitUtilParagraphRunning (getNoteId () , statementId );
417413 return new ExecuteResult (paragraphResult );
418414 }
419415
420416 public String getNoteId () {
421- return noteId ;
417+ if (this .sessionInfo != null ) {
418+ return this .sessionInfo .getNoteId ();
419+ } else {
420+ return null ;
421+ }
422422 }
423423
424424 public String getWeburl () {
425- return sessionInfo .getWeburl ();
425+ if (this .sessionInfo != null ) {
426+ return sessionInfo .getWeburl ();
427+ } else {
428+ return null ;
429+ }
426430 }
427431
428432 public String getSessionId () {
429- return sessionId ;
433+ if (this .sessionInfo != null ) {
434+ return this .sessionInfo .getSessionId ();
435+ } else {
436+ return null ;
437+ }
430438 }
431439
432440 public String getInterpreter () {
0 commit comments