Skip to content

Commit d1d642e

Browse files
author
Ilya Ganelin
committed
Merge pull request #1 from Leemoonsoo/ZEPPELIN-23
[ZEPPELIN-23] Set version of default spark interpreter build profile from 1.1 to 1.3
2 parents 1402df2 + 0108781 commit d1d642e

File tree

4 files changed

+211
-135
lines changed

4 files changed

+211
-135
lines changed

pom.xml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@
9898
</modules>
9999

100100
<properties>
101-
<spark.version>1.3.0</spark.version>
101+
<spark.version>1.1.1</spark.version>
102102
<scala.version>2.10.4</scala.version>
103103
<scala.binary.version>2.10</scala.binary.version>
104104
<scala.macros.version>2.0.1</scala.macros.version>
@@ -1308,11 +1308,14 @@
13081308

13091309
<profile>
13101310
<id>spark-1.3</id>
1311+
<activation>
1312+
<activeByDefault>true</activeByDefault>
1313+
</activation>
13111314
<dependencies>
13121315
</dependencies>
13131316
<properties>
13141317
<akka.version>2.3.4-spark</akka.version>
1315-
<spark.version>1.3.0</spark.version>
1318+
<spark.version>1.3.1</spark.version>
13161319
<mesos.version>0.21.0</mesos.version>
13171320
<hbase.version>0.98.7</hbase.version>
13181321
<hbase.artifact>hbase</hbase.artifact>

spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,9 @@ private void updateCompilerClassPath(URL[] urls) throws IllegalAccessException,
143143

144144
// Until spark 1.1.x
145145
// check https://github.com/apache/spark/commit/191d7cf2a655d032f160b9fa181730364681d0e7
146-
private void updateRuntimeClassPath(URL[] urls) throws SecurityException, IllegalAccessException,
147-
IllegalArgumentException, InvocationTargetException, NoSuchMethodException {
146+
private void updateRuntimeClassPath_1_x(URL[] urls) throws SecurityException,
147+
IllegalAccessException, IllegalArgumentException,
148+
InvocationTargetException, NoSuchMethodException {
148149
ClassLoader cl = intp.classLoader().getParent();
149150
Method addURL;
150151
addURL = cl.getClass().getDeclaredMethod("addURL", new Class[] {URL.class});
@@ -154,6 +155,18 @@ private void updateRuntimeClassPath(URL[] urls) throws SecurityException, Illega
154155
}
155156
}
156157

158+
private void updateRuntimeClassPath_2_x(URL[] urls) throws SecurityException,
159+
IllegalAccessException, IllegalArgumentException,
160+
InvocationTargetException, NoSuchMethodException {
161+
ClassLoader cl = intp.classLoader().getParent();
162+
Method addURL;
163+
addURL = cl.getClass().getDeclaredMethod("addNewUrl", new Class[] {URL.class});
164+
addURL.setAccessible(true);
165+
for (URL url : urls) {
166+
addURL.invoke(cl, url);
167+
}
168+
}
169+
157170
private MergedClassPath<AbstractFile> mergeUrlsIntoClassPath(JavaPlatform platform, URL[] urls) {
158171
IndexedSeq<ClassPath<AbstractFile>> entries =
159172
((MergedClassPath<AbstractFile>) platform.classPath()).entries();
@@ -217,8 +230,11 @@ private void loadFromFs(String artifact, boolean addSparkContext) throws Excepti
217230

218231
intp.global().new Run();
219232

220-
updateRuntimeClassPath(new URL[] {jarFile.toURI().toURL()});
221-
updateCompilerClassPath(new URL[] {jarFile.toURI().toURL()});
233+
if (sc.version().startsWith("1.1")) {
234+
updateRuntimeClassPath_1_x(new URL[] {jarFile.toURI().toURL()});
235+
} else {
236+
updateRuntimeClassPath_2_x(new URL[] {jarFile.toURI().toURL()});
237+
}
222238

223239
if (addSparkContext) {
224240
sc.addJar(jarFile.getAbsolutePath());
@@ -261,7 +277,11 @@ private List<String> loadFromMvn(String artifact, Collection<String> excludes,
261277
}
262278

263279
intp.global().new Run();
264-
updateRuntimeClassPath(newClassPathList.toArray(new URL[0]));
280+
if (sc.version().startsWith("1.1")) {
281+
updateRuntimeClassPath_1_x(newClassPathList.toArray(new URL[0]));
282+
} else {
283+
updateRuntimeClassPath_2_x(newClassPathList.toArray(new URL[0]));
284+
}
265285
updateCompilerClassPath(newClassPathList.toArray(new URL[0]));
266286

267287
if (addSparkContext) {

spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java

Lines changed: 74 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -45,25 +45,41 @@ public class SparkInterpreterTest {
4545
private InterpreterContext context;
4646
private File tmpDir;
4747

48+
49+
/**
50+
* Get spark version number as a numerical value.
51+
* eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
52+
*/
53+
public static int getSparkVersionNumber() {
54+
if (repl == null) {
55+
return 0;
56+
}
57+
58+
String[] split = repl.getSparkContext().version().split(".");
59+
int version = Integer.parseInt(split[0]) + Integer.parseInt(split[1]);
60+
return version;
61+
}
62+
4863
@Before
4964
public void setUp() throws Exception {
5065
tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
5166
System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo");
5267

5368
tmpDir.mkdirs();
5469

55-
if (repl == null) {
56-
Properties p = new Properties();
70+
if (repl == null) {
71+
Properties p = new Properties();
5772

58-
repl = new SparkInterpreter(p);
59-
repl.open();
60-
}
73+
repl = new SparkInterpreter(p);
74+
repl.open();
75+
}
6176

62-
InterpreterGroup intpGroup = new InterpreterGroup();
63-
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
64-
new AngularObjectRegistry(intpGroup.getId(), null),
77+
InterpreterGroup intpGroup = new InterpreterGroup();
78+
context = new InterpreterContext("id", "title", "text",
79+
new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
80+
intpGroup.getId(), null),
6581
new LinkedList<InterpreterContextRunner>());
66-
}
82+
}
6783

6884
@After
6985
public void tearDown() throws Exception {
@@ -83,52 +99,55 @@ else if (file.isDirectory()) {
8399
}
84100
}
85101

86-
@Test
87-
public void testBasicIntp() {
88-
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val a = 1\nval b = 2", context).code());
89-
90-
// when interpret incomplete expression
91-
InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
92-
assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
93-
assertTrue(incomplete.message().length()>0); // expecting some error message
94-
/*
95-
assertEquals(1, repl.getValue("a"));
96-
assertEquals(2, repl.getValue("b"));
97-
repl.interpret("val ver = sc.version");
98-
assertNotNull(repl.getValue("ver"));
99-
assertEquals("HELLO\n", repl.interpret("println(\"HELLO\")").message());
100-
*/
101-
}
102-
103-
@Test
104-
public void testEndWithComment() {
105-
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
106-
}
107-
108-
@Test
109-
public void testSparkSql(){
110-
repl.interpret("case class Person(name:String, age:Int)\n", context);
111-
repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
112-
assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
113-
114-
// create new interpreter
115-
Properties p = new Properties();
116-
SparkInterpreter repl2 = new SparkInterpreter(p);
117-
repl2.open();
118-
119-
repl.interpret("case class Man(name:String, age:Int)", context);
120-
repl.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
121-
assertEquals(Code.SUCCESS, repl.interpret("man.take(3)", context).code());
122-
repl2.getSparkContext().stop();
123-
}
124-
125-
@Test
126-
public void testReferencingUndefinedVal(){
127-
InterpreterResult result = repl.interpret("def category(min: Int) = {" +
128-
" if (0 <= value) \"error\"" +
129-
"}", context);
130-
assertEquals(Code.ERROR, result.code());
131-
}
102+
@Test
103+
public void testBasicIntp() {
104+
assertEquals(InterpreterResult.Code.SUCCESS,
105+
repl.interpret("val a = 1\nval b = 2", context).code());
106+
107+
// when interpret incomplete expression
108+
InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
109+
assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
110+
assertTrue(incomplete.message().length() > 0); // expecting some error
111+
// message
112+
/*
113+
* assertEquals(1, repl.getValue("a")); assertEquals(2, repl.getValue("b"));
114+
* repl.interpret("val ver = sc.version");
115+
* assertNotNull(repl.getValue("ver")); assertEquals("HELLO\n",
116+
* repl.interpret("println(\"HELLO\")").message());
117+
*/
118+
}
119+
120+
@Test
121+
public void testEndWithComment() {
122+
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
123+
}
124+
125+
@Test
126+
public void testSparkSql(){
127+
repl.interpret("case class Person(name:String, age:Int)\n", context);
128+
repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
129+
assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
130+
131+
132+
if (getSparkVersionNumber() <= 11) { // spark 1.2 or later does not allow create multiple SparkContext in the same jvm by default.
133+
// create new interpreter
134+
Properties p = new Properties();
135+
SparkInterpreter repl2 = new SparkInterpreter(p);
136+
repl2.open();
137+
138+
repl.interpret("case class Man(name:String, age:Int)", context);
139+
repl.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
140+
assertEquals(Code.SUCCESS, repl.interpret("man.take(3)", context).code());
141+
repl2.getSparkContext().stop();
142+
}
143+
}
144+
145+
@Test
146+
public void testReferencingUndefinedVal() {
147+
InterpreterResult result = repl.interpret("def category(min: Int) = {"
148+
+ " if (0 <= value) \"error\"" + "}", context);
149+
assertEquals(Code.ERROR, result.code());
150+
}
132151

133152
@Test
134153
public void testZContextDependencyLoading() {

0 commit comments

Comments
 (0)