@@ -26,16 +26,15 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.util.{JsonProtocol, Utils}
-import org.apache.spark.io.CompressionCodec
 
 /**
  * Test for whether EventLoggingListener logs events properly.
  *
- * This checks whether special files are created using the specified configurations, and whether
- * logged events can be read back into memory as expected.
+ * This tests whether EventLoggingListener actually creates special files while logging events,
+ * whether the parsing of these special files is correct, and whether the logged events can be
+ * read and deserialized into actual SparkListenerEvents.
  */
 class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
-
   private val fileSystem = Utils.getHadoopFileSystem("/")
   private val allCompressionCodecs = Seq[String](
     "org.apache.spark.io.LZFCompressionCodec",
@@ -79,6 +78,10 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   }
 
 
+  /* ----------------- *
+   * Actual test logic *
+   * ----------------- */
+
   /**
    * Test whether names of special files are correctly identified and parsed.
    */
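The parsing tests here and the end-to-end tests below all build their SparkConf through getLoggingConf(logDirPath, compressionCodec), whose definition falls outside the hunks shown in this diff. As a rough, assumed sketch of what that helper sets, based on the standard event-logging configuration keys:

    // Assumed shape of the getLoggingConf helper (its real body is not in this diff).
    import org.apache.spark.SparkConf

    def getLoggingConf(logDir: Option[String], codec: Option[String]): SparkConf = {
      val conf = new SparkConf().set("spark.eventLog.enabled", "true")
      logDir.foreach { dir => conf.set("spark.eventLog.dir", dir) }
      codec.foreach { c =>
        conf.set("spark.eventLog.compress", "true")
        conf.set("spark.io.compression.codec", c)
      }
      conf
    }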
@@ -182,7 +185,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
     eventLogger.start()
-    val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
     var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
     assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
 
@@ -215,7 +217,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     listenerBus.postToAll(applicationEnd)
 
     // Verify file contains exactly the two events logged
-    val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
     val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
     assert(eventLoggingInfo.logPaths.size > 0)
     val fileStream = {
@@ -235,86 +236,101 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
 
   /**
    * Test end-to-end event logging functionality in an application.
+   * This runs a simple Spark job and asserts that the expected events are logged when expected.
    */
   private def testApplicationEventLogging(
       logDirPath: Option[String] = None,
       compressionCodec: Option[String] = None) {
-
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val sc = new SparkContext("local", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
-    val fileSystem = Utils.getHadoopFileSystem(eventLogger.logDir)
     val expectedLogDir = logDirPath.getOrElse(EventLoggingListener.DEFAULT_LOG_DIR)
     assert(eventLogger.logDir.startsWith(expectedLogDir))
 
-    // Assert all specified events are found in the event log
-    def assertEventExists(events: Seq[String]) {
-      val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
-      val logPath = eventLoggingInfo.logPaths.head
-      val fileStream = {
-        val stream = fileSystem.open(logPath)
-        eventLoggingInfo.compressionCodec.map { codec =>
-          codec.compressedInputStream(stream)
-        }.getOrElse(stream)
-      }
-      val lines = Source.fromInputStream(fileStream).getLines()
-      val eventSet = mutable.Set(events: _*)
-      lines.foreach { line =>
-        eventSet.foreach { event =>
-          if (line.contains(event) &&
-            JsonProtocol.sparkEventFromJson(parse(line)).getClass.getSimpleName == event) {
-            eventSet.remove(event)
-          }
+    // Begin listening for events that trigger asserts
+    val eventExistenceListener = new EventExistenceListener(eventLogger)
+    sc.addSparkListener(eventExistenceListener)
+
+    // Trigger asserts for whether the expected events are actually logged
+    sc.parallelize(1 to 10000).count()
+    sc.stop()
+
+    // Ensure all asserts have actually been triggered
+    eventExistenceListener.assertAllCallbacksInvoked()
+  }
+
+  /**
+   * Assert that all of the specified events are logged by the given EventLoggingListener.
+   */
+  private def assertEventExists(eventLogger: EventLoggingListener, events: Seq[String]) {
+    val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    val logPath = eventLoggingInfo.logPaths.head
+    val fileStream = {
+      val stream = fileSystem.open(logPath)
+      eventLoggingInfo.compressionCodec.map { codec =>
+        codec.compressedInputStream(stream)
+      }.getOrElse(stream)
+    }
+    val lines = Source.fromInputStream(fileStream).getLines()
+    val eventSet = mutable.Set(events: _*)
+    lines.foreach { line =>
+      eventSet.foreach { event =>
+        if (line.contains(event) &&
+          JsonProtocol.sparkEventFromJson(parse(line)).getClass.getSimpleName == event) {
+          eventSet.remove(event)
         }
       }
-      assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
     }
+    assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
+  }
 
-    // SparkListenerEvents are posted in a separate thread
-    class AssertEventListener extends SparkListener {
-      var jobStarted = false
-      var jobEnded = false
-      var appEnded = false
-
-      override def onJobStart(jobStart: SparkListenerJobStart) {
-        assertEventExists(Seq[String](
-          "SparkListenerApplicationStart",
-          "SparkListenerBlockManagerAdded",
-          "SparkListenerEnvironmentUpdate"
-        ))
-        jobStarted = true
-      }
+  /**
+   * A listener that asserts certain events are logged by the given EventLoggingListener.
+   * This is necessary because events are posted asynchronously in a different thread.
+   */
+  private class EventExistenceListener(eventLogger: EventLoggingListener) extends SparkListener {
+    var jobStarted = false
+    var jobEnded = false
+    var appEnded = false
+
+    override def onJobStart(jobStart: SparkListenerJobStart) {
+      assertEventExists(eventLogger, Seq[String](
+        "SparkListenerApplicationStart",
+        "SparkListenerBlockManagerAdded",
+        "SparkListenerEnvironmentUpdate"
+      ))
+      jobStarted = true
+    }
 
-      override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-        assertEventExists(Seq[String](
-          "SparkListenerJobStart",
-          "SparkListenerJobEnd",
-          "SparkListenerStageSubmitted",
-          "SparkListenerStageCompleted",
-          "SparkListenerTaskStart",
-          "SparkListenerTaskEnd"
-        ))
-        jobEnded = true
-      }
+    override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+      assertEventExists(eventLogger, Seq[String](
+        "SparkListenerJobStart",
+        "SparkListenerJobEnd",
+        "SparkListenerStageSubmitted",
+        "SparkListenerStageCompleted",
+        "SparkListenerTaskStart",
+        "SparkListenerTaskEnd"
+      ))
+      jobEnded = true
+    }
 
-      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-        assertEventExists(Seq[String]("SparkListenerApplicationEnd"))
-        appEnded = true
-      }
+    override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+      assertEventExists(eventLogger, Seq[String]("SparkListenerApplicationEnd"))
+      appEnded = true
     }
-    val assertEventListener = new AssertEventListener
-    sc.addSparkListener(assertEventListener)
 
-    // Trigger callbacks
-    sc.parallelize(1 to 10000).count()
-    sc.stop()
-    assert(assertEventListener.jobStarted, "JobStart callback not invoked!")
-    assert(assertEventListener.jobEnded, "JobEnd callback not invoked!")
-    assert(assertEventListener.appEnded, "ApplicationEnd callback not invoked!")
+    def assertAllCallbacksInvoked() {
+      assert(jobStarted, "JobStart callback not invoked!")
+      assert(jobEnded, "JobEnd callback not invoked!")
+      assert(appEnded, "ApplicationEnd callback not invoked!")
+    }
   }
 
-  /* Helper methods for validating state of the special files. */
+
+  /* -------------------------------------------------------- *
+   * Helper methods for validating state of the special files *
+   * -------------------------------------------------------- */
 
   private def eventLogsExist(logFiles: Array[FileStatus]): Boolean = {
     logFiles.map(_.getPath.getName).exists(EventLoggingListener.isEventLogFile)
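The FunSuite test("...") cases that drive these private helpers are not part of the hunks shown above. Given the allCompressionCodecs list declared at the top of the suite, the wiring presumably looks roughly like the following (an assumption, not taken from this diff):

    // Assumed wiring of the helpers into FunSuite test cases.
    test("End-to-end event logging") {
      testApplicationEventLogging()
    }

    test("End-to-end event logging with compression") {
      allCompressionCodecs.foreach { codec =>
        testApplicationEventLogging(compressionCodec = Some(codec))
      }
    }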