File tree Expand file tree Collapse file tree 1 file changed +3
-1
lines changed
streaming/src/main/scala/org/apache/spark/streaming/scheduler Expand file tree Collapse file tree 1 file changed +3
-1
lines changed Original file line number Diff line number Diff line change @@ -200,6 +200,9 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
200200 job.setEndTime(completedTime)
201201 listenerBus.post(StreamingListenerOutputOperationCompleted (job.toOutputOperationInfo))
202202 logInfo(" Finished job " + job.id + " from job set of time " + jobSet.time)
203+ if (jobSet.hasCompleted) {
204+ listenerBus.post(StreamingListenerBatchCompleted (jobSet.toBatchInfo))
205+ }
203206 job.result match {
204207 case Failure (e) =>
205208 reportError(" Error running job " + job, e)
@@ -211,7 +214,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
211214 jobSet.totalDelay / 1000.0 , jobSet.time.toString,
212215 jobSet.processingDelay / 1000.0
213216 ))
214- listenerBus.post(StreamingListenerBatchCompleted (jobSet.toBatchInfo))
215217 }
216218 }
217219 }
You can’t perform that action at this time.
0 commit comments