@@ -21,6 +21,7 @@ import scala.collection.mutable
 import scala.io.Source
 import scala.util.Try
 
+import com.google.common.io.Files
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -42,10 +43,11 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     "org.apache.spark.io.LZFCompressionCodec",
     "org.apache.spark.io.SnappyCompressionCodec"
   )
+  private val testDir = Files.createTempDir()
+  private val logDirPath = Utils.getFilePath(testDir, "spark-events")
 
   after {
-    Try { fileSystem.delete(new Path("/tmp/spark-events"), true) }
-    Try { fileSystem.delete(new Path("/tmp/spark-foo"), true) }
+    Try { fileSystem.delete(logDirPath, true) }
   }
 
   test("Parse names of special files") {
@@ -54,7 +56,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
 
   test("Verify special files exist") {
     testSpecialFilesExist()
-    testSpecialFilesExist(logDirPath = Some("/tmp/spark-foo"))
   }
 
   test("Verify special files exist with compression") {
@@ -65,7 +66,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
 
   test("Parse event logging info") {
     testParsingLogInfo()
-    testParsingLogInfo(logDirPath = Some("/tmp/spark-foo"))
   }
 
   test("Parse event logging info with compression") {
@@ -76,7 +76,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
 
   test("Basic event logging") {
     testEventLogging()
-    testEventLogging(logDirPath = Some("/tmp/spark-foo"))
   }
 
   test("Basic event logging with compression") {
@@ -87,7 +86,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
 
   test("End-to-end event logging") {
     testApplicationEventLogging()
-    testApplicationEventLogging(logDirPath = Some("/tmp/spark-foo"))
   }
 
   test("End-to-end event logging with compression") {
@@ -143,9 +141,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * also exist. Only after the application has completed does the test expect the application
    * completed file to be present.
    */
-  private def testSpecialFilesExist(
-      logDirPath: Option[String] = None,
-      compressionCodec: Option[String] = None) {
+  private def testSpecialFilesExist(compressionCodec: Option[String] = None) {
 
     def assertFilesExist(logFiles: Array[FileStatus], loggerStopped: Boolean) {
       val numCompressionCodecFiles = if (compressionCodec.isDefined) 1 else 0
@@ -164,10 +160,11 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     // Verify logging directory exists
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
+    eventLogger.start()
     val logPath = new Path(eventLogger.logDir)
+    assert(fileSystem.exists(logPath))
     val logDir = fileSystem.getFileStatus(logPath)
     assert(logDir.isDir)
-    eventLogger.start()
 
     // Verify special files are as expected before stop()
     var logFiles = fileSystem.listStatus(logPath)
@@ -186,9 +183,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * This includes whether it returns the correct Spark version, compression codec (if any),
    * and the application's completion status.
    */
-  private def testParsingLogInfo(
-      logDirPath: Option[String] = None,
-      compressionCodec: Option[String] = None) {
+  private def testParsingLogInfo(compressionCodec: Option[String] = None) {
 
     def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) {
       assert(info.logPaths.size > 0)
@@ -221,9 +216,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * This creates two simple events, posts them to the EventLoggingListener, and verifies that
    * exactly these two events are logged in the expected file.
    */
-  private def testEventLogging(
-      logDirPath: Option[String] = None,
-      compressionCodec: Option[String] = None) {
+  private def testEventLogging(compressionCodec: Option[String] = None) {
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
     val listenerBus = new LiveListenerBus
@@ -253,14 +246,12 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * Test end-to-end event logging functionality in an application.
    * This runs a simple Spark job and asserts that the expected events are logged when expected.
    */
-  private def testApplicationEventLogging(
-      logDirPath: Option[String] = None,
-      compressionCodec: Option[String] = None) {
+  private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val sc = new SparkContext("local", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
-    val expectedLogDir = logDirPath.getOrElse(EventLoggingListener.DEFAULT_LOG_DIR)
+    val expectedLogDir = logDirPath.toString
     assert(eventLogger.logDir.startsWith(expectedLogDir))
 
     // Begin listening for events that trigger asserts
@@ -395,15 +386,11 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
 object EventLoggingListenerSuite {
 
   /** Get a SparkConf with event logging enabled. */
-  def getLoggingConf(
-      logDir: Option[String] = None,
-      compressionCodec: Option[String] = None) = {
+  def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None) = {
     val conf = new SparkConf
     conf.set("spark.eventLog.enabled", "true")
     conf.set("spark.eventLog.testing", "true")
-    logDir.foreach { dir =>
-      conf.set("spark.eventLog.dir", dir)
-    }
+    conf.set("spark.eventLog.dir", logDir.toString)
     compressionCodec.foreach { codec =>
       conf.set("spark.eventLog.compress", "true")
       conf.set("spark.io.compression.codec", codec)
0 commit comments