Skip to content

Commit bb05371

Browse files
committed
update
1 parent 4205c1c commit bb05371

File tree

14 files changed

+258
-180
lines changed

14 files changed

+258
-180
lines changed

zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/ZeppelinClientExample2.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,34 @@ public static void main(String[] args) throws Exception {
3838
String zeppelinVersion = zClient.getVersion();
3939
System.out.println("Zeppelin version: " + zeppelinVersion);
4040

41-
ParagraphResult paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015259_1403135953");
42-
System.out.println("Execute the 1st spark tutorial paragraph, paragraph result: " + paragraphResult);
41+
// execute note 2A94M5J1Z paragraph by paragraph
42+
try {
43+
ParagraphResult paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015259_1403135953");
44+
System.out.println("Execute the 1st spark tutorial paragraph, paragraph result: " + paragraphResult);
4345

44-
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015302_1492795503");
45-
System.out.println("Execute the 2nd spark tutorial paragraph, paragraph result: " + paragraphResult);
46+
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015302_1492795503");
47+
System.out.println("Execute the 2nd spark tutorial paragraph, paragraph result: " + paragraphResult);
4648

49+
Map<String, String> parameters = new HashMap<>();
50+
parameters.put("maxAge", "40");
51+
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150212-145404_867439529", parameters);
52+
System.out.println("Execute the 3rd spark tutorial paragraph, paragraph result: " + paragraphResult);
53+
54+
parameters = new HashMap<>();
55+
parameters.put("marital", "married");
56+
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150213-230422_1600658137", parameters);
57+
System.out.println("Execute the 4th spark tutorial paragraph, paragraph result: " + paragraphResult);
58+
} finally {
59+
// you need to stop interpreter explicitly if you are running paragraph separately.
60+
zClient.stopInterpreter("2A94M5J1Z", "spark");
61+
}
62+
63+
// execute this whole note, this note will run under a didicated interpreter process which will be
64+
// stopped after note execution.
4765
Map<String, String> parameters = new HashMap<>();
4866
parameters.put("maxAge", "40");
49-
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150212-145404_867439529", parameters);
50-
System.out.println("Execute the 3rd spark tutorial paragraph, paragraph result: " + paragraphResult);
51-
52-
parameters = new HashMap<>();
5367
parameters.put("marital", "married");
54-
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150213-230422_1600658137", parameters);
55-
System.out.println("Execute the 4th spark tutorial paragraph, paragraph result: " + paragraphResult);
68+
NoteResult noteResult = zClient.executeNote("2A94M5J1Z", parameters);
69+
System.out.println("Execute the spark tutorial note, note result: " + noteResult);
5670
}
5771
}

zeppelin-client/src/main/java/org/apache/zeppelin/client/SessionInfo.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,28 @@ public class SessionInfo {
3333
private String weburl;
3434
private String startTime;
3535

36+
public SessionInfo(String sessionId) {
37+
this.sessionId = sessionId;
38+
}
39+
3640
public SessionInfo(JSONObject sessionJson) {
37-
this.sessionId = sessionJson.getString("sessionId");
38-
this.noteId = sessionJson.getString("noteId");
39-
this.interpreter = sessionJson.getString("interpreter");
40-
this.state = sessionJson.getString("state");
41+
if (sessionJson.has("sessionId")) {
42+
this.sessionId = sessionJson.getString("sessionId");
43+
}
44+
if (sessionJson.has("noteId")) {
45+
this.noteId = sessionJson.getString("noteId");
46+
}
47+
if (sessionJson.has("interpreter")) {
48+
this.interpreter = sessionJson.getString("interpreter");
49+
}
50+
if (sessionJson.has("state")) {
51+
this.state = sessionJson.getString("state");
52+
}
4153
if (sessionJson.has("weburl")) {
4254
this.weburl = sessionJson.getString("weburl");
43-
} else {
44-
this.weburl = "";
4555
}
4656
if (sessionJson.has("startTime")) {
4757
this.startTime = sessionJson.getString("startTime");
48-
} else {
49-
this.startTime = "";
5058
}
5159
}
5260

zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java

Lines changed: 40 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ public class ZSession {
4242
// max number of retained statements, each statement represent one paragraph.
4343
private int maxStatement;
4444

45-
private String sessionId;
46-
private String noteId;
4745
private SessionInfo sessionInfo;
4846

4947
private ZeppelinWebSocketClient webSocketClient;
@@ -64,11 +62,11 @@ public ZSession(ClientConfig clientConfig,
6462
}
6563

6664
private ZSession(ClientConfig clientConfig,
67-
String interpreter,
68-
String sessionId) throws Exception {
65+
String interpreter,
66+
String sessionId) throws Exception {
6967
this.zeppelinClient = new ZeppelinClient(clientConfig);
7068
this.interpreter = interpreter;
71-
this.sessionId = sessionId;
69+
this.sessionInfo = new SessionInfo(sessionId);
7270
}
7371

7472
/**
@@ -90,30 +88,29 @@ public void start() throws Exception {
9088
* @throws Exception
9189
*/
9290
public void start(MessageHandler messageHandler) throws Exception {
93-
this.sessionId = zeppelinClient.newSession(interpreter);
91+
this.sessionInfo = zeppelinClient.newSession(interpreter);
9492

95-
this.noteId = zeppelinClient.createNote("/_ZSession/" + interpreter + "/" + sessionId, interpreter);
9693
// inline configuration
9794
StringBuilder builder = new StringBuilder("%" + interpreter + ".conf\n");
9895
if (intpProperties != null) {
9996
for (Map.Entry<String, String> entry : intpProperties.entrySet()) {
10097
builder.append(entry.getKey() + " " + entry.getValue() + "\n");
10198
}
10299
}
103-
String paragraphId = zeppelinClient.addParagraph(noteId, "Session Configuration", builder.toString());
104-
ParagraphResult paragraphResult = zeppelinClient.executeParagraph(noteId, paragraphId, sessionId);
100+
String paragraphId = zeppelinClient.addParagraph(getNoteId(), "Session Configuration", builder.toString());
101+
ParagraphResult paragraphResult = zeppelinClient.executeParagraph(getNoteId(), paragraphId, getSessionId());
105102
if (paragraphResult.getStatus() != Status.FINISHED) {
106103
throw new Exception("Fail to configure session, " + paragraphResult.getResultInText());
107104
}
108105

109106
// start session
110107
// add local properties (init) to avoid skip empty paragraph.
111-
paragraphId = zeppelinClient.addParagraph(noteId, "Session Init", "%" + interpreter + "(init=true)");
112-
paragraphResult = zeppelinClient.executeParagraph(noteId, paragraphId, sessionId);
108+
paragraphId = zeppelinClient.addParagraph(getNoteId(), "Session Init", "%" + interpreter + "(init=true)");
109+
paragraphResult = zeppelinClient.executeParagraph(getNoteId(), paragraphId, getSessionId());
113110
if (paragraphResult.getStatus() != Status.FINISHED) {
114111
throw new Exception("Fail to init session, " + paragraphResult.getResultInText());
115112
}
116-
this.sessionInfo = zeppelinClient.getSession(sessionId);
113+
this.sessionInfo = zeppelinClient.getSession(getSessionId());
117114

118115
if (messageHandler != null) {
119116
this.webSocketClient = new ZeppelinWebSocketClient(messageHandler);
@@ -122,7 +119,7 @@ public void start(MessageHandler messageHandler) throws Exception {
122119

123120
// call GET_NOTE to establish websocket connection between this session and zeppelin-server
124121
Message msg = new Message(Message.OP.GET_NOTE);
125-
msg.put("id", this.noteId);
122+
msg.put("id", getNoteId());
126123
this.webSocketClient.send(msg);
127124
}
128125
}
@@ -133,8 +130,8 @@ public void start(MessageHandler messageHandler) throws Exception {
133130
* @throws Exception
134131
*/
135132
public void stop() throws Exception {
136-
if (sessionId != null) {
137-
zeppelinClient.stopSession(sessionId);
133+
if (getSessionId() != null) {
134+
zeppelinClient.stopSession(getSessionId());
138135
}
139136
if (webSocketClient != null) {
140137
webSocketClient.stop();
@@ -169,10 +166,9 @@ public static ZSession createFromExistingSession(ClientConfig clientConfig,
169166
}
170167

171168
private void reconnect(MessageHandler messageHandler) throws Exception {
172-
this.sessionInfo = this.zeppelinClient.getSession(sessionId);
173-
this.noteId = sessionInfo.getNoteId();
169+
this.sessionInfo = this.zeppelinClient.getSession(getSessionId());
174170
if (!sessionInfo.getState().equalsIgnoreCase("Running")) {
175-
throw new Exception("Session " + sessionId + " is not running, state: " + sessionInfo.getState());
171+
throw new Exception("Session " + getSessionId() + " is not running, state: " + sessionInfo.getState());
176172
}
177173

178174
if (messageHandler != null) {
@@ -182,7 +178,7 @@ private void reconnect(MessageHandler messageHandler) throws Exception {
182178

183179
// call GET_NOTE to establish websocket connection between this session and zeppelin-server
184180
Message msg = new Message(Message.OP.GET_NOTE);
185-
msg.put("id", this.noteId);
181+
msg.put("id", getNoteId());
186182
this.webSocketClient.send(msg);
187183
}
188184
}
@@ -275,13 +271,13 @@ public ExecuteResult execute(String subInterpreter,
275271

276272
String text = builder.toString();
277273

278-
String nextParagraphId = zeppelinClient.nextSessionParagraph(noteId, maxStatement);
279-
zeppelinClient.updateParagraph(noteId, nextParagraphId, "", text);
274+
String nextParagraphId = zeppelinClient.nextSessionParagraph(getNoteId(), maxStatement);
275+
zeppelinClient.updateParagraph(getNoteId(), nextParagraphId, "", text);
280276

281277
if (messageHandler != null) {
282278
webSocketClient.addStatementMessageHandler(nextParagraphId, messageHandler);
283279
}
284-
ParagraphResult paragraphResult = zeppelinClient.executeParagraph(noteId, nextParagraphId, sessionId);
280+
ParagraphResult paragraphResult = zeppelinClient.executeParagraph(getNoteId(), nextParagraphId, getSessionId());
285281
return new ExecuteResult(paragraphResult);
286282
}
287283

@@ -366,12 +362,12 @@ public ExecuteResult submit(String subInterpreter,
366362
builder.append("\n" + code);
367363

368364
String text = builder.toString();
369-
String nextParagraphId = zeppelinClient.nextSessionParagraph(noteId, maxStatement);
370-
zeppelinClient.updateParagraph(noteId, nextParagraphId, "", text);
365+
String nextParagraphId = zeppelinClient.nextSessionParagraph(getNoteId(), maxStatement);
366+
zeppelinClient.updateParagraph(getNoteId(), nextParagraphId, "", text);
371367
if (messageHandler != null) {
372368
webSocketClient.addStatementMessageHandler(nextParagraphId, messageHandler);
373369
}
374-
ParagraphResult paragraphResult = zeppelinClient.submitParagraph(noteId, nextParagraphId, sessionId);
370+
ParagraphResult paragraphResult = zeppelinClient.submitParagraph(getNoteId(), nextParagraphId, getSessionId());
375371
return new ExecuteResult(paragraphResult);
376372
}
377373

@@ -381,7 +377,7 @@ public ExecuteResult submit(String subInterpreter,
381377
* @throws Exception
382378
*/
383379
public void cancel(String statementId) throws Exception {
384-
zeppelinClient.cancelParagraph(noteId, statementId);
380+
zeppelinClient.cancelParagraph(getNoteId(), statementId);
385381
}
386382

387383
/**
@@ -391,7 +387,7 @@ public void cancel(String statementId) throws Exception {
391387
* @throws Exception
392388
*/
393389
public ExecuteResult queryStatement(String statementId) throws Exception {
394-
ParagraphResult paragraphResult = zeppelinClient.queryParagraphResult(noteId, statementId);
390+
ParagraphResult paragraphResult = zeppelinClient.queryParagraphResult(getNoteId(), statementId);
395391
return new ExecuteResult(paragraphResult);
396392
}
397393

@@ -402,7 +398,7 @@ public ExecuteResult queryStatement(String statementId) throws Exception {
402398
* @throws Exception
403399
*/
404400
public ExecuteResult waitUntilFinished(String statementId) throws Exception {
405-
ParagraphResult paragraphResult = zeppelinClient.waitUtilParagraphFinish(noteId, statementId);
401+
ParagraphResult paragraphResult = zeppelinClient.waitUtilParagraphFinish(getNoteId(), statementId);
406402
return new ExecuteResult(paragraphResult);
407403
}
408404

@@ -413,20 +409,32 @@ public ExecuteResult waitUntilFinished(String statementId) throws Exception {
413409
* @throws Exception
414410
*/
415411
public ExecuteResult waitUntilRunning(String statementId) throws Exception {
416-
ParagraphResult paragraphResult = zeppelinClient.waitUtilParagraphRunning(noteId, statementId);
412+
ParagraphResult paragraphResult = zeppelinClient.waitUtilParagraphRunning(getNoteId(), statementId);
417413
return new ExecuteResult(paragraphResult);
418414
}
419415

420416
public String getNoteId() {
421-
return noteId;
417+
if (this.sessionInfo != null) {
418+
return this.sessionInfo.getNoteId();
419+
} else {
420+
return null;
421+
}
422422
}
423423

424424
public String getWeburl() {
425-
return sessionInfo.getWeburl();
425+
if (this.sessionInfo != null) {
426+
return sessionInfo.getWeburl();
427+
} else {
428+
return null;
429+
}
426430
}
427431

428432
public String getSessionId() {
429-
return sessionId;
433+
if (this.sessionInfo != null) {
434+
return this.sessionInfo.getSessionId();
435+
} else {
436+
return null;
437+
}
430438
}
431439

432440
public String getInterpreter() {

0 commit comments

Comments
 (0)