Skip to content

Commit 93060b6

Browse files
committed
ZEPPELIN-1442. UDF cannot be found because two instances of SparkSession are created
1 parent c717daf commit 93060b6

File tree

2 files changed

+27
-4
lines changed

2 files changed

+27
-4
lines changed

spark/src/main/resources/python/zeppelin_pyspark.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,14 +219,12 @@ def getCompletion(self, text_value):
219219
conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
220220
sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
221221
if sparkVersion.isSpark2():
222-
sqlc = SQLContext(sparkContext=sc, jsqlContext=intp.getSQLContext())
222+
spark = SparkSession(sc, intp.getSparkSession())
223+
sqlc = spark._wrapped
223224
else:
224225
sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
225226
sqlContext = sqlc
226227

227-
if sparkVersion.isSpark2():
228-
spark = SparkSession(sc, intp.getSparkSession())
229-
230228
completion = PySparkCompletion(intp)
231229
z = PyZeppelinContext(intp.getZeppelinContext())
232230

zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,18 @@ public void pySparkTest() throws IOException {
220220
assertEquals(InterpreterResult.Type.TABLE, p.getResult().type());
221221
// TODO (zjffdu), one more \n is appended, need to investigate why.
222222
assertEquals("age\tid\n20\t1\n\n", p.getResult().message());
223+
224+
// test udf
225+
p = note.addParagraph();
226+
config = p.getConfig();
227+
config.put("enabled", true);
228+
p.setConfig(config);
229+
p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" +
230+
"sqlContext.sql(\"select f1(\\\"abc\\\") as len\").collect()");
231+
note.run(p.getId());
232+
waitForFinish(p);
233+
assertEquals(Status.FINISHED, p.getStatus());
234+
assertEquals("[Row(len=u'3')]\n", p.getResult().message());
223235
}
224236
if (sparkVersion >= 20) {
225237
// run SparkSession test
@@ -234,6 +246,19 @@ public void pySparkTest() throws IOException {
234246
waitForFinish(p);
235247
assertEquals(Status.FINISHED, p.getStatus());
236248
assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
249+
250+
// test udf
251+
p = note.addParagraph();
252+
config = p.getConfig();
253+
config.put("enabled", true);
254+
p.setConfig(config);
255+
// use SQLContext to register UDF but use this UDF through SparkSession
256+
p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" +
257+
"spark.sql(\"select f1(\\\"abc\\\")\").collect()");
258+
note.run(p.getId());
259+
waitForFinish(p);
260+
assertEquals(Status.FINISHED, p.getStatus());
261+
assertEquals("[Row(f1(abc)=u'3')]\n", p.getResult().message());
237262
}
238263
}
239264
ZeppelinServer.notebook.removeNote(note.getId(), null);

0 commit comments

Comments
 (0)