@@ -184,8 +184,7 @@ class CheckpointWriter(
184184 val executor = Executors .newFixedThreadPool(1 )
185185 val compressionCodec = CompressionCodec .createCodec(conf)
186186 private var stopped = false
187- private var _fs : FileSystem = _
188-
187+ @ volatile private [this ] var fs : FileSystem = null
189188 @ volatile private var latestCheckpointTime : Time = null
190189
191190 class CheckpointWriteHandler (
@@ -196,6 +195,9 @@ class CheckpointWriter(
196195 if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
197196 latestCheckpointTime = checkpointTime
198197 }
198+ if (fs == null ) {
199+ fs = new Path (checkpointDir).getFileSystem(hadoopConf)
200+ }
199201 var attempts = 0
200202 val startTime = System .currentTimeMillis()
201203 val tempFile = new Path (checkpointDir, " temp" )
@@ -263,7 +265,7 @@ class CheckpointWriter(
263265 case ioe : IOException =>
264266 logWarning(" Error in attempt " + attempts + " of writing checkpoint to "
265267 + checkpointFile, ioe)
266- reset()
268+ fs = null
267269 }
268270 }
269271 logWarning(" Could not write checkpoint for time " + checkpointTime + " to file "
@@ -297,15 +299,6 @@ class CheckpointWriter(
297299 " , waited for " + (endTime - startTime) + " ms." )
298300 stopped = true
299301 }
300-
301- private def fs = synchronized {
302- if (_fs == null ) _fs = new Path (checkpointDir).getFileSystem(hadoopConf)
303- _fs
304- }
305-
306- private def reset () = synchronized {
307- _fs = null
308- }
309302}
310303
311304
0 commit comments