Skip to content

Commit f5e1e2e

Browse files
committed
add session reconnect
1 parent bc723e7 commit f5e1e2e

File tree

6 files changed

+110
-8
lines changed

6 files changed

+110
-8
lines changed

zeppelin-client/src/main/java/org/apache/zeppelin/client/SessionResult.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@
55
public class SessionResult {
66

77
private String sessionId;
8+
private String noteId;
89
private String interpreter;
910
private String state;
1011
private String weburl;
1112
private String startTime;
1213

1314
public SessionResult(JSONObject sessionJson) {
1415
this.sessionId = sessionJson.getString("sessionId");
16+
this.noteId = sessionJson.getString("noteId");
1517
this.interpreter = sessionJson.getString("interpreter");
1618
this.state = sessionJson.getString("state");
1719
if (sessionJson.has("weburl")) {
@@ -30,6 +32,10 @@ public String getSessionId() {
3032
return sessionId;
3133
}
3234

35+
public String getNoteId() {
36+
return noteId;
37+
}
38+
3339
public String getInterpreter() {
3440
return interpreter;
3541
}

zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ public ZSession(ClientConfig clientConfig,
6363
this.maxStatement = maxStatement;
6464
}
6565

66+
private ZSession(ClientConfig clientConfig,
67+
String interpreter,
68+
String sessionId) throws Exception {
69+
this.zeppelinClient = new ZeppelinClient(clientConfig);
70+
this.interpreter = interpreter;
71+
this.sessionId = sessionId;
72+
}
73+
6674
/**
6775
* Start this ZSession, underneath it would create a note for this ZSession and
6876
* start a dedicated interpreter group.
@@ -133,6 +141,53 @@ public void stop() throws Exception {
133141
}
134142
}
135143

144+
/**
145+
* Session has been started in ZeppelinServer, this method is just to reconnect it.
146+
* This method is used for connect to an existing session in ZeppelinServer, instead of
147+
* start it from ZSession.
148+
* @throws Exception
149+
*/
150+
public static ZSession createFromExistingSession(ClientConfig clientConfig,
151+
String interpreter,
152+
String sessionId) throws Exception {
153+
return createFromExistingSession(clientConfig, interpreter, sessionId, null);
154+
}
155+
156+
/**
157+
* Session has been started in ZeppelinServer, this method is just to reconnect it.
158+
* This method is used for connect to an existing session in ZeppelinServer, instead of
159+
* start it from ZSession.
160+
* @throws Exception
161+
*/
162+
public static ZSession createFromExistingSession(ClientConfig clientConfig,
163+
String interpreter,
164+
String sessionId,
165+
MessageHandler messageHandler) throws Exception {
166+
ZSession session = new ZSession(clientConfig, interpreter, sessionId);
167+
session.reconnect(messageHandler);
168+
return session;
169+
}
170+
171+
private void reconnect(MessageHandler messageHandler) throws Exception {
172+
SessionResult sessionResult = this.zeppelinClient.getSession(sessionId);
173+
this.noteId = sessionResult.getNoteId();
174+
if (!sessionResult.getState().equalsIgnoreCase("Running")) {
175+
throw new Exception("Session " + sessionId + " is not running, state: " + sessionResult.getState());
176+
}
177+
this.weburl = sessionResult.getWeburl();
178+
179+
if (messageHandler != null) {
180+
this.webSocketClient = new ZeppelinWebSocketClient(messageHandler);
181+
this.webSocketClient.connect(zeppelinClient.getClientConfig().getZeppelinRestUrl()
182+
.replace("https", "ws").replace("http", "ws") + "/ws");
183+
184+
// call GET_NOTE to establish websocket connection between this session and zeppelin-server
185+
Message msg = new Message(Message.OP.GET_NOTE);
186+
msg.put("id", this.noteId);
187+
this.webSocketClient.send(msg);
188+
}
189+
}
190+
136191
/**
137192
* Run code in non-blocking way.
138193
*
@@ -411,7 +466,7 @@ public ZSession build() throws Exception {
411466

412467
public static void main(String[] args) throws Exception {
413468

414-
ClientConfig clientConfig = new ClientConfig("http://localhost:18086", 1000);
469+
ClientConfig clientConfig = new ClientConfig("http://localhost:8080", 1000);
415470
// ZSession hiveSession = new ZSession(clientConfig, "hive", new HashMap<>(), 100);
416471
// hiveSession.start();
417472
//
@@ -423,8 +478,9 @@ public static void main(String[] args) throws Exception {
423478
// executeResult = hiveSession.waitUntilFinished(executeResult.getStatementId());
424479
// System.out.println(executeResult.toString());
425480

426-
ZSession sparkSession = new ZSession(clientConfig, "sh", new HashMap<>(), 100);
427-
sparkSession.start();
481+
ZSession sparkSession = ZSession.createFromExistingSession(clientConfig, "hive", "hive_1598418780469");
482+
ExecuteResult executeResult = sparkSession.execute("show databases");
483+
System.out.println(executeResult);
428484

429485
// ExecuteResult executeResult = sparkSession.submit("sql", "show tables");
430486
// executeResult = sparkSession.waitUntilFinished(executeResult.getStatementId());

zeppelin-client/src/main/java/org/apache/zeppelin/client/ZeppelinClient.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,24 @@ public void stopSession(String interpreter, String sessionId) throws Exception {
157157
checkJsonNodeStatus(jsonNode);
158158
}
159159

160+
/**
161+
*
162+
* @param sessionId
163+
* @throws Exception
164+
*/
165+
public SessionResult getSession(String sessionId) throws Exception {
166+
HttpResponse<JsonNode> response = Unirest
167+
.get("/session/{sessionId}")
168+
.routeParam("sessionId", sessionId)
169+
.asJson();
170+
checkResponse(response);
171+
JsonNode jsonNode = response.getBody();
172+
checkJsonNodeStatus(jsonNode);
173+
174+
JSONObject bodyObject = jsonNode.getObject().getJSONObject("body");
175+
return new SessionResult(bodyObject);
176+
}
177+
160178
/**
161179
* Get the session weburl. It is spark ui url for spark interpreter,
162180
* or flink web ui for flink interpreter, or may be null for the interpreter that has no weburl.

zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionManager.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
2323
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
2424
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
25+
import org.apache.zeppelin.notebook.NoteInfo;
26+
import org.apache.zeppelin.notebook.Notebook;
2527
import org.apache.zeppelin.rest.message.Session;
2628
import org.slf4j.Logger;
2729
import org.slf4j.LoggerFactory;
@@ -30,6 +32,7 @@
3032
import java.util.HashSet;
3133
import java.util.List;
3234
import java.util.Set;
35+
import java.util.stream.Collectors;
3336

3437
public class SessionManager {
3538

@@ -38,8 +41,10 @@ public class SessionManager {
3841
private static final int RETRY = 3;
3942
private Set<String> sessions = new HashSet<>();
4043
private InterpreterSettingManager interpreterSettingManager;
44+
private Notebook notebook;
4145

42-
public SessionManager(InterpreterSettingManager interpreterSettingManager) {
46+
public SessionManager(Notebook notebook, InterpreterSettingManager interpreterSettingManager) {
47+
this.notebook = notebook;
4348
this.interpreterSettingManager = interpreterSettingManager;
4449
}
4550

@@ -83,8 +88,19 @@ public Session getSession(String sessionId) throws Exception {
8388
state = "Stopped";
8489
}
8590
}
86-
return new Session(sessionId,
87-
((ManagedInterpreterGroup) interpreterGroup).getInterpreterSetting().getName(),
91+
String noteId = "";
92+
String interpreter = ((ManagedInterpreterGroup) interpreterGroup).getInterpreterSetting().getName();
93+
String notePath = "/_ZSession/" + interpreter + "/" + sessionId;
94+
List<NoteInfo> notesInfo = notebook.getNotesInfo().stream()
95+
.filter(e -> e.getPath().equals(notePath))
96+
.collect(Collectors.toList());
97+
if (notesInfo.size() != 0) {
98+
noteId = notesInfo.get(0).getId();
99+
if (notesInfo.size() > 1) {
100+
LOGGER.warn("Found more than 1 notes with path: " + notePath);
101+
}
102+
}
103+
return new Session(sessionId, noteId, interpreter,
88104
state, interpreterGroup.getWebUrl(), startTime);
89105
}
90106
LOGGER.warn("No such session: " + sessionId);

zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionRestApi.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public class SessionRestApi {
6060
public SessionRestApi(Notebook notebook, InterpreterSettingManager interpreterSettingManager) {
6161
this.notebook = notebook;
6262
this.interpreterSettingManager = interpreterSettingManager;
63-
this.sessionManager = new SessionManager(interpreterSettingManager);
63+
this.sessionManager = new SessionManager(notebook, interpreterSettingManager);
6464
}
6565

6666
@GET

zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/Session.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
public class Session {
44

55
private String sessionId;
6+
private String noteId;
67
private String interpreter;
78
private String state;
89
private String weburl;
910
private String startTime;
1011

11-
public Session(String sessionId, String interpreter, String state, String weburl, String startTime) {
12+
public Session(String sessionId, String noteId, String interpreter, String state, String weburl, String startTime) {
1213
this.sessionId = sessionId;
14+
this.noteId = noteId;
1315
this.interpreter = interpreter;
1416
this.state = state;
1517
this.weburl = weburl;
@@ -20,6 +22,10 @@ public String getSessionId() {
2022
return sessionId;
2123
}
2224

25+
public String getNoteId() {
26+
return noteId;
27+
}
28+
2329
public String getState() {
2430
return state;
2531
}

0 commit comments

Comments
 (0)