Skip to content

Commit 28dd7bf

Browse files
committed
Make unit tests pass with the spark-1.3 profile
1 parent cf2ed0c commit 28dd7bf

File tree

2 files changed

+40
-13
lines changed

2 files changed

+40
-13
lines changed

spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -111,15 +111,17 @@ public void testSparkSql(){
111111
repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
112112
assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
113113

114-
// create new interpreter
115-
Properties p = new Properties();
116-
SparkInterpreter repl2 = new SparkInterpreter(p);
117-
repl2.open();
118-
119-
repl.interpret("case class Man(name:String, age:Int)", context);
120-
repl.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
121-
assertEquals(Code.SUCCESS, repl.interpret("man.take(3)", context).code());
122-
repl2.getSparkContext().stop();
114+
if (repl.getSparkContext().version().startsWith("1.1")) { // spark 1.2 or later does not allow create multiple SparkContext in the same jvm by default.
115+
// create new interpreter
116+
Properties p = new Properties();
117+
SparkInterpreter repl2 = new SparkInterpreter(p);
118+
repl2.open();
119+
120+
repl.interpret("case class Man(name:String, age:Int)", context);
121+
repl.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
122+
assertEquals(Code.SUCCESS, repl.interpret("man.take(3)", context).code());
123+
repl2.getSparkContext().stop();
124+
}
123125
}
124126

125127
@Test

spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,40 +72,65 @@ public void setUp() throws Exception {
7272
public void tearDown() throws Exception {
7373
}
7474

75+
boolean isDataFrameSupported() {
76+
String version = repl.getSparkContext().version();
77+
if (version.startsWith("1.1") || version.startsWith("1.2")) {
78+
return false;
79+
} else {
80+
return true;
81+
}
82+
}
83+
7584
@Test
7685
public void test() {
7786
repl.interpret("case class Test(name:String, age:Int)", context);
7887
repl.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
79-
repl.interpret("test.registerAsTable(\"test\")", context);
88+
if (isDataFrameSupported()) {
89+
repl.interpret("test.toDF.registerTempTable(\"test\")", context);
90+
} else {
91+
repl.interpret("test.registerTempTable(\"test\")", context);
92+
}
8093

8194
InterpreterResult ret = sql.interpret("select name, age from test where age < 40", context);
8295
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
8396
assertEquals(Type.TABLE, ret.type());
8497
assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message());
8598

8699
assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code());
87-
assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from people", context).code());
100+
assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from test", context).code());
88101
}
89102

90103
@Test
91104
public void testStruct(){
92105
repl.interpret("case class Person(name:String, age:Int)", context);
93106
repl.interpret("case class People(group:String, person:Person)", context);
94107
repl.interpret("val gr = sc.parallelize(Seq(People(\"g1\", Person(\"moon\",33)), People(\"g2\", Person(\"sun\",11))))", context);
95-
repl.interpret("gr.registerAsTable(\"gr\")", context);
108+
if (isDataFrameSupported()) {
109+
repl.interpret("gr.toDF.registerTempTable(\"gr\")", context);
110+
} else {
111+
repl.interpret("gr.registerTempTable(\"gr\")", context);
112+
}
113+
96114
InterpreterResult ret = sql.interpret("select * from gr", context);
97115
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
98116
}
99117

100118
@Test
101119
public void test_null_value_in_row() {
102120
repl.interpret("import org.apache.spark.sql._", context);
121+
if (isDataFrameSupported()) {
122+
repl.interpret("import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}", context);
123+
}
103124
repl.interpret("def toInt(s:String): Any = {try { s.trim().toInt} catch {case e:Exception => null}}", context);
104125
repl.interpret("val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))", context);
105126
repl.interpret("val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))", context);
106127
repl.interpret("val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))", context);
107128
repl.interpret("val people = z.sqlContext.applySchema(raw, schema)", context);
108-
repl.interpret("people.registerTempTable(\"people\")", context);
129+
if (isDataFrameSupported()) {
130+
repl.interpret("people.toDF.registerTempTable(\"people\")", context);
131+
} else {
132+
repl.interpret("people.registerTempTable(\"people\")", context);
133+
}
109134

110135
InterpreterResult ret = sql.interpret("select name, age from people where name = 'gates'", context);
111136
System.err.println("RET=" + ret.message());

0 commit comments

Comments (0)