Skip to content

Commit bd8a4c2

Browse files
committed
fix scala style
1 parent 7797c70 commit bd8a4c2

File tree

1 file changed

+25
-22
lines changed

1 file changed

+25
-22
lines changed

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ private[python] class TransformFunction(@transient var pfunc: PythonTransformFun
4747
extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable {
4848

4949
def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
50-
Option(pfunc.call(time.milliseconds, List(rdd.map(JavaRDD.fromRDD(_)).orNull).asJava)).map(_.rdd)
50+
Option(pfunc.call(time.milliseconds, List(rdd.map(JavaRDD.fromRDD(_)).orNull).asJava))
51+
.map(_.rdd)
5152
}
5253

5354
def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
@@ -133,8 +134,9 @@ private[python] object PythonDStream {
133134
/**
134135
* Base class for PythonDStream with some common methods
135136
*/
136-
private[python]
137-
abstract class PythonDStream(parent: DStream[_], @transient pfunc: PythonTransformFunction)
137+
private[python] abstract class PythonDStream(
138+
parent: DStream[_],
139+
@transient pfunc: PythonTransformFunction)
138140
extends DStream[Array[Byte]] (parent.ssc) {
139141

140142
val func = new TransformFunction(pfunc)
@@ -152,9 +154,10 @@ abstract class PythonDStream(parent: DStream[_], @transient pfunc: PythonTransfo
152154
* If `reuse` is true and the result of the `func` is an PythonRDD, then it will cache it
153155
* as an template for future use, this can reduce the Python callbacks.
154156
*/
155-
private[python]
156-
class PythonTransformedDStream (parent: DStream[_], @transient pfunc: PythonTransformFunction,
157-
var reuse: Boolean = false)
157+
private[python] class PythonTransformedDStream (
158+
parent: DStream[_],
159+
@transient pfunc: PythonTransformFunction,
160+
var reuse: Boolean = false)
158161
extends PythonDStream(parent, pfunc) {
159162

160163
// rdd returned by func
@@ -191,9 +194,10 @@ class PythonTransformedDStream (parent: DStream[_], @transient pfunc: PythonTran
191194
/**
192195
* Transformed from two DStreams in Python.
193196
*/
194-
private[python]
195-
class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_],
196-
@transient pfunc: PythonTransformFunction)
197+
private[python] class PythonTransformed2DStream(
198+
parent: DStream[_],
199+
parent2: DStream[_],
200+
@transient pfunc: PythonTransformFunction)
197201
extends DStream[Array[Byte]] (parent.ssc) {
198202

199203
val func = new TransformFunction(pfunc)
@@ -212,8 +216,9 @@ class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_],
212216
/**
213217
* similar to StateDStream
214218
*/
215-
private[python]
216-
class PythonStateDStream(parent: DStream[Array[Byte]], @transient reduceFunc: PythonTransformFunction)
219+
private[python] class PythonStateDStream(
220+
parent: DStream[Array[Byte]],
221+
@transient reduceFunc: PythonTransformFunction)
217222
extends PythonDStream(parent, reduceFunc) {
218223

219224
super.persist(StorageLevel.MEMORY_ONLY)
@@ -233,13 +238,13 @@ class PythonStateDStream(parent: DStream[Array[Byte]], @transient reduceFunc: Py
233238
/**
234239
* similar to ReducedWindowedDStream
235240
*/
236-
private[python]
237-
class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
238-
@transient preduceFunc: PythonTransformFunction,
239-
@transient pinvReduceFunc: PythonTransformFunction,
240-
_windowDuration: Duration,
241-
_slideDuration: Duration
242-
) extends PythonDStream(parent, preduceFunc) {
241+
private[python] class PythonReducedWindowedDStream(
242+
parent: DStream[Array[Byte]],
243+
@transient preduceFunc: PythonTransformFunction,
244+
@transient pinvReduceFunc: PythonTransformFunction,
245+
_windowDuration: Duration,
246+
_slideDuration: Duration)
247+
extends PythonDStream(parent, preduceFunc) {
243248

244249
super.persist(StorageLevel.MEMORY_ONLY)
245250
override val mustCheckpoint = true
@@ -252,8 +257,7 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
252257

253258
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
254259
val currentTime = validTime
255-
val current = new Interval(currentTime - windowDuration,
256-
currentTime)
260+
val current = new Interval(currentTime - windowDuration, currentTime)
257261
val previous = current - slideDuration
258262

259263
// _____________________________
@@ -266,11 +270,10 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
266270
// V V
267271
// old RDDs new RDDs
268272
//
269-
270273
val previousRDD = getOrCompute(previous.endTime)
271274

275+
// for small window, reduce once will be better than twice
272276
if (pinvReduceFunc != null && previousRDD.isDefined
273-
// for small window, reduce once will be better than twice
274277
&& windowDuration >= slideDuration * 5) {
275278

276279
// subtract the values from old RDDs

0 commit comments

Comments
 (0)