Skip to content

Commit 465ccc6

Browse files
committed
address the comments
1 parent a8646ac commit 465ccc6

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,9 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
200200
job.setEndTime(completedTime)
201201
listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
202202
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
203+
if (jobSet.hasCompleted) {
204+
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
205+
}
203206
job.result match {
204207
case Failure(e) =>
205208
reportError("Error running job " + job, e)
@@ -211,7 +214,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
211214
jobSet.totalDelay / 1000.0, jobSet.time.toString,
212215
jobSet.processingDelay / 1000.0
213216
))
214-
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
215217
}
216218
}
217219
}

0 commit comments

Comments
 (0)