Skip to content

Commit 5d38ffe

Browse files
committed
Clean up EventLoggingListenerSuite + add comments
1 parent e12f4b1 commit 5d38ffe

File tree

2 files changed

+82
-66
lines changed

2 files changed

+82
-66
lines changed

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ private[spark] trait SparkListenerBus {
3737
* Post an event to all attached listeners. This does nothing if the event is
3838
* SparkListenerShutdown.
3939
*/
40-
protected def postToAll(event: SparkListenerEvent) {
40+
def postToAll(event: SparkListenerEvent) {
4141
event match {
4242
case stageSubmitted: SparkListenerStageSubmitted =>
4343
sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

Lines changed: 81 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,15 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
2626

2727
import org.apache.spark.{SparkConf, SparkContext}
2828
import org.apache.spark.util.{JsonProtocol, Utils}
29-
import org.apache.spark.io.CompressionCodec
3029

3130
/**
3231
* Test for whether EventLoggingListener logs events properly.
3332
*
34-
* This checks whether special files are created using the specified configurations, and whether
35-
* logged events can be read back into memory as expected.
33+
* This tests whether EventLoggingListener actually creates special files while logging events,
34+
* whether the parsing of these special files is correct, and whether the logged events can be
35+
* read and deserialized into actual SparkListenerEvents.
3636
*/
3737
class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
38-
3938
private val fileSystem = Utils.getHadoopFileSystem("/")
4039
private val allCompressionCodecs = Seq[String](
4140
"org.apache.spark.io.LZFCompressionCodec",
@@ -79,6 +78,10 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
7978
}
8079

8180

81+
/* ----------------- *
82+
* Actual test logic *
83+
* ----------------- */
84+
8285
/**
8386
* Test whether names of special files are correctly identified and parsed.
8487
*/
@@ -182,7 +185,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
182185
val conf = getLoggingConf(logDirPath, compressionCodec)
183186
val eventLogger = new EventLoggingListener("test", conf)
184187
eventLogger.start()
185-
val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
186188
var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
187189
assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
188190

@@ -215,7 +217,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
215217
listenerBus.postToAll(applicationEnd)
216218

217219
// Verify file contains exactly the two events logged
218-
val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
219220
val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
220221
assert(eventLoggingInfo.logPaths.size > 0)
221222
val fileStream = {
@@ -235,86 +236,101 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
235236

236237
/**
237238
* Test end-to-end event logging functionality in an application.
239+
* This runs a simple Spark job and asserts that the expected events are logged when expected.
238240
*/
239241
private def testApplicationEventLogging(
240242
logDirPath: Option[String] = None,
241243
compressionCodec: Option[String] = None) {
242-
243244
val conf = getLoggingConf(logDirPath, compressionCodec)
244245
val sc = new SparkContext("local", "test", conf)
245246
assert(sc.eventLogger.isDefined)
246247
val eventLogger = sc.eventLogger.get
247-
val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
248248
val expectedLogDir = logDirPath.getOrElse(EventLoggingListener.DEFAULT_LOG_DIR)
249249
assert(eventLogger.logDir.startsWith(expectedLogDir))
250250

251-
// Assert all specified events are found in the event log
252-
def assertEventExists(events: Seq[String]) {
253-
val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
254-
val logPath = eventLoggingInfo.logPaths.head
255-
val fileStream = {
256-
val stream = fileSystem.open(logPath)
257-
eventLoggingInfo.compressionCodec.map { codec =>
258-
codec.compressedInputStream(stream)
259-
}.getOrElse(stream)
260-
}
261-
val lines = Source.fromInputStream(fileStream).getLines()
262-
val eventSet = mutable.Set(events: _*)
263-
lines.foreach { line =>
264-
eventSet.foreach { event =>
265-
if (line.contains(event) &&
266-
JsonProtocol.sparkEventFromJson(parse(line)).getClass.getSimpleName == event) {
267-
eventSet.remove(event)
268-
}
251+
// Begin listening for events that trigger asserts
252+
val eventExistenceListener = new EventExistenceListener(eventLogger)
253+
sc.addSparkListener(eventExistenceListener)
254+
255+
// Trigger asserts for whether the expected events are actually logged
256+
sc.parallelize(1 to 10000).count()
257+
sc.stop()
258+
259+
// Ensure all asserts have actually been triggered
260+
eventExistenceListener.assertAllCallbacksInvoked()
261+
}
262+
263+
/**
264+
* Assert that all of the specified events are logged by the given EventLoggingListener.
265+
*/
266+
private def assertEventExists(eventLogger: EventLoggingListener, events: Seq[String]) {
267+
val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
268+
val logPath = eventLoggingInfo.logPaths.head
269+
val fileStream = {
270+
val stream = fileSystem.open(logPath)
271+
eventLoggingInfo.compressionCodec.map { codec =>
272+
codec.compressedInputStream(stream)
273+
}.getOrElse(stream)
274+
}
275+
val lines = Source.fromInputStream(fileStream).getLines()
276+
val eventSet = mutable.Set(events: _*)
277+
lines.foreach { line =>
278+
eventSet.foreach { event =>
279+
if (line.contains(event) &&
280+
JsonProtocol.sparkEventFromJson(parse(line)).getClass.getSimpleName == event) {
281+
eventSet.remove(event)
269282
}
270283
}
271-
assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
272284
}
285+
assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
286+
}
273287

274-
// SparkListenerEvents are posted in a separate thread
275-
class AssertEventListener extends SparkListener {
276-
var jobStarted = false
277-
var jobEnded = false
278-
var appEnded = false
279-
280-
override def onJobStart(jobStart: SparkListenerJobStart) {
281-
assertEventExists(Seq[String](
282-
"SparkListenerApplicationStart",
283-
"SparkListenerBlockManagerAdded",
284-
"SparkListenerEnvironmentUpdate"
285-
))
286-
jobStarted = true
287-
}
288+
/**
289+
* A listener that asserts certain events are logged by the given EventLoggingListener.
290+
* This is necessary because events are posted asynchronously in a different thread.
291+
*/
292+
private class EventExistenceListener(eventLogger: EventLoggingListener) extends SparkListener {
293+
var jobStarted = false
294+
var jobEnded = false
295+
var appEnded = false
296+
297+
override def onJobStart(jobStart: SparkListenerJobStart) {
298+
assertEventExists(eventLogger, Seq[String](
299+
"SparkListenerApplicationStart",
300+
"SparkListenerBlockManagerAdded",
301+
"SparkListenerEnvironmentUpdate"
302+
))
303+
jobStarted = true
304+
}
288305

289-
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
290-
assertEventExists(Seq[String](
291-
"SparkListenerJobStart",
292-
"SparkListenerJobEnd",
293-
"SparkListenerStageSubmitted",
294-
"SparkListenerStageCompleted",
295-
"SparkListenerTaskStart",
296-
"SparkListenerTaskEnd"
297-
))
298-
jobEnded = true
299-
}
306+
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
307+
assertEventExists(eventLogger, Seq[String](
308+
"SparkListenerJobStart",
309+
"SparkListenerJobEnd",
310+
"SparkListenerStageSubmitted",
311+
"SparkListenerStageCompleted",
312+
"SparkListenerTaskStart",
313+
"SparkListenerTaskEnd"
314+
))
315+
jobEnded = true
316+
}
300317

301-
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
302-
assertEventExists(Seq[String]("SparkListenerApplicationEnd"))
303-
appEnded = true
304-
}
318+
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
319+
assertEventExists(eventLogger, Seq[String]("SparkListenerApplicationEnd"))
320+
appEnded = true
305321
}
306-
val assertEventListener = new AssertEventListener
307-
sc.addSparkListener(assertEventListener)
308322

309-
// Trigger callbacks
310-
sc.parallelize(1 to 10000).count()
311-
sc.stop()
312-
assert(assertEventListener.jobStarted, "JobStart callback not invoked!")
313-
assert(assertEventListener.jobEnded, "JobEnd callback not invoked!")
314-
assert(assertEventListener.appEnded, "ApplicationEnd callback not invoked!")
323+
def assertAllCallbacksInvoked() {
324+
assert(jobStarted, "JobStart callback not invoked!")
325+
assert(jobEnded, "JobEnd callback not invoked!")
326+
assert(appEnded, "ApplicationEnd callback not invoked!")
327+
}
315328
}
316329

317-
/* Helper methods for validating state of the special files. */
330+
331+
/* -------------------------------------------------------- *
332+
* Helper methods for validating state of the special files *
333+
* -------------------------------------------------------- */
318334

319335
private def eventLogsExist(logFiles: Array[FileStatus]): Boolean = {
320336
logFiles.map(_.getPath.getName).exists(EventLoggingListener.isEventLogFile)

0 commit comments

Comments (0)