@@ -47,7 +47,8 @@ private[python] class TransformFunction(@transient var pfunc: PythonTransformFun
4747 extends function.Function2 [JList [JavaRDD [_]], Time , JavaRDD [Array [Byte ]]] with Serializable {
4848
4949 def apply (rdd : Option [RDD [_]], time : Time ): Option [RDD [Array [Byte ]]] = {
50- Option (pfunc.call(time.milliseconds, List (rdd.map(JavaRDD .fromRDD(_)).orNull).asJava)).map(_.rdd)
50+ Option (pfunc.call(time.milliseconds, List (rdd.map(JavaRDD .fromRDD(_)).orNull).asJava))
51+ .map(_.rdd)
5152 }
5253
5354 def apply (rdd : Option [RDD [_]], rdd2 : Option [RDD [_]], time : Time ): Option [RDD [Array [Byte ]]] = {
@@ -133,8 +134,9 @@ private[python] object PythonDStream {
133134/**
134135 * Base class for PythonDStream with some common methods
135136 */
136- private [python]
137- abstract class PythonDStream (parent : DStream [_], @ transient pfunc : PythonTransformFunction )
137+ private [python] abstract class PythonDStream (
138+ parent : DStream [_],
139+ @ transient pfunc : PythonTransformFunction )
138140 extends DStream [Array [Byte ]] (parent.ssc) {
139141
140142 val func = new TransformFunction (pfunc)
@@ -152,9 +154,10 @@ abstract class PythonDStream(parent: DStream[_], @transient pfunc: PythonTransfo
152154 * If `reuse` is true and the result of the `func` is an PythonRDD, then it will cache it
153155 * as an template for future use, this can reduce the Python callbacks.
154156 */
155- private [python]
156- class PythonTransformedDStream (parent : DStream [_], @ transient pfunc : PythonTransformFunction ,
157- var reuse : Boolean = false )
157+ private [python] class PythonTransformedDStream (
158+ parent : DStream [_],
159+ @ transient pfunc : PythonTransformFunction ,
160+ var reuse : Boolean = false )
158161 extends PythonDStream (parent, pfunc) {
159162
160163 // rdd returned by func
@@ -191,9 +194,10 @@ class PythonTransformedDStream (parent: DStream[_], @transient pfunc: PythonTran
191194/**
192195 * Transformed from two DStreams in Python.
193196 */
194- private [python]
195- class PythonTransformed2DStream (parent : DStream [_], parent2 : DStream [_],
196- @ transient pfunc : PythonTransformFunction )
197+ private [python] class PythonTransformed2DStream (
198+ parent : DStream [_],
199+ parent2 : DStream [_],
200+ @ transient pfunc : PythonTransformFunction )
197201 extends DStream [Array [Byte ]] (parent.ssc) {
198202
199203 val func = new TransformFunction (pfunc)
@@ -212,8 +216,9 @@ class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_],
212216/**
213217 * similar to StateDStream
214218 */
215- private [python]
216- class PythonStateDStream (parent : DStream [Array [Byte ]], @ transient reduceFunc : PythonTransformFunction )
219+ private [python] class PythonStateDStream (
220+ parent : DStream [Array [Byte ]],
221+ @ transient reduceFunc : PythonTransformFunction )
217222 extends PythonDStream (parent, reduceFunc) {
218223
219224 super .persist(StorageLevel .MEMORY_ONLY )
@@ -233,13 +238,13 @@ class PythonStateDStream(parent: DStream[Array[Byte]], @transient reduceFunc: Py
233238/**
234239 * similar to ReducedWindowedDStream
235240 */
236- private [python]
237- class PythonReducedWindowedDStream ( parent : DStream [Array [Byte ]],
238- @ transient preduceFunc : PythonTransformFunction ,
239- @ transient pinvReduceFunc : PythonTransformFunction ,
240- _windowDuration : Duration ,
241- _slideDuration : Duration
242- ) extends PythonDStream (parent, preduceFunc) {
241+ private [python] class PythonReducedWindowedDStream (
242+ parent : DStream [Array [Byte ]],
243+ @ transient preduceFunc : PythonTransformFunction ,
244+ @ transient pinvReduceFunc : PythonTransformFunction ,
245+ _windowDuration : Duration ,
246+ _slideDuration : Duration )
247+ extends PythonDStream (parent, preduceFunc) {
243248
244249 super .persist(StorageLevel .MEMORY_ONLY )
245250 override val mustCheckpoint = true
@@ -252,8 +257,7 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
252257
253258 override def compute (validTime : Time ): Option [RDD [Array [Byte ]]] = {
254259 val currentTime = validTime
255- val current = new Interval (currentTime - windowDuration,
256- currentTime)
260+ val current = new Interval (currentTime - windowDuration, currentTime)
257261 val previous = current - slideDuration
258262
259263 // _____________________________
@@ -266,11 +270,10 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
266270 // V V
267271 // old RDDs new RDDs
268272 //
269-
270273 val previousRDD = getOrCompute(previous.endTime)
271274
275+ // for small window, reduce once will be better than twice
272276 if (pinvReduceFunc != null && previousRDD.isDefined
273- // for small window, reduce once will be better than twice
274277 && windowDuration >= slideDuration * 5 ) {
275278
276279 // subtract the values from old RDDs
0 commit comments