Skip to content

Commit 61a388a

Browse files
committed
Fix race condition in CheckpointWriter shutdown.
1 parent 7131b03 commit 61a388a

File tree

1 file changed

+5
-12
lines changed

1 file changed

+5
-12
lines changed

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,7 @@ class CheckpointWriter(
184184
val executor = Executors.newFixedThreadPool(1)
185185
val compressionCodec = CompressionCodec.createCodec(conf)
186186
private var stopped = false
187-
private var _fs: FileSystem = _
188-
187+
@volatile private[this] var fs: FileSystem = null
189188
@volatile private var latestCheckpointTime: Time = null
190189

191190
class CheckpointWriteHandler(
@@ -196,6 +195,9 @@ class CheckpointWriter(
196195
if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
197196
latestCheckpointTime = checkpointTime
198197
}
198+
if (fs == null) {
199+
fs = new Path(checkpointDir).getFileSystem(hadoopConf)
200+
}
199201
var attempts = 0
200202
val startTime = System.currentTimeMillis()
201203
val tempFile = new Path(checkpointDir, "temp")
@@ -263,7 +265,7 @@ class CheckpointWriter(
263265
case ioe: IOException =>
264266
logWarning("Error in attempt " + attempts + " of writing checkpoint to "
265267
+ checkpointFile, ioe)
266-
reset()
268+
fs = null
267269
}
268270
}
269271
logWarning("Could not write checkpoint for time " + checkpointTime + " to file "
@@ -297,15 +299,6 @@ class CheckpointWriter(
297299
", waited for " + (endTime - startTime) + " ms.")
298300
stopped = true
299301
}
300-
301-
private def fs = synchronized {
302-
if (_fs == null) _fs = new Path(checkpointDir).getFileSystem(hadoopConf)
303-
_fs
304-
}
305-
306-
private def reset() = synchronized {
307-
_fs = null
308-
}
309302
}
310303

311304

0 commit comments

Comments
 (0)