3939import org .apache .spark .SparkContext ;
4040import org .apache .spark .SparkEnv ;
4141
42+ import org .apache .spark .SecurityManager ;
4243import org .apache .spark .repl .SparkILoop ;
4344import org .apache .spark .scheduler .ActiveJob ;
4445import org .apache .spark .scheduler .DAGScheduler ;
4546import org .apache .spark .scheduler .Pool ;
4647import org .apache .spark .sql .SQLContext ;
4748import org .apache .spark .ui .jobs .JobProgressListener ;
49+ import org .apache .spark .util .Utils ;
4850import org .apache .zeppelin .interpreter .Interpreter ;
4951import org .apache .zeppelin .interpreter .InterpreterContext ;
5052import org .apache .zeppelin .interpreter .InterpreterException ;
7880import scala .tools .nsc .Settings ;
7981import scala .tools .nsc .interpreter .Completion .Candidates ;
8082import scala .tools .nsc .interpreter .Completion .ScalaCompleter ;
83+ import scala .tools .nsc .interpreter .IMain ;
8184import scala .tools .nsc .interpreter .Results ;
8285import scala .tools .nsc .settings .MutableSettings ;
8386import scala .tools .nsc .settings .MutableSettings .BooleanSetting ;
@@ -115,6 +118,8 @@ public class SparkInterpreter extends Interpreter {
115118
116119 private Map <String , Object > binder ;
117120 private SparkVersion sparkVersion ;
121+ private File outputDir ; // class outputdir for scala 2.11
122+ private HttpServer classServer ; // classserver for scala 2.11
118123
119124
120125 public SparkInterpreter (Properties property ) {
@@ -282,6 +287,19 @@ public SparkContext createSparkContext() {
282287 }
283288 }
284289
290+
291+ if (isScala2_11 ()) {
292+ SparkConf conf = new SparkConf ();
293+ classServer = new HttpServer (
294+ conf ,
295+ outputDir ,
296+ new SecurityManager (conf ),
297+ 0 ,
298+ "HTTP server" );
299+ classServer .start ();
300+ classServerUri = classServer .uri ();
301+ }
302+
285303 SparkConf conf =
286304 new SparkConf ()
287305 .setMaster (getProperty ("master" ))
@@ -413,26 +431,49 @@ public void open() {
413431 * getClass.getClassLoader >> } >> in.setContextClassLoader()
414432 */
415433 Settings settings = new Settings ();
416- if (getProperty ("args" ) != null ) {
417- String [] argsArray = getProperty ("args" ).split (" " );
418- LinkedList <String > argList = new LinkedList <String >();
419- for (String arg : argsArray ) {
420- argList .add (arg );
421- }
422434
435+ // process args
436+ String args = getProperty ("args" );
437+ if (args == null ) {
438+ args = "" ;
439+ }
440+
441+ String [] argsArray = args .split (" " );
442+ LinkedList <String > argList = new LinkedList <String >();
443+ for (String arg : argsArray ) {
444+ argList .add (arg );
445+ }
446+
447+ if (isScala2_10 ()) {
423448 scala .collection .immutable .List <String > list =
424449 JavaConversions .asScalaBuffer (argList ).toList ();
425450
426- if (isScala2_10 ()) {
427- Object sparkCommandLine = instantiateClass (
428- "org.apache.spark.repl.SparkCommandLine" ,
429- new Class []{ list .getClass () },
430- new Object []{ list });
451+ Object sparkCommandLine = instantiateClass (
452+ "org.apache.spark.repl.SparkCommandLine" ,
453+ new Class []{ list .getClass () },
454+ new Object []{ list });
431455
432- settings = (Settings ) invokeMethod (sparkCommandLine , "settings" );
433- } else {
434- settings .processArguments (list , true );
456+ settings = (Settings ) invokeMethod (sparkCommandLine , "settings" );
457+ } else {
458+ String sparkReplClassDir = getProperty ("spark.repl.classdir" );
459+ if (sparkReplClassDir == null ) {
460+ sparkReplClassDir = System .getProperty ("spark.repl.classdir" );
461+ }
462+ if (sparkReplClassDir == null ) {
463+ sparkReplClassDir = System .getProperty ("java.io.tmpdir" );
435464 }
465+
466+ outputDir = Utils .createTempDir (sparkReplClassDir , "classdir" );
467+
468+ argList .add ("-Yrepl-class-based" );
469+ argList .add ("-Yrepl-outdir" );
470+ argList .add (outputDir .getAbsolutePath ());
471+
472+
473+ scala .collection .immutable .List <String > list =
474+ JavaConversions .asScalaBuffer (argList ).toList ();
475+
476+ settings .processArguments (list , true );
436477 }
437478
438479 // set classpath for scala compiler
@@ -526,24 +567,22 @@ public void open() {
526567 if (isScala2_10 ()) {
527568 invokeMethod (intp , "setContextClassLoader" );
528569 invokeMethod (intp , "initializeSynchronous" );
529- }
530570
531- if (classOutputDir == null ) {
532- classOutputDir = settings .outputDirs ().getSingleOutput ().get ();
533- } else {
534- // change SparkIMain class output dir
535- settings .outputDirs ().setSingleOutput (classOutputDir );
536- ClassLoader cl = (ClassLoader ) invokeMethod (intp , "classLoader" );
537- try {
538- Field rootField = cl .getClass ().getSuperclass ().getDeclaredField ("root" );
539- rootField .setAccessible (true );
540- rootField .set (cl , classOutputDir );
541- } catch (NoSuchFieldException | IllegalAccessException e ) {
542- logger .error (e .getMessage (), e );
571+ if (classOutputDir == null ) {
572+ classOutputDir = settings .outputDirs ().getSingleOutput ().get ();
573+ } else {
574+ // change SparkIMain class output dir
575+ settings .outputDirs ().setSingleOutput (classOutputDir );
576+ ClassLoader cl = (ClassLoader ) invokeMethod (intp , "classLoader" );
577+ try {
578+ Field rootField = cl .getClass ().getSuperclass ().getDeclaredField ("root" );
579+ rootField .setAccessible (true );
580+ rootField .set (cl , classOutputDir );
581+ } catch (NoSuchFieldException | IllegalAccessException e ) {
582+ logger .error (e .getMessage (), e );
583+ }
543584 }
544- }
545585
546- if (isScala2_10 ()) {
547586 completor = instantiateClass (
548587 "SparkJLineCompletion" ,
549588 new Class []{findClass ("org.apache.spark.repl.SparkIMain" )},
@@ -568,8 +607,8 @@ public void open() {
568607 z = new ZeppelinContext (sc , sqlc , null , dep ,
569608 Integer .parseInt (getProperty ("zeppelin.spark.maxResult" )));
570609
571- interpret ("@transient var _binder = new java.util.HashMap[String, Object]()" );
572- binder = (Map <String , Object >) getValue ( "_binder" );
610+ interpret ("@transient val _binder = new java.util.HashMap[String, Object]()" );
611+ binder = (Map <String , Object >) getLastObject ( );
573612 binder .put ("sc" , sc );
574613 binder .put ("sqlc" , sqlc );
575614 binder .put ("z" , z );
@@ -769,9 +808,14 @@ private String getCompletionTargetString(String text, int cursor) {
769808 return resultCompletionText ;
770809 }
771810
811+ /*
812+ * this method doesn't work in scala 2.11
813+ * Somehow intp.valueOfTerm returns scala.None always with -Yrepl-class-based option
814+ */
772815 public Object getValue (String name ) {
773816 Object ret = invokeMethod (intp , "valueOfTerm" , new Class []{String .class }, new Object []{name });
774- if (ret instanceof None ) {
817+
818+ if (ret instanceof None || ret instanceof scala .None$ ) {
775819 return null ;
776820 } else if (ret instanceof Some ) {
777821 return ((Some ) ret ).get ();
@@ -780,6 +824,13 @@ public Object getValue(String name) {
780824 }
781825 }
782826
827+ public Object getLastObject () {
828+ IMain .Request r = (IMain .Request ) invokeMethod (intp , "lastRequest" );
829+ Object obj = r .lineRep ().call ("$result" ,
830+ JavaConversions .asScalaBuffer (new LinkedList <Object >()));
831+ return obj ;
832+ }
833+
783834 String getJobGroup (InterpreterContext context ){
784835 return "zeppelin-" + context .getParagraphId ();
785836 }
@@ -1049,6 +1100,10 @@ public void close() {
10491100 if (numReferenceOfSparkContext .decrementAndGet () == 0 ) {
10501101 sc .stop ();
10511102 sc = null ;
1103+ if (classServer != null ) {
1104+ classServer .stop ();
1105+ classServer = null ;
1106+ }
10521107 }
10531108
10541109 invokeMethod (intp , "close" );
@@ -1144,7 +1199,7 @@ private boolean isScala2_10() {
11441199 }
11451200
11461201 private boolean isScala2_11 () {
1147- return !isScala2_11 ();
1202+ return !isScala2_10 ();
11481203 }
11491204
11501205
0 commit comments