Skip to content

Commit ccfddd9

Browse files
Author: Marcelo Vanzin (committed)
Commit message: Resolve at the source.
1 parent: 20d2a34 — commit: ccfddd9

File tree: 6 files changed, +19 −20 lines changed.
(Summary repeated in original rendering: 6 files changed, +19 −20 lines changed.)

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,9 +227,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
227227
val appName = conf.get("spark.app.name")
228228

229229
private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
230-
private[spark] val eventLogDir: Option[String] = {
230+
private[spark] val eventLogDir: Option[URI] = {
231231
if (isEventLogEnabled) {
232-
Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
232+
val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
233+
.stripSuffix("/")
234+
Some(Utils.resolveURI(unresolvedDir))
233235
} else {
234236
None
235237
}
@@ -1138,7 +1140,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
11381140
* Return whether dynamically adjusting the amount of resources allocated to
11391141
* this application is supported. This is currently only available for YARN.
11401142
*/
1141-
private[spark] def supportDynamicAllocation =
1143+
private[spark] def supportDynamicAllocation =
11421144
master.contains("yarn") || dynamicAllocationTesting
11431145

11441146
/**

core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
package org.apache.spark.deploy
1919

20+
import java.net.URI
21+
2022
private[spark] class ApplicationDescription(
2123
val name: String,
2224
val maxCores: Option[Int],
2325
val memoryPerSlave: Int,
2426
val command: Command,
2527
var appUiUrl: String,
26-
val eventLogDir: Option[String] = None,
28+
val eventLogDir: Option[URI] = None,
2729
// short name of compression codec used when writing event logs, if any (e.g. lzf)
2830
val eventLogCodec: Option[String] = None)
2931
extends Serializable {
@@ -36,7 +38,7 @@ private[spark] class ApplicationDescription(
3638
memoryPerSlave: Int = memoryPerSlave,
3739
command: Command = command,
3840
appUiUrl: String = appUiUrl,
39-
eventLogDir: Option[String] = eventLogDir,
41+
eventLogDir: Option[URI] = eventLogDir,
4042
eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
4143
new ApplicationDescription(
4244
name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,7 @@ private[master] class Master(
755755
}
756756

757757
val eventLogFilePrefix = EventLoggingListener.getLogPath(
758-
new URI(eventLogDir), app.id, app.desc.eventLogCodec)
758+
eventLogDir, app.id, app.desc.eventLogCodec)
759759
val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
760760
val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
761761
EventLoggingListener.IN_PROGRESS))

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,16 @@ import org.apache.spark.util.{JsonProtocol, Utils}
4747
*/
4848
private[spark] class EventLoggingListener(
4949
appId: String,
50-
unresolvedLogBaseDir: String,
50+
logBaseDir: URI,
5151
sparkConf: SparkConf,
5252
hadoopConf: Configuration)
5353
extends SparkListener with Logging {
5454

5555
import EventLoggingListener._
5656

57-
def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
57+
def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
5858
this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
5959

60-
private val logBaseDir = Utils.resolveURI(unresolvedLogBaseDir)
6160
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
6261
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
6362
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
6161
test("Verify log file exist") {
6262
// Verify logging directory exists
6363
val conf = getLoggingConf(testDirPath)
64-
val eventLogger = new EventLoggingListener("test", testDirPath.toUri().toString(), conf)
64+
val eventLogger = new EventLoggingListener("test", testDirPath.toUri(), conf)
6565
eventLogger.start()
6666

6767
val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -122,14 +122,6 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
122122
"a fine:mind$dollar{bills}.1", Some("lz4")))
123123
}
124124

125-
test("SPARK-6688: logger should always use resolved URIs") {
126-
val conf = getLoggingConf(testDirPath)
127-
.set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
128-
val eventLogger = new EventLoggingListener("test", testDirPath.toUri().getPath(), conf)
129-
eventLogger.start()
130-
eventLogger.stop()
131-
}
132-
133125
/* ----------------- *
134126
* Actual test logic *
135127
* ----------------- */
@@ -148,7 +140,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
148140
val conf = getLoggingConf(testDirPath, compressionCodec)
149141
extraConf.foreach { case (k, v) => conf.set(k, v) }
150142
val logName = compressionCodec.map("test-" + _).getOrElse("test")
151-
val eventLogger = new EventLoggingListener(logName, testDirPath.toUri().toString(), conf)
143+
val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf)
152144
val listenerBus = new LiveListenerBus
153145
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
154146
125L, "Mickey")
@@ -184,7 +176,10 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
184176
* This runs a simple Spark job and asserts that the expected events are logged when expected.
185177
*/
186178
private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
179+
// Set defaultFS to something that would cause an exception, to make sure we don't run
180+
// into SPARK-6688.
187181
val conf = getLoggingConf(testDirPath, compressionCodec)
182+
.set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
188183
val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
189184
assert(sc.eventLogger.isDefined)
190185
val eventLogger = sc.eventLogger.get

core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.scheduler
1919

2020
import java.io.{File, PrintWriter}
21+
import java.net.URI
2122

2223
import org.json4s.jackson.JsonMethods._
2324
import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -145,7 +146,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
145146
* log the events.
146147
*/
147148
private class EventMonster(conf: SparkConf)
148-
extends EventLoggingListener("test", "testdir", conf) {
149+
extends EventLoggingListener("test", new URI("testdir"), conf) {
149150

150151
override def start() { }
151152

0 commit comments — Comments (0)