Skip to content

Commit 6ff4e95

Browse files
author
Rafal Wojdyla
committed
Inject argz to the Scio interpreter
1 parent 570cfaa commit 6ff4e95

File tree

2 files changed

+29
-4
lines changed

2 files changed

+29
-4
lines changed

scio/src/main/resources/interpreter-setting.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
"className": "org.apache.zeppelin.scio.ScioInterpreter",
66
"defaultInterpreter": true,
77
"properties": {
8-
"args": {
8+
"argz": {
99
"envName": null,
1010
"propertyName": null,
1111
"defaultValue": "--runner=InProcessPipelineRunner",
12-
"description": "Scio commandline args"
12+
"description": "Scio interpreter wide arguments"
1313
}
1414
},
1515
"editor": {

scio/src/main/scala/org/apache/zeppelin/scio/ScioInterpreter.scala

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717

1818
package org.apache.zeppelin.scio
1919

20+
import java.beans.Introspector
2021
import java.io.PrintStream
2122
import java.util
2223
import java.util.Properties
2324

25+
import com.google.cloud.dataflow.sdk.options.{PipelineOptions, PipelineOptionsFactory}
2426
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner
2527
import com.spotify.scio.repl.{ScioILoop, ScioReplClassLoader}
2628
import org.apache.zeppelin.interpreter.Interpreter.FormType
@@ -41,7 +43,7 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
4143
val innerOut = new InterpreterOutputStream(logger)
4244

4345
override def open(): Unit = {
44-
val args: List[String] = Option(getProperty("args"))
46+
val args: List[String] = Option(getProperty("argz"))
4547
.getOrElse(s"--runner=${classOf[InProcessPipelineRunner].getSimpleName}")
4648
.split(" ")
4749
.map(_.trim)
@@ -90,7 +92,9 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
9092
null,
9193
Thread.currentThread.getContextClassLoader)
9294

93-
REPL = new ScioILoop(scioClassLoader, args, None, new JPrintWriter(innerOut))
95+
val (dfArgs, _) = parseAndPartitionArgs(args)
96+
97+
REPL = new ScioILoop(scioClassLoader, dfArgs, None, new JPrintWriter(innerOut))
9498
scioClassLoader.setRepl(REPL)
9599

96100
// Set classloader chain - expose top level abstract class loader down
@@ -103,6 +107,27 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
103107

104108
REPL.settings_=(settings)
105109
REPL.createInterpreter()
110+
REPL.interpret(s"""val argz = Array("${args.mkString("\", \"")}")""")
111+
}
112+
113+
/**
 * Splits interpreter arguments into Dataflow pipeline options and everything else.
 *
 * An argument is treated as a pipeline option when it has the form `--name` or
 * `--name=value` and `name` is the bean property behind a zero-argument, non-void
 * `getX` / `isX` accessor declared on `PipelineOptions` or on any class returned by
 * `PipelineOptionsFactory.getRegisteredOptions`.
 *
 * The option name must match exactly and the argument must start with `--`, so an
 * argument that merely contains an option token somewhere in its value
 * (e.g. `x--runner=y`) is not mistaken for a pipeline option.
 *
 * @param args raw interpreter arguments
 * @return a pair `(pipelineOptionArgs, remainingArgs)`; each list keeps the relative
 *         order of `args`
 */
private def parseAndPartitionArgs(args: List[String]): (List[String], List[String]) = {
  import scala.collection.JavaConverters._

  // Bean property name exposed by an accessor method, or None if `m` is not a
  // zero-argument, non-void `isX` / `getX` method.
  def optionName(m: java.lang.reflect.Method): Option[String] = {
    val n = m.getName
    val prefixLength = if (n.startsWith("is")) 2 else if (n.startsWith("get")) 3 else 0
    if (prefixLength == 0 || m.getParameterTypes.nonEmpty || m.getReturnType == classOf[Unit]) {
      None
    } else {
      Some(Introspector.decapitalize(n.substring(prefixLength)))
    }
  }

  // Collect the option names of PipelineOptions and all of its registered derived types.
  val classes = PipelineOptionsFactory.getRegisteredOptions.asScala + classOf[PipelineOptions]
  val optionNames: Set[String] = classes.flatMap { cls =>
    cls.getMethods.toList.flatMap(m => optionName(m).toList)
  }.toSet

  // Split args into 2 parts: those naming a known pipeline option, and the rest.
  // The name is the text between the leading "--" and the first '=' (or end of argument).
  args.partition { arg =>
    arg.startsWith("--") && optionNames.contains(arg.drop(2).takeWhile(_ != '='))
  }
}
107132

108133
override def close(): Unit = {

0 commit comments

Comments
 (0)