@@ -62,7 +62,11 @@ private[spark] class IndexShuffleBlockResolver(
6262 private val remoteShuffleMaxDisk : Option [Long ] =
6363 conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE )
6464
65- def getDataFile (shuffleId : Int , mapId : Long ): File = getDataFile(shuffleId, mapId, None )
65+ def getDataFile (shuffleId : Int , mapId : Long ): File =
66+ getDataFile(shuffleId, mapId, None , true )
67+
68+ def getDataFile (shuffleId : Int , mapId : Long , needCreate : Boolean ): File =
69+ getDataFile(shuffleId, mapId, None , needCreate)
6670
6771 /**
6872 * Get the shuffle files that are stored locally. Used for block migrations.
@@ -95,12 +99,16 @@ private[spark] class IndexShuffleBlockResolver(
9599 * When the dirs parameter is None then use the disk manager's local directories. Otherwise,
96100 * read from the specified directories.
97101 */
98- def getDataFile (shuffleId : Int , mapId : Long , dirs : Option [Array [String ]]): File = {
102+ def getDataFile (
103+ shuffleId : Int ,
104+ mapId : Long ,
105+ dirs : Option [Array [String ]],
106+ needCreate : Boolean ): File = {
99107 val blockId = ShuffleDataBlockId (shuffleId, mapId, NOOP_REDUCE_ID )
100108 dirs
101109 .map(d =>
102110 new File (ExecutorDiskUtils .getFilePath(d, blockManager.subDirsPerLocalDir, blockId.name)))
103- .getOrElse(blockManager.diskBlockManager.getFile(blockId))
111+ .getOrElse(blockManager.diskBlockManager.getFile(blockId, needCreate ))
104112 }
105113
106114 /**
@@ -112,12 +120,13 @@ private[spark] class IndexShuffleBlockResolver(
112120 def getIndexFile (
113121 shuffleId : Int ,
114122 mapId : Long ,
115- dirs : Option [Array [String ]] = None ): File = {
123+ dirs : Option [Array [String ]] = None ,
124+ needCreate : Boolean = true ): File = {
116125 val blockId = ShuffleIndexBlockId (shuffleId, mapId, NOOP_REDUCE_ID )
117126 dirs
118127 .map(d =>
119128 new File (ExecutorDiskUtils .getFilePath(d, blockManager.subDirsPerLocalDir, blockId.name)))
120- .getOrElse(blockManager.diskBlockManager.getFile(blockId))
129+ .getOrElse(blockManager.diskBlockManager.getFile(blockId, needCreate ))
121130 }
122131
123132 private def getMergedBlockDataFile (
@@ -154,17 +163,18 @@ private[spark] class IndexShuffleBlockResolver(
154163 * Remove data file and index file that contain the output data from one map.
155164 */
156165 def removeDataByMap (shuffleId : Int , mapId : Long ): Unit = {
157- var file = getDataFile(shuffleId, mapId)
166+ var file = getDataFile(shuffleId, mapId, needCreate = false )
158167 if (file.exists() && ! file.delete()) {
159168 logWarning(s " Error deleting data ${file.getPath()}" )
160169 }
161170
162- file = getIndexFile(shuffleId, mapId)
171+ file = getIndexFile(shuffleId, mapId, needCreate = false )
163172 if (file.exists() && ! file.delete()) {
164173 logWarning(s " Error deleting index ${file.getPath()}" )
165174 }
166175
167- file = getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM ))
176+ file = getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM ),
177+ needCreate = false )
168178 if (file.exists() && ! file.delete()) {
169179 logWarning(s " Error deleting checksum ${file.getPath()}" )
170180 }
@@ -549,13 +559,14 @@ private[spark] class IndexShuffleBlockResolver(
549559 shuffleId : Int ,
550560 mapId : Long ,
551561 algorithm : String ,
552- dirs : Option [Array [String ]] = None ): File = {
562+ dirs : Option [Array [String ]] = None ,
563+ needCreate : Boolean = true ): File = {
553564 val blockId = ShuffleChecksumBlockId (shuffleId, mapId, NOOP_REDUCE_ID )
554565 val fileName = ShuffleChecksumHelper .getChecksumFileName(blockId.name, algorithm)
555566 dirs
556567 .map(d =>
557568 new File (ExecutorDiskUtils .getFilePath(d, blockManager.subDirsPerLocalDir, fileName)))
558- .getOrElse(blockManager.diskBlockManager.getFile(fileName))
569+ .getOrElse(blockManager.diskBlockManager.getFile(fileName, needCreate ))
559570 }
560571
561572 override def getBlockData (
@@ -594,7 +605,7 @@ private[spark] class IndexShuffleBlockResolver(
594605 }
595606 new FileSegmentManagedBuffer (
596607 transportConf,
597- getDataFile(shuffleId, mapId, dirs),
608+ getDataFile(shuffleId, mapId, dirs, true ),
598609 startOffset,
599610 endOffset - startOffset)
600611 } finally {
0 commit comments