1717
1818package org .apache .zeppelin .scio
1919
20+ import java .beans .Introspector
2021import java .io .PrintStream
2122import java .util
2223import java .util .Properties
2324
25+ import com .google .cloud .dataflow .sdk .options .{PipelineOptions , PipelineOptionsFactory }
2426import com .google .cloud .dataflow .sdk .runners .inprocess .InProcessPipelineRunner
2527import com .spotify .scio .repl .{ScioILoop , ScioReplClassLoader }
2628import org .apache .zeppelin .interpreter .Interpreter .FormType
@@ -41,7 +43,7 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
4143 val innerOut = new InterpreterOutputStream (logger)
4244
4345 override def open (): Unit = {
44- val args : List [String ] = Option (getProperty(" args " ))
46+ val args : List [String ] = Option (getProperty(" argz " ))
4547 .getOrElse(s " --runner= ${classOf [InProcessPipelineRunner ].getSimpleName}" )
4648 .split(" " )
4749 .map(_.trim)
@@ -90,7 +92,9 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
9092 null ,
9193 Thread .currentThread.getContextClassLoader)
9294
93- REPL = new ScioILoop (scioClassLoader, args, None , new JPrintWriter (innerOut))
95+ val (dfArgs, _) = parseAndPartitionArgs(args)
96+
97+ REPL = new ScioILoop (scioClassLoader, dfArgs, None , new JPrintWriter (innerOut))
9498 scioClassLoader.setRepl(REPL )
9599
96100 // Set classloader chain - expose top level abstract class loader down
@@ -103,6 +107,27 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
103107
104108 REPL .settings_=(settings)
105109 REPL .createInterpreter()
110+ REPL .interpret(s """ val argz = Array(" ${args.mkString(" \" , \" " )}") """ )
111+ }
112+
113+ private def parseAndPartitionArgs (args : List [String ]): (List [String ], List [String ]) = {
114+ import scala .collection .JavaConverters ._
115+ // Extract --pattern of all registered derived types of PipelineOptions
116+ val classes = PipelineOptionsFactory .getRegisteredOptions.asScala + classOf [PipelineOptions ]
117+ val optPatterns = classes.flatMap { cls =>
118+ cls.getMethods.flatMap { m =>
119+ val n = m.getName
120+ if ((! n.startsWith(" get" ) && ! n.startsWith(" is" )) ||
121+ m.getParameterTypes.nonEmpty || m.getReturnType == classOf [Unit ]) {
122+ None
123+ } else {
124+ Some (Introspector .decapitalize(n.substring(if (n.startsWith(" is" )) 2 else 3 )))
125+ }
126+ }.map(s => s " -- $s( $$ |=) " .r)
127+ }
128+
129+ // Split cmdlineArgs into 2 parts, optArgs for PipelineOptions and appArgs for Args
130+ args.partition(arg => optPatterns.exists(_.findFirstIn(arg).isDefined))
106131 }
107132
108133 override def close (): Unit = {
0 commit comments