Skip to content

Commit 715437a

Browse files
committed
Review comments and added a reset call in ReceiverTrackerTest.
1 parent e57c66b commit 715437a

File tree

6 files changed

+20
-15
lines changed

6 files changed

+20
-15
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,7 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
5050
val id = ssc.getNewInputStreamId()
5151

5252
// Keep track of the freshest rate for this stream using the rateEstimator
53-
protected[streaming] val rateController: Option[RateController] =
54-
RateEstimator.makeEstimator(ssc.conf).map { estimator =>
55-
new RateController(id, estimator) {
56-
override def publish(rate: Long): Unit = ()
57-
}
58-
}
53+
protected[streaming] val rateController: Option[RateController] = None
5954

6055
/** A human-readable name of this InputDStream */
6156
private[streaming] def name: String = {

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
4545
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
4646
*/
4747
override protected[streaming] val rateController: Option[RateController] =
48-
RateEstimator.makeEstimator(ssc.conf).map { new ReceiverRateController(id, _) }
48+
RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) }
4949

5050
/**
5151
* Gets the receiver object that will be sent to the worker nodes

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
6666
}
6767
eventLoop.start()
6868

69-
// Estimators receive updates from batch completion
69+
// attach rate controllers of input streams to receive batch completion updates
7070
for {
7171
inputDStream <- ssc.graph.getInputStreams
7272
rateController <- inputDStream.rateController

streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
2121

2222
import scala.concurrent.{ExecutionContext, Future}
2323

24+
import org.apache.spark.SparkConf
25+
import org.apache.spark.annotation.DeveloperApi
2426
import org.apache.spark.streaming.scheduler.rate.RateEstimator
2527
import org.apache.spark.util.ThreadUtils
2628

@@ -29,8 +31,8 @@ import org.apache.spark.util.ThreadUtils
2931
* an estimate of the speed at which this stream should ingest messages,
3032
* given an estimate computation from a `RateEstimator`
3133
*/
32-
private [streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
33-
extends StreamingListener with Serializable {
34+
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
35+
extends StreamingListener with Serializable {
3436

3537
protected def publish(rate: Long): Unit
3638

@@ -46,8 +48,8 @@ private [streaming] abstract class RateController(val streamUID: Int, rateEstima
4648
*/
4749
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
4850
Future[Unit] {
49-
val newSpeed = rateEstimator.compute(time, elems, workDelay, waitDelay)
50-
newSpeed foreach { s =>
51+
val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
52+
newRate.foreach { s =>
5153
rateLimit.set(s.toLong)
5254
publish(getLatestRate())
5355
}

streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ object RateEstimator {
5252
* @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any
5353
* known estimators.
5454
*/
55-
def makeEstimator(conf: SparkConf): Option[RateEstimator] =
56-
conf.getOption("spark.streaming.RateEstimator") map { estimator =>
55+
def create(conf: SparkConf): Option[RateEstimator] =
56+
conf.getOption("spark.streaming.backpressure.rateEstimator").map { estimator =>
5757
throw new IllegalArgumentException(s"Unkown rate estimator: $estimator")
5858
}
5959
}

streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
9090

9191
ssc.addStreamingListener(ReceiverStartedWaiter)
9292
ssc.scheduler.listenerBus.start(ssc.sc)
93+
SingletonDummyReceiver.reset()
9394

9495
val newRateLimit = 100L
9596
val inputDStream = new RateLimitInputDStream(ssc)
@@ -109,7 +110,14 @@ class ReceiverTrackerSuite extends TestSuiteBase {
109110
}
110111
}
111112

112-
/** An input DStream with a hard-coded receiver that gives access to internals for testing. */
113+
/**
114+
* An input DStream with a hard-coded receiver that gives access to internals for testing.
115+
*
116+
* @note Make sure to call {{{SingletonDummyReceiver.reset()}}} before using this in a test,
117+
* or otherwise you may get {{{NotSerializableException}}} when trying to serialize
118+
* the receiver.
119+
* @see [[[SingletonDummyReceiver]]].
120+
*/
113121
private class RateLimitInputDStream(@transient ssc_ : StreamingContext)
114122
extends ReceiverInputDStream[Int](ssc_) {
115123

0 commit comments

Comments
 (0)