Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ sudo: false
cache:
directories:
- .spark-dist
- ${HOME}/.m2/repository/.cache/maven-download-plugin

addons:
apt:
Expand All @@ -34,11 +35,11 @@ matrix:
include:
# Test all modules with scala 2.10
- jdk: "oraclejdk7"
env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding" BUILD_FLAG="-fae package -Dscala-2.10 -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""
env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Dscala-2.10" BUILD_FLAG="-fae package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""

# Test all modules with scala 2.11
- jdk: "oraclejdk7"
env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding" BUILD_FLAG="-fae package -Dscala-2.11 -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""
env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Dscala-2.11" BUILD_FLAG="-fae package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""

# Test spark module for 1.5.2
- jdk: "oraclejdk7"
Expand Down
54 changes: 10 additions & 44 deletions flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import scala.Console;
import scala.None;
import scala.Some;
import scala.collection.JavaConversions;
import scala.collection.immutable.Nil;
import scala.runtime.AbstractFunction0;
import scala.tools.nsc.Settings;
Expand Down Expand Up @@ -105,7 +106,7 @@ public void open() {

// prepare bindings
imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
binder = (Map<String, Object>) getValue("_binder");
Map<String, Object> binder = (Map<String, Object>) getLastObject();

// import libraries
imain.interpret("import scala.tools.nsc.io._");
Expand All @@ -115,39 +116,9 @@ public void open() {
imain.interpret("import org.apache.flink.api.scala._");
imain.interpret("import org.apache.flink.api.common.functions._");

String scalaVersion = scalaVersion(imain);
// scala 2.10 use imain.bindValue("env" env)
// scala 2.11 use imain.put("env", env);
String bindMethod = "bindValue";
if (scalaVersion.equals("2.11")) {
bindMethod = "put";
}

java.lang.reflect.Method method;
try {
method = imain.getClass().getMethod(bindMethod, String.class, Object.class);
if (method != null) {
method.invoke(imain, "env", env);
}
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Error binding environment variable: " + e.getMessage(), e);
}
}
}

private String scalaVersion(IMain imain) {
String version = "2.10";

try {
if (imain.getClass().getMethod("put", String.class, Object.class) != null) {
version = "2.11";
}
} catch (Exception e) {
// ignore
}

return version;
binder.put("env", env);
imain.interpret("val env = _binder.get(\"env\").asInstanceOf["
+ env.getClass().getName() + "]");
}

private boolean localMode() {
Expand Down Expand Up @@ -236,16 +207,11 @@ private List<File> classPath(ClassLoader cl) {
return paths;
}

public Object getValue(String name) {
IMain imain = flinkIloop.intp();
Object ret = imain.valueOfTerm(name);
if (ret instanceof None) {
return null;
} else if (ret instanceof Some) {
return ((Some) ret).get();
} else {
return ret;
}
/**
 * Retrieves the value of the most recently interpreted expression.
 *
 * <p>Reaches into the Scala interpreter's last request and invokes its
 * generated {@code $result} accessor with no arguments (the empty Scala
 * buffer is the argument list).
 *
 * @return the object produced by the last interpreted line; semantics of a
 *         failed lookup depend on the Scala interpreter internals —
 *         NOTE(review): presumably may be {@code null}, confirm against IMain.
 */
public Object getLastObject() {
  return imain.lastRequest().lineRep().call(
      "$result", JavaConversions.asScalaBuffer(new LinkedList<Object>()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import scala.Console;
import scala.None;
import scala.Some;
import scala.collection.JavaConversions;
import scala.tools.nsc.Settings;
import scala.tools.nsc.interpreter.IMain;
import scala.tools.nsc.interpreter.Results.Result;
Expand Down Expand Up @@ -172,16 +173,11 @@ private List<File> classPath(ClassLoader cl) {
return paths;
}

public Object getValue(String name) {
Object val = imain.valueOfTerm(name);

if (val instanceof None) {
return null;
} else if (val instanceof Some) {
return ((Some) val).get();
} else {
return val;
}
/**
 * Retrieves the value of the most recently interpreted expression.
 *
 * <p>Calls the {@code $result} accessor on the interpreter's last request,
 * passing an empty Scala argument buffer.
 *
 * @return the object produced by the last interpreted line; semantics of a
 *         failed lookup depend on the Scala interpreter internals —
 *         NOTE(review): presumably may be {@code null}, confirm against IMain.
 */
public Object getLastObject() {
  return imain.lastRequest().lineRep().call(
      "$result", JavaConversions.asScalaBuffer(new LinkedList<Object>()));
}

private Ignite getIgnite() {
Expand Down Expand Up @@ -220,7 +216,7 @@ private Ignite getIgnite() {

private void initIgnite() {
imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
Map<String, Object> binder = (Map<String, Object>) getValue("_binder");
Map<String, Object> binder = (Map<String, Object>) getLastObject();

if (getIgnite() != null) {
binder.put("ignite", ignite);
Expand Down
2 changes: 1 addition & 1 deletion spark-dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.10</artifactId>
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>

Expand Down
62 changes: 0 additions & 62 deletions spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,6 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-spark-dependencies</artifactId>
Expand Down Expand Up @@ -405,38 +399,6 @@
</executions>
</plugin>

<!-- Include a source dir depending on the Scala version -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${extra.source.dir}</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>${extra.testsource.dir}</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>

<!-- Plugin to compile Scala code -->
<plugin>
<groupId>org.scala-tools</groupId>
Expand Down Expand Up @@ -470,30 +432,6 @@
</build>

<profiles>
<profile>
<id>scala-2.10</id>
<activation>
<property><name>!scala-2.11</name></property>
</activation>
<properties>
<spark.version>1.6.1</spark.version>
<extra.source.dir>src/main/scala-2.10</extra.source.dir>
<extra.testsource.dir>src/test/scala-2.10</extra.testsource.dir>
</properties>
</profile>

<profile>
<id>scala-2.11</id>
<activation>
<property><name>scala-2.11</name></property>
</activation>
<properties>
<spark.version>1.6.1</spark.version>
<extra.source.dir>src/main/scala-2.11</extra.source.dir>
<extra.testsource.dir>src/test/scala/scala-2.11</extra.testsource.dir>
</properties>
</profile>

<!-- to deactivate 'exclude-sparkr' automatically when 'spark' is activated -->
<profile>
<id>sparkr</id>
Expand Down
Loading