Skip to content

Commit 2d5daf8

Browse files
committed
Use temp directory provided by the OS rather than /tmp
1 parent 2b52151 commit 2d5daf8

File tree

6 files changed: +57 additions, −49 deletions

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 1 addition & 0 deletions

@@ -68,6 +68,7 @@ private[spark] class EventLoggingListener(
    * If compression is used, log a file that indicates which compression library is used.
    */
   def start() {
+    logger.start()
     logInfo("Logging events to %s".format(logDir))
     if (shouldCompress) {
       val codec =

core/src/main/scala/org/apache/spark/util/FileLogger.scala

Lines changed: 6 additions & 1 deletion

@@ -63,7 +63,12 @@ private[spark] class FileLogger(

   private var writer: Option[PrintWriter] = None

-  createLogDir()
+  /**
+   * Start this logger by creating the logging directory.
+   */
+  def start() {
+    createLogDir()
+  }

   /**
    * Create a logging directory with the given path.

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 9 additions & 0 deletions

@@ -1062,6 +1062,15 @@ private[spark] object Utils extends Logging {
     getHadoopFileSystem(new URI(path))
   }

+  /**
+   * Return the absolute path of a file in the given directory.
+   */
+  def getFilePath(dir: File, fileName: String): Path = {
+    assert(dir.isDirectory)
+    val path = new File(dir, fileName).getAbsolutePath
+    new Path(path)
+  }
+
   /**
    * Indicates whether Spark is currently running unit tests.
    */

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

Lines changed: 13 additions & 26 deletions

@@ -21,6 +21,7 @@ import scala.collection.mutable
 import scala.io.Source
 import scala.util.Try

+import com.google.common.io.Files
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -42,10 +43,11 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     "org.apache.spark.io.LZFCompressionCodec",
     "org.apache.spark.io.SnappyCompressionCodec"
   )
+  private val testDir = Files.createTempDir()
+  private val logDirPath = Utils.getFilePath(testDir, "spark-events")

   after {
-    Try { fileSystem.delete(new Path("/tmp/spark-events"), true) }
-    Try { fileSystem.delete(new Path("/tmp/spark-foo"), true) }
+    Try { fileSystem.delete(logDirPath, true) }
   }

   test("Parse names of special files") {
@@ -54,7 +56,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {

   test("Verify special files exist") {
     testSpecialFilesExist()
-    testSpecialFilesExist(logDirPath = Some("/tmp/spark-foo"))
   }

   test("Verify special files exist with compression") {
@@ -65,7 +66,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {

   test("Parse event logging info") {
     testParsingLogInfo()
-    testParsingLogInfo(logDirPath = Some("/tmp/spark-foo"))
   }

   test("Parse event logging info with compression") {
@@ -76,7 +76,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {

   test("Basic event logging") {
     testEventLogging()
-    testEventLogging(logDirPath = Some("/tmp/spark-foo"))
   }

   test("Basic event logging with compression") {
@@ -87,7 +86,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {

   test("End-to-end event logging") {
     testApplicationEventLogging()
-    testApplicationEventLogging(logDirPath = Some("/tmp/spark-foo"))
   }

   test("End-to-end event logging with compression") {
@@ -143,9 +141,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * also exist. Only after the application has completed does the test expect the application
    * completed file to be present.
    */
-  private def testSpecialFilesExist(
-      logDirPath: Option[String] = None,
-      compressionCodec: Option[String] = None) {
+  private def testSpecialFilesExist(compressionCodec: Option[String] = None) {

     def assertFilesExist(logFiles: Array[FileStatus], loggerStopped: Boolean) {
       val numCompressionCodecFiles = if (compressionCodec.isDefined) 1 else 0
@@ -164,10 +160,11 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     // Verify logging directory exists
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
+    eventLogger.start()
     val logPath = new Path(eventLogger.logDir)
+    assert(fileSystem.exists(logPath))
     val logDir = fileSystem.getFileStatus(logPath)
     assert(logDir.isDir)
-    eventLogger.start()

     // Verify special files are as expected before stop()
     var logFiles = fileSystem.listStatus(logPath)
@@ -186,9 +183,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * This includes whether it returns the correct Spark version, compression codec (if any),
    * and the application's completion status.
    */
-  private def testParsingLogInfo(
-      logDirPath: Option[String] = None,
-      compressionCodec: Option[String] = None) {
+  private def testParsingLogInfo(compressionCodec: Option[String] = None) {

     def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) {
       assert(info.logPaths.size > 0)
@@ -221,9 +216,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * This creates two simple events, posts them to the EventLoggingListener, and verifies that
    * exactly these two events are logged in the expected file.
    */
-  private def testEventLogging(
-      logDirPath: Option[String] = None,
-      compressionCodec: Option[String] = None) {
+  private def testEventLogging(compressionCodec: Option[String] = None) {
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
     val listenerBus = new LiveListenerBus
@@ -253,14 +246,12 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * Test end-to-end event logging functionality in an application.
    * This runs a simple Spark job and asserts that the expected events are logged when expected.
    */
-  private def testApplicationEventLogging(
-      logDirPath: Option[String] = None,
-      compressionCodec: Option[String] = None) {
+  private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val sc = new SparkContext("local", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
-    val expectedLogDir = logDirPath.getOrElse(EventLoggingListener.DEFAULT_LOG_DIR)
+    val expectedLogDir = logDirPath.toString
     assert(eventLogger.logDir.startsWith(expectedLogDir))

     // Begin listening for events that trigger asserts
@@ -395,15 +386,11 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
 object EventLoggingListenerSuite {

   /** Get a SparkConf with event logging enabled. */
-  def getLoggingConf(
-      logDir: Option[String] = None,
-      compressionCodec: Option[String] = None) = {
+  def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None) = {
     val conf = new SparkConf
     conf.set("spark.eventLog.enabled", "true")
     conf.set("spark.eventLog.testing", "true")
-    logDir.foreach { dir =>
-      conf.set("spark.eventLog.dir", dir)
-    }
+    conf.set("spark.eventLog.dir", logDir.toString)
     compressionCodec.foreach { codec =>
       conf.set("spark.eventLog.compress", "true")
       conf.set("spark.io.compression.codec", codec)

core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala

Lines changed: 9 additions & 8 deletions

@@ -21,7 +21,7 @@ import java.io.PrintWriter

 import scala.util.Try

-import org.apache.hadoop.fs.Path
+import com.google.common.io.Files
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}

@@ -39,10 +39,11 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     "org.apache.spark.io.LZFCompressionCodec",
     "org.apache.spark.io.SnappyCompressionCodec"
   )
+  private val testDir = Files.createTempDir()

   after {
-    Try { fileSystem.delete(new Path("/tmp/events.txt"), true) }
-    Try { fileSystem.delete(new Path("/tmp/test-replay"), true) }
+    Try { fileSystem.delete(Utils.getFilePath(testDir, "events.txt"), true) }
+    Try { fileSystem.delete(Utils.getFilePath(testDir, "test-replay"), true) }
   }

   test("Simple replay") {
@@ -76,7 +77,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
    * Test simple replaying of events.
    */
   private def testSimpleReplay(codecName: Option[String] = None) {
-    val logFilePath = new Path("/tmp/events.txt")
+    val logFilePath = Utils.getFilePath(testDir, "events.txt")
     val codec = codecName.map(getCompressionCodec)
     val fstream = fileSystem.create(logFilePath)
     val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
@@ -87,7 +88,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
     writer.close()
     val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
-    val conf = EventLoggingListenerSuite.getLoggingConf(compressionCodec = codecName)
+    val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath, codecName)
     val eventMonster = new EventMonster(conf)
     replayer.addListener(eventMonster)
     replayer.replay()
@@ -104,8 +105,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
    * assumption that the event logging behavior is correct (tested in a separate suite).
    */
   private def testApplicationReplay(codecName: Option[String] = None) {
-    val logDir = "/tmp/test-replay"
-    val conf = EventLoggingListenerSuite.getLoggingConf(Some(logDir), codecName)
+    val logDirPath = Utils.getFilePath(testDir, "test-replay")
+    val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName)
     val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf)

     // Run a few jobs
@@ -117,7 +118,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {

     // Prepare information needed for replay
     val codec = codecName.map(getCompressionCodec)
-    val applications = fileSystem.listStatus(new Path(logDir))
+    val applications = fileSystem.listStatus(logDirPath)
     assert(applications != null && applications.size > 0)
     val eventLogDir = applications.sortBy(_.getAccessTime).last
     assert(eventLogDir.isDir)

core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala

Lines changed: 19 additions & 14 deletions

@@ -22,6 +22,7 @@ import java.io.IOException
 import scala.io.Source
 import scala.util.Try

+import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 import org.scalatest.{BeforeAndAfter, FunSuite}

@@ -37,8 +38,9 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter {
     "org.apache.spark.io.LZFCompressionCodec",
     "org.apache.spark.io.SnappyCompressionCodec"
   )
-  private val logDir = "/tmp/test-file-logger"
-  private val logDirPath = new Path(logDir)
+  private val testDir = Files.createTempDir()
+  private val logDirPath = Utils.getFilePath(testDir, "test-file-logger")
+  private val logDirPathString = logDirPath.toString

   after {
     Try { fileSystem.delete(logDirPath, true) }
@@ -66,12 +68,14 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter {

   test("Logging when directory already exists") {
     // Create the logging directory multiple times
-    new FileLogger(logDir, new SparkConf, overwrite = true)
-    new FileLogger(logDir, new SparkConf, overwrite = true)
-    new FileLogger(logDir, new SparkConf, overwrite = true)
+    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()

     // If overwrite is not enabled, an exception should be thrown
-    intercept[IOException] { new FileLogger(logDir, new SparkConf, overwrite = false) }
+    intercept[IOException] {
+      new FileLogger(logDirPathString, new SparkConf, overwrite = false).start()
+    }
   }

@@ -87,10 +91,11 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter {
     val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
     val logger =
       if (codecName.isDefined) {
-        new FileLogger(logDir, conf, compress = true)
+        new FileLogger(logDirPathString, conf, compress = true)
       } else {
-        new FileLogger(logDir, conf)
+        new FileLogger(logDirPathString, conf)
       }
+    logger.start()
     assert(fileSystem.exists(logDirPath))
     assert(fileSystem.getFileStatus(logDirPath).isDir)
     assert(fileSystem.listStatus(logDirPath).size === 0)
@@ -118,20 +123,20 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter {
     val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
     val logger =
       if (codecName.isDefined) {
-        new FileLogger(logDir, conf, compress = true)
+        new FileLogger(logDirPathString, conf, compress = true)
       } else {
-        new FileLogger(logDir, conf)
+        new FileLogger(logDirPathString, conf)
      }
-
+    logger.start()
     logger.newFile("Jean_Valjean")
     logger.logLine("Who am I?")
     logger.logLine("Destiny?")
     logger.newFile("John_Valjohn")
     logger.logLine("One")
-    logger.logLine("Two three four...")
+    logger.logLine("Two three...")
     logger.close()
-    assert(readFileContent(new Path(logDir + "/Jean_Valjean"), codec) === "Who am I?\nDestiny?")
-    assert(readFileContent(new Path(logDir + "/John_Valjohn"), codec) === "One\nTwo three four...")
+    assert(readFileContent(new Path(logDirPath, "Jean_Valjean"), codec) === "Who am I?\nDestiny?")
+    assert(readFileContent(new Path(logDirPath, "John_Valjohn"), codec) === "One\nTwo three...")
   }

   /**

0 commit comments