Skip to content

Commit ff0c00d

Browse files
committed
Use LinkedIn spark to build spark runner
1 parent 95f0bd9 commit ff0c00d

2 files changed

Lines changed: 34 additions & 15 deletions

File tree

runners/spark/build.gradle

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,18 @@ evaluationDependsOn(":sdks:java:core")
3232
evaluationDependsOn(":sdks:java:io:hadoop-format")
3333
evaluationDependsOn(":runners:core-java")
3434

35+
repositories {
36+
maven {
37+
url "https://artifactory.corp.linkedin.com:8083/artifactory/DDS/"
38+
}
39+
maven {
40+
url "https://artifactory.corp.linkedin.com:8083/artifactory/mintdev-publish-repo/"
41+
}
42+
maven {
43+
url "file://" + System.getProperty("user.home") + "/local-repo"
44+
}
45+
}
46+
3547
configurations {
3648
validatesRunner
3749
}
@@ -55,6 +67,9 @@ test {
5567
}
5668
}
5769

70+
// Version of the LinkedIn Spark artifacts (com.linkedin.spark) used by the dependencies below.
71+
def spark_version = "2.3.0.263"
72+
5873
dependencies {
5974
compile project(path: ":model:pipeline", configuration: "shadow")
6075
compile project(path: ":sdks:java:core", configuration: "shadow")
@@ -65,10 +80,12 @@ dependencies {
6580
compile library.java.slf4j_api
6681
compile library.java.joda_time
6782
compile library.java.args4j
68-
provided library.java.spark_core
69-
provided library.java.spark_sql
70-
provided library.java.spark_streaming
71-
provided library.java.spark_network_common
83+
84+
compile "com.linkedin.spark:spark-core_2.11:$spark_version"
85+
compile "com.linkedin.spark:spark-sql_2.11:$spark_version"
86+
compile "com.linkedin.spark:spark-streaming_2.11:$spark_version"
87+
compile "com.linkedin.spark:spark-network-common_2.11:$spark_version"
88+
7289
provided library.java.hadoop_common
7390
provided library.java.commons_io
7491
provided library.java.hamcrest_core
@@ -235,7 +252,7 @@ task validatesRunner {
235252
description "Validates Spark runner"
236253
dependsOn validatesRunnerBatch
237254
dependsOn validatesRunnerStreaming
238-
// It should be uncommented once all "validatesStructuredStreamingRunnerBatch" tests will pass.
255+
// Uncomment the dependsOn line below once all "validatesStructuredStreamingRunnerBatch" tests pass.
239256
// Otherwise, it breaks Spark runner ValidatesRunner tests.
240257
//dependsOn validatesStructuredStreamingRunnerBatch
241258
}

runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
import org.apache.spark.sql.catalyst.expressions.Expression;
3737
import org.apache.spark.sql.catalyst.expressions.NonSQLExpression;
3838
import org.apache.spark.sql.catalyst.expressions.UnaryExpression;
39-
import org.apache.spark.sql.catalyst.expressions.codegen.Block;
40-
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator;
4139
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext;
4240
import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode;
4341
import org.apache.spark.sql.types.DataType;
@@ -94,7 +92,7 @@ public Expression child() {
9492
public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
9593
String accessCode = ctx.addReferenceObj("coder", coder, coder.getClass().getName());
9694
ExprCode input = child.genCode(ctx);
97-
String javaType = CodeGenerator.javaType(dataType());
95+
String javaType = ctx.javaType(dataType());
9896

9997
List<String> parts = new ArrayList<>();
10098
List<Object> args = new ArrayList<>();
@@ -117,10 +115,12 @@ public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
117115

118116
StringContext sc =
119117
new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
120-
Block code =
121-
(new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
118+
// Block code =
119+
// (new
120+
// Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
121+
String code = sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq());
122122

123-
return ev.copy(input.code().$plus(code), input.isNull(), ev.value());
123+
return ev.copy(input.code() + "\n" + code, input.isNull(), ev.value());
124124
}
125125

126126
@Override
@@ -203,7 +203,7 @@ public Expression child() {
203203
public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
204204
String accessCode = ctx.addReferenceObj("coder", coder, coder.getClass().getName());
205205
ExprCode input = child.genCode(ctx);
206-
String javaType = CodeGenerator.javaType(dataType());
206+
String javaType = ctx.javaType(dataType());
207207

208208
List<String> parts = new ArrayList<>();
209209
List<Object> args = new ArrayList<>();
@@ -228,9 +228,11 @@ public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) {
228228

229229
StringContext sc =
230230
new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
231-
Block code =
232-
(new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
233-
return ev.copy(input.code().$plus(code), input.isNull(), ev.value());
231+
// Block code =
232+
// (new
233+
// Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq());
234+
String code = sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq());
235+
return ev.copy(input.code() + "\n" + code, input.isNull(), ev.value());
234236
}
235237

236238
@Override

0 commit comments

Comments
 (0)