@@ -19,42 +19,28 @@ package org.apache.spark.streaming.api.python
 
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
-import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
-import org.apache.spark.broadcast.Broadcast
+import scala.reflect.ClassTag
+
 import org.apache.spark._
-import org.apache.spark.util.Utils
-import java.io._
-import scala.Some
-import org.apache.spark.streaming.Duration
-import scala.util.control.Breaks._
-import org.apache.spark.broadcast.Broadcast
-import scala.Some
-import org.apache.spark.streaming.Duration
 import org.apache.spark.rdd.RDD
-import org.apache.spark.api.python.PythonRDD
-
-
+import org.apache.spark.api.python._
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.api.java._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.api.python._
-import org.apache.spark.api.python.PairwiseRDD
-
 
-import scala.reflect.ClassTag
 
 
 class PythonDStream[T: ClassTag](
-  parent: DStream[T],
-  command: Array[Byte],
-  envVars: JMap[String, String],
-  pythonIncludes: JList[String],
-  preservePartitoning: Boolean,
-  pythonExec: String,
-  broadcastVars: JList[Broadcast[Array[Byte]]],
-  accumulator: Accumulator[JList[Array[Byte]]]
-  ) extends DStream[Array[Byte]](parent.ssc) {
+    parent: DStream[T],
+    command: Array[Byte],
+    envVars: JMap[String, String],
+    pythonIncludes: JList[String],
+    preservePartitoning: Boolean,
+    pythonExec: String,
+    broadcastVars: JList[Broadcast[Array[Byte]]],
+    accumulator: Accumulator[JList[Array[Byte]]])
+  extends DStream[Array[Byte]](parent.ssc) {
 
   override def dependencies = List(parent)
 
@@ -70,84 +56,4 @@ class PythonDStream[T: ClassTag](
     }
   }
   val asJavaDStream = JavaDStream.fromDStream(this)
-
-  /**
-   * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
-   * operator, so this PythonDStream will be registered as an output stream and there materialized.
-   * Since serialized Python object is readable by Python, pyprint writes out binary data to
-   * temporary file and run python script to deserialized and print the first ten elements
-   */
-  private[streaming] def ppyprint() {
-    def foreachFunc = (rdd: RDD[Array[Byte]], time: Time) => {
-      val iter = rdd.take(11).iterator
-
-      // make a temporary file
-      val prefix = "spark"
-      val suffix = ".tmp"
-      val tempFile = File.createTempFile(prefix, suffix)
-      val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
-      // write out serialized python object
-      PythonRDD.writeIteratorToStream(iter, tempFileStream)
-      tempFileStream.close()
-
-      // This value has to be passed from python
-      // val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      // val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile???
-      // absolute path to the python script is needed to change because we do not use pysparkstreaming
-      val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pysparkstreaming/streaming/pyprint.py", tempFile.getAbsolutePath)
-      val workerEnv = pb.environment()
-
-      // envVars also need to be pass
-      // workerEnv.putAll(envVars)
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
-      val worker = pb.start()
-      val is = worker.getInputStream()
-      val isr = new InputStreamReader(is)
-      val br = new BufferedReader(isr)
-
-      println("-------------------------------------------")
-      println("Time: " + time)
-      println("-------------------------------------------")
-
-      // print value from python std out
-      var line = ""
-      breakable {
-        while (true) {
-          line = br.readLine()
-          if (line == null) break()
-          println(line)
-        }
-      }
-      // delete temporary file
-      tempFile.delete()
-      println()
-
-    }
-    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
-  }
 }
-
-
-private class PairwiseDStream(prev: DStream[Array[Byte]]) extends
-DStream[(Long, Array[Byte])](prev.ssc){
-  override def dependencies = List(prev)
-
-  override def slideDuration: Duration = prev.slideDuration
-
-  override def compute(validTime: Time): Option[RDD[(Long, Array[Byte])]] = {
-    prev.getOrCompute(validTime) match {
-      case Some(rdd) => Some(rdd)
-        val pairwiseRDD = new PairwiseRDD(rdd)
-        Some(pairwiseRDD.asJavaPairRDD.rdd)
-      case None => None
-    }
-  }
-  val asJavaPairDStream: JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
-}
-
-
-
-
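
Note on the removed `ppyprint` helper: the commented-out line asking "why this fails to compile???" fails because `java.lang.ProcessBuilder` has exactly two constructors, `ProcessBuilder(String... command)` and `ProcessBuilder(java.util.List<String> command)`, and a Scala `Seq[String]` matches neither overload. A minimal, self-contained sketch of the conversion that would have made it compile (the `python` executable and script path here are illustrative placeholders, not values from this patch):

```scala
import java.io.File
import scala.collection.JavaConverters._

object ProcessBuilderSeqExample {
  def main(args: Array[String]): Unit = {
    val tempFile = File.createTempFile("spark", ".tmp")
    // ProcessBuilder accepts String varargs or a java.util.List[String], never
    // a Scala Seq; .asJava converts the Seq so the List overload applies.
    val command = Seq("python", "/path/to/pyprint.py", tempFile.getAbsolutePath)
    val pb = new ProcessBuilder(command.asJava)
    // pb.start() would then launch the worker as the removed code did.
    tempFile.delete()
  }
}
```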
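Note on the removed `PairwiseDStream`: its `compute` contained a dead-code bug. `case Some(rdd) => Some(rdd)` returns the unpaired RDD immediately, so the two following lines that build the `PairwiseRDD` were unreachable. A sketch of what the match presumably intended, reusing only names visible in this diff (`prev`, `PairwiseRDD`, `Time`, `RDD`) and meant to sit inside the deleted class:

```scala
// In the deleted version, `Some(rdd)` was returned on the first line of the
// case and everything after it was dead code; here the PairwiseRDD is built
// first and its underlying (Long, Array[Byte]) RDD is returned.
override def compute(validTime: Time): Option[RDD[(Long, Array[Byte])]] = {
  prev.getOrCompute(validTime) match {
    case Some(rdd) =>
      val pairwiseRDD = new PairwiseRDD(rdd)
      Some(pairwiseRDD.asJavaPairRDD.rdd)
    case None => None
  }
}
```

The same logic reads more compactly as `prev.getOrCompute(validTime).map(rdd => new PairwiseRDD(rdd).asJavaPairRDD.rdd)`.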