@@ -60,27 +60,45 @@ public class LivyHelper {
6060
6161 public Integer createSession (InterpreterContext context , String kind ) throws Exception {
6262 try {
63+ Map <String , String > conf = new HashMap <String , String >();
64+
65+ conf .put ("spark.master" , property .getProperty ("zeppelin.livy.master" ));
66+
67+ conf .put ("spark.driver.cores" , property .getProperty ("spark.driver.cores" ));
68+ conf .put ("spark.executor.cores" , property .getProperty ("spark.executor.cores" ));
69+ conf .put ("spark.driver.memory" , property .getProperty ("spark.driver.memory" ));
70+ conf .put ("spark.executor.memory" , property .getProperty ("spark.executor.memory" ));
71+
72+ if (!property .getProperty ("spark.dynamicAllocation.enabled" ).equals ("true" )) {
73+ conf .put ("spark.executor.instances" , property .getProperty ("spark.executor.instances" ));
74+ }
75+
76+ if (property .getProperty ("spark.dynamicAllocation.enabled" ).equals ("true" )) {
77+ conf .put ("spark.dynamicAllocation.enabled" ,
78+ property .getProperty ("spark.dynamicAllocation.enabled" ));
79+ conf .put ("spark.shuffle.service.enabled" , "true" );
80+ conf .put ("spark.dynamicAllocation.cachedExecutorIdleTimeout" ,
81+ property .getProperty ("spark.dynamicAllocation.cachedExecutorIdleTimeout" ));
82+ conf .put ("spark.dynamicAllocation.minExecutors" ,
83+ property .getProperty ("spark.dynamicAllocation.minExecutors" ));
84+ conf .put ("spark.dynamicAllocation.initialExecutors" ,
85+ property .getProperty ("spark.dynamicAllocation.initialExecutors" ));
86+ conf .put ("spark.dynamicAllocation.maxExecutors" ,
87+ property .getProperty ("spark.dynamicAllocation.maxExecutors" ));
88+ }
89+
90+ String confData = gson .toJson (conf );
91+
6392 String json = executeHTTP (property .getProperty ("zeppelin.livy.url" ) + "/sessions" ,
64- "POST" ,
93+ "POST" ,
6594 "{" +
6695 "\" kind\" : \" " + kind + "\" , " +
67- "\" master \" : \" " + property . getProperty ( "zeppelin.livy.master" ) + "\" , " +
68- "\" proxyUser\" : \" " + context .getAuthenticationInfo ().getUser () + " \" " +
96+ "\" conf \" : " + confData + ", " +
97+ "\" proxyUser\" : " + context .getAuthenticationInfo ().getUser () +
6998 "}" ,
7099 context .getParagraphId ()
71100 );
72- if (json .contains ("CreateInteractiveRequest[\\ \" master\\ \" ]" )) {
73- json = executeHTTP (property .getProperty ("zeppelin.livy.url" ) + "/sessions" ,
74- "POST" ,
75- "{" +
76- "\" kind\" : \" " + kind + "\" , " +
77- "\" conf\" :{\" spark.master\" : \" "
78- + property .getProperty ("zeppelin.livy.master" ) + "\" }," +
79- "\" proxyUser\" : \" " + context .getAuthenticationInfo ().getUser () + "\" " +
80- "}" ,
81- context .getParagraphId ()
82- );
83- }
101+
84102 Map jsonMap = (Map <Object , Object >) gson .fromJson (json ,
85103 new TypeToken <Map <Object , Object >>() {
86104 }.getType ());
0 commit comments