@@ -34,65 +34,70 @@ class RateControllerSuite extends TestSuiteBase {
3434
3535 test(" rate controller publishes updates" ) {
3636 val ssc = new StreamingContext (conf, batchDuration)
37- val dstream = new MockRateLimitDStream (ssc, Seq (Seq (1 )), 1 )
38- val output = new TestOutputStreamWithPartitions (dstream)
39- output.register()
40- runStreams(ssc, 1 , 1 )
41-
42- eventually(timeout(2 .seconds)) {
43- assert(dstream.publishCalls === 1 )
37+ withStreamingContext(ssc) { ssc =>
38+ val dstream = new MockRateLimitDStream (ssc, Seq (Seq (1 )), 1 )
39+ val output = new TestOutputStreamWithPartitions (dstream)
40+ output.register()
41+ runStreams(ssc, 1 , 1 )
42+
43+ eventually(timeout(2 .seconds)) {
44+ assert(dstream.publishCalls === 1 )
45+ }
4446 }
4547 }
4648
4749 test(" receiver rate controller updates reach receivers" ) {
4850 val ssc = new StreamingContext (conf, batchDuration)
51+ withStreamingContext(ssc) { ssc =>
52+ val dstream = new RateLimitInputDStream (ssc) {
53+ override val rateController =
54+ Some (new ReceiverRateController (id, new ConstantEstimator (200.0 )))
55+ }
56+ SingletonDummyReceiver .reset()
4957
50- val dstream = new RateLimitInputDStream (ssc) {
51- override val rateController =
52- Some (new ReceiverRateController (id, new ConstantEstimator (200.0 )))
53- }
54- SingletonDummyReceiver .reset()
55-
56- val output = new TestOutputStreamWithPartitions (dstream)
57- output.register()
58- runStreams(ssc, 2 , 2 )
58+ val output = new TestOutputStreamWithPartitions (dstream)
59+ output.register()
60+ runStreams(ssc, 2 , 2 )
5961
60- eventually(timeout(5 .seconds)) {
61- assert(dstream.getCurrentRateLimit === Some (200 ))
62+ eventually(timeout(5 .seconds)) {
63+ assert(dstream.getCurrentRateLimit === Some (200 ))
64+ }
6265 }
6366 }
6467
6568 test(" multiple rate controller updates reach receivers" ) {
6669 val ssc = new StreamingContext (conf, batchDuration)
67- val rates = Seq (100L , 200L , 300L )
70+ withStreamingContext(ssc) { ssc =>
71+ val rates = Seq (100L , 200L , 300L )
6872
69- val dstream = new RateLimitInputDStream (ssc) {
70- override val rateController =
71- Some (new ReceiverRateController (id, new ConstantEstimator (rates.map(_.toDouble): _* )))
72- }
73- SingletonDummyReceiver .reset()
74-
75- val output = new TestOutputStreamWithPartitions (dstream)
76- output.register()
77-
78- val observedRates = mutable.HashSet .empty[Long ]
79-
80- @ volatile var done = false
81- runInBackground {
82- while (! done) {
83- try {
84- dstream.getCurrentRateLimit.foreach(observedRates += _)
85- } catch {
86- case NonFatal (_) => () // don't stop if the executor wasn't installed yet
73+ val dstream = new RateLimitInputDStream (ssc) {
74+ override val rateController =
75+ Some (new ReceiverRateController (id, new ConstantEstimator (rates.map(_.toDouble): _* )))
76+ }
77+ SingletonDummyReceiver .reset()
78+
79+ val output = new TestOutputStreamWithPartitions (dstream)
80+ output.register()
81+
82+ val observedRates = mutable.HashSet .empty[Long ]
83+
84+ @ volatile var done = false
85+ runInBackground {
86+ while (! done) {
87+ try {
88+ dstream.getCurrentRateLimit.foreach(observedRates += _)
89+ } catch {
90+ case NonFatal (_) => () // don't stop if the executor wasn't installed yet
91+ }
92+ Thread .sleep(20 )
8793 }
88- Thread .sleep(20 )
8994 }
90- }
91- runStreams(ssc, 4 , 4 )
92- done = true
95+ runStreams(ssc, 4 , 4 )
96+ done = true
9397
94- // Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver
95- observedRates should contain theSameElementsAs (rates :+ Long .MaxValue )
98+ // Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver
99+ observedRates should contain theSameElementsAs (rates :+ Long .MaxValue )
100+ }
96101 }
97102
98103 private def runInBackground (f : => Unit ): Unit = {
0 commit comments