Skip to content

Commit 30c3569

Browse files
committed
address comments
1 parent 88f0d9a commit 30c3569

File tree

2 files changed

+69
-46
lines changed

2 files changed

+69
-46
lines changed

livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,11 @@ public abstract class BaseLivyInterprereter extends Interpreter {
4646
protected static final Logger LOGGER = LoggerFactory.getLogger(BaseLivyInterprereter.class);
4747
private static Gson gson = new GsonBuilder().setPrettyPrinting().disableHtmlEscaping().create();
4848

49-
protected SessionInfo sessionInfo;
49+
protected volatile SessionInfo sessionInfo;
5050
private String livyURL;
5151
private long sessionCreationTimeout;
5252
protected boolean displayAppInfo;
53-
private boolean sessionExpired;
54-
private AtomicBoolean isInitingSession = new AtomicBoolean(false);
53+
private AtomicBoolean sessionExpired = new AtomicBoolean(false);
5554

5655
public BaseLivyInterprereter(Properties property) {
5756
super(property);
@@ -90,16 +89,17 @@ protected void initLivySession() throws LivyException {
9089
// livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it
9190
// explicitly by ourselves.
9291
sessionInfo.appId = extractStatementResult(
93-
interpret("sc.applicationId", false).message()
92+
interpret("sc.applicationId", false, false).message()
9493
.get(0).getData());
9594
}
9695

9796
interpret(
98-
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get", false);
97+
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
98+
false, false);
9999
if (StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
100100
sessionInfo.webUIAddress = extractStatementResult(
101101
interpret(
102-
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false)
102+
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false, false)
103103
.message().get(0).getData());
104104
} else {
105105
sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl");
@@ -120,7 +120,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
120120
}
121121

122122
try {
123-
return interpret(st, this.displayAppInfo);
123+
return interpret(st, this.displayAppInfo, true);
124124
} catch (LivyException e) {
125125
LOGGER.error("Fail to interpret:" + st, e);
126126
return new InterpreterResult(InterpreterResult.Code.ERROR,
@@ -206,20 +206,22 @@ private SessionInfo getSessionInfo(int sessionId) throws LivyException {
206206
return SessionInfo.fromJson(callRestAPI("/sessions/" + sessionId, "GET"));
207207
}
208208

209-
public InterpreterResult interpret(String code, boolean displayAppInfo)
209+
public InterpreterResult interpret(String code, boolean displayAppInfo,
210+
boolean appendSessionExpired)
210211
throws LivyException {
211212
StatementInfo stmtInfo = null;
213+
boolean sessionExpired = false;
212214
try {
213215
stmtInfo = executeStatement(new ExecuteRequest(code));
214216
} catch (SessionNotFoundException e) {
215217
LOGGER.warn("Livy session {} is expired, new session will be created.", sessionInfo.id);
216-
this.sessionExpired = true;
218+
sessionExpired = true;
217219
// we don't want to create multiple sessions because it is possible to have multiple thread
218-
// to call this method, like LivySparkSQLInterpreter which use ParallelScheduler
220+
// to call this method, like LivySparkSQLInterpreter which use ParallelScheduler. So we need
221+
// to check session status again in this sync block
219222
synchronized (this) {
220-
if (isInitingSession.compareAndSet(false, true)) {
223+
if (isSessionExpired()) {
221224
initLivySession();
222-
isInitingSession.set(false);
223225
}
224226
}
225227
stmtInfo = executeStatement(new ExecuteRequest(code));
@@ -234,19 +236,33 @@ public InterpreterResult interpret(String code, boolean displayAppInfo)
234236
}
235237
stmtInfo = getStatementInfo(stmtInfo.id);
236238
}
237-
return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo));
239+
if (appendSessionExpired) {
240+
return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),
241+
sessionExpired);
242+
} else {
243+
return getResultFromStatementInfo(stmtInfo, displayAppInfo);
244+
}
245+
}
238246

247+
private boolean isSessionExpired() throws LivyException {
248+
try {
249+
getSessionInfo(sessionInfo.id);
250+
return false;
251+
} catch (SessionNotFoundException e) {
252+
return true;
253+
} catch (LivyException e) {
254+
throw e;
255+
}
239256
}
240257

241-
private InterpreterResult appendSessionExpire(InterpreterResult result) {
258+
private InterpreterResult appendSessionExpire(InterpreterResult result, boolean sessionExpired) {
242259
if (sessionExpired) {
243260
InterpreterResult result2 = new InterpreterResult(result.code());
261+
result2.add(InterpreterResult.Type.HTML,
262+
"<font color=\"red\">Previous session is expired, new session is created.</font>");
244263
for (InterpreterResultMessage message : result.message()) {
245264
result2.add(message.getType(), message.getData());
246265
}
247-
result2.add(InterpreterResult.Type.HTML,
248-
"<hr/><font color=\"red\">Previous session is expired, new session is created.</font>");
249-
sessionExpired = false;
250266
return result2;
251267
} else {
252268
return result;

livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,15 @@ public void open() {
5151
// As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession
5252
// to judge whether it is using spark2.
5353
try {
54-
InterpreterResult result = sparkInterpreter.interpret("spark", false);
54+
InterpreterResult result = sparkInterpreter.interpret("spark", false, false);
5555
if (result.code() == InterpreterResult.Code.SUCCESS &&
5656
result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) {
5757
LOGGER.info("SparkSession is detected so we are using spark 2.x for session {}",
5858
sparkInterpreter.getSessionInfo().id);
5959
isSpark2 = true;
6060
} else {
6161
// spark 1.x
62-
result = sparkInterpreter.interpret("sqlContext", false);
62+
result = sparkInterpreter.interpret("sqlContext", false, false);
6363
if (result.code() == InterpreterResult.Code.SUCCESS) {
6464
LOGGER.info("sqlContext is detected.");
6565
} else if (result.code() == InterpreterResult.Code.ERROR) {
@@ -68,7 +68,7 @@ public void open() {
6868
LOGGER.info("sqlContext is not detected, try to create SQLContext by ourselves");
6969
result = sparkInterpreter.interpret(
7070
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
71-
+ "import sqlContext.implicits._", false);
71+
+ "import sqlContext.implicits._", false, false);
7272
if (result.code() == InterpreterResult.Code.ERROR) {
7373
throw new LivyException("Fail to create SQLContext," +
7474
result.message().get(0).getData());
@@ -113,37 +113,44 @@ public InterpreterResult interpret(String line, InterpreterContext context) {
113113
} else {
114114
sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
115115
}
116-
InterpreterResult res = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo);
117-
118-
if (res.code() == InterpreterResult.Code.SUCCESS) {
119-
StringBuilder resMsg = new StringBuilder();
120-
resMsg.append("%table ");
121-
String[] rows = res.message().get(0).getData().split("\n");
122-
String[] headers = rows[1].split("\\|");
123-
for (int head = 1; head < headers.length; head++) {
124-
resMsg.append(headers[head].trim()).append("\t");
125-
}
126-
resMsg.append("\n");
127-
if (rows[3].indexOf("+") == 0) {
128-
129-
} else {
130-
for (int cols = 3; cols < rows.length - 1; cols++) {
131-
String[] col = rows[cols].split("\\|");
132-
for (int data = 1; data < col.length; data++) {
133-
resMsg.append(col[data].trim()).append("\t");
116+
InterpreterResult result = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo, true);
117+
118+
if (result.code() == InterpreterResult.Code.SUCCESS) {
119+
InterpreterResult result2 = new InterpreterResult(InterpreterResult.Code.SUCCESS);
120+
for (InterpreterResultMessage message : result.message()) {
121+
// convert Text type to Table type. We assume the text type must be the sql output. This
122+
// assumption is correct for now. Ideally livy should return table type. We may do it in
123+
// the future release of livy.
124+
if (message.getType() == InterpreterResult.Type.TEXT) {
125+
StringBuilder resMsg = new StringBuilder();
126+
String[] rows = message.getData().split("\n");
127+
String[] headers = rows[1].split("\\|");
128+
for (int head = 1; head < headers.length; head++) {
129+
resMsg.append(headers[head].trim()).append("\t");
134130
}
135131
resMsg.append("\n");
132+
if (rows[3].indexOf("+") == 0) {
133+
134+
} else {
135+
for (int cols = 3; cols < rows.length - 1; cols++) {
136+
String[] col = rows[cols].split("\\|");
137+
for (int data = 1; data < col.length; data++) {
138+
resMsg.append(col[data].trim()).append("\t");
139+
}
140+
resMsg.append("\n");
141+
}
142+
}
143+
if (rows[rows.length - 1].indexOf("only") == 0) {
144+
resMsg.append("<font color=red>" + rows[rows.length - 1] + ".</font>");
145+
}
146+
result2.add(InterpreterResult.Type.TABLE, resMsg.toString());
147+
} else {
148+
result2.add(message.getType(), message.getData());
136149
}
137150
}
138-
if (rows[rows.length - 1].indexOf("only") == 0) {
139-
resMsg.append("<font color=red>" + rows[rows.length - 1] + ".</font>");
140-
}
141-
142-
return new InterpreterResult(InterpreterResult.Code.SUCCESS,
143-
resMsg.toString()
144-
);
151+
return result2;
145152
} else {
146-
return res;
153+
return result;
147154
}
148155
} catch (Exception e) {
149156
LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);

0 commit comments

Comments
 (0)