Skip to content

Commit 4c0d5f0

Browse files
committed
Adding more configurations to livy interpreter
1 parent 2586651 commit 4c0d5f0

File tree

2 files changed

+48
-15
lines changed

2 files changed

+48
-15
lines changed

livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,27 +60,45 @@ public class LivyHelper {
6060

6161
public Integer createSession(InterpreterContext context, String kind) throws Exception {
6262
try {
63+
Map<String, String> conf = new HashMap<String, String>();
64+
65+
conf.put("spark.master", property.getProperty("zeppelin.livy.master"));
66+
67+
conf.put("spark.driver.cores", property.getProperty("spark.driver.cores"));
68+
conf.put("spark.executor.cores", property.getProperty("spark.executor.cores"));
69+
conf.put("spark.driver.memory", property.getProperty("spark.driver.memory"));
70+
conf.put("spark.executor.memory", property.getProperty("spark.executor.memory"));
71+
72+
if (!property.getProperty("spark.dynamicAllocation.enabled").equals("true")) {
73+
conf.put("spark.executor.instances", property.getProperty("spark.executor.instances"));
74+
}
75+
76+
if (property.getProperty("spark.dynamicAllocation.enabled").equals("true")) {
77+
conf.put("spark.dynamicAllocation.enabled",
78+
property.getProperty("spark.dynamicAllocation.enabled"));
79+
conf.put("spark.shuffle.service.enabled", "true");
80+
conf.put("spark.dynamicAllocation.cachedExecutorIdleTimeout",
81+
property.getProperty("spark.dynamicAllocation.cachedExecutorIdleTimeout"));
82+
conf.put("spark.dynamicAllocation.minExecutors",
83+
property.getProperty("spark.dynamicAllocation.minExecutors"));
84+
conf.put("spark.dynamicAllocation.initialExecutors",
85+
property.getProperty("spark.dynamicAllocation.initialExecutors"));
86+
conf.put("spark.dynamicAllocation.maxExecutors",
87+
property.getProperty("spark.dynamicAllocation.maxExecutors"));
88+
}
89+
90+
String confData = gson.toJson(conf);
91+
6392
String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions",
64-
"POST",
93+
"POST",
6594
"{" +
6695
"\"kind\": \"" + kind + "\", " +
67-
"\"master\": \"" + property.getProperty("zeppelin.livy.master") + "\", " +
68-
"\"proxyUser\": \"" + context.getAuthenticationInfo().getUser() + "\"" +
96+
"\"conf\": " + confData + ", " +
97+
"\"proxyUser\": " + context.getAuthenticationInfo().getUser() +
6998
"}",
7099
context.getParagraphId()
71100
);
72-
if (json.contains("CreateInteractiveRequest[\\\"master\\\"]")) {
73-
json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions",
74-
"POST",
75-
"{" +
76-
"\"kind\": \"" + kind + "\", " +
77-
"\"conf\":{\"spark.master\": \""
78-
+ property.getProperty("zeppelin.livy.master") + "\"}," +
79-
"\"proxyUser\": \"" + context.getAuthenticationInfo().getUser() + "\"" +
80-
"}",
81-
context.getParagraphId()
82-
);
83-
}
101+
84102
Map jsonMap = (Map<Object, Object>) gson.fromJson(json,
85103
new TypeToken<Map<Object, Object>>() {
86104
}.getType());

livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,21 @@ public class LivySparkInterpreter extends Interpreter {
4646
new InterpreterPropertyBuilder()
4747
.add("zeppelin.livy.url", DEFAULT_URL, "The URL for Livy Server.")
4848
.add("zeppelin.livy.master", LOCAL, "Spark master uri. ex) spark://masterhost:7077")
49+
.add("spark.driver.cores", "1", "Driver cores. ex) 1, 2")
50+
.add("spark.driver.memory", "512m", "Driver memory. ex) 512m, 32g")
51+
.add("spark.executor.instances", "3", "Executor instances. ex) 1, 4")
52+
.add("spark.executor.cores", "1", "Num cores per executor. ex) 1, 4")
53+
.add("spark.executor.memory", "512m",
54+
"Executor memory per worker instance. ex) 512m, 32g")
55+
.add("spark.dynamicAllocation.enabled", "false", "Use dynamic resource allocation")
56+
.add("spark.dynamicAllocation.cachedExecutorIdleTimeout", "120s",
57+
"Remove an executor which has cached data blocks")
58+
.add("spark.dynamicAllocation.minExecutors", "0",
59+
"Lower bound for the number of executors if dynamic allocation is enabled. ")
60+
.add("spark.dynamicAllocation.initialExecutors", "1",
61+
"Initial number of executors to run if dynamic allocation is enabled. ")
62+
.add("spark.dynamicAllocation.maxExecutors", "10",
63+
"Upper bound for the number of executors if dynamic allocation is enabled. ")
4964
.build()
5065
);
5166
}

0 commit comments

Comments
 (0)