@@ -37,6 +37,7 @@
 import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.DiskBlockObjectWriter;
+import org.apache.spark.storage.FileSegment;
 import org.apache.spark.storage.TempShuffleBlockId;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.LongArray;
@@ -150,10 +151,6 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
     final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
       inMemSorter.getSortedIterator();
 
-    // Currently, we need to open a new DiskBlockObjectWriter for each partition; we can avoid this
-    // after SPARK-5581 is fixed.
-    DiskBlockObjectWriter writer;
-
     // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
     // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
     // data through a byte array. This array does not need to be large enough to hold a single
@@ -175,7 +172,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
     // around this, we pass a dummy no-op serializer.
     final SerializerInstance ser = DummySerializerInstance.INSTANCE;
 
-    writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
+    final DiskBlockObjectWriter writer =
+      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
 
     int currentPartition = -1;
     while (sortedRecords.hasNext()) {
@@ -185,12 +183,10 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
       if (partition != currentPartition) {
         // Switch to the new partition
         if (currentPartition != -1) {
-          writer.commitAndClose();
-          spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
+          final FileSegment fileSegment = writer.commitAndGet();
+          spillInfo.partitionLengths[currentPartition] = fileSegment.length();
         }
         currentPartition = partition;
-        writer =
-          blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
       }
 
       final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
@@ -209,15 +205,14 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
       writer.recordWritten();
     }
 
-    if (writer != null) {
-      writer.commitAndClose();
-      // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
-      // then the file might be empty. Note that it might be better to avoid calling
-      // writeSortedFile() in that case.
-      if (currentPartition != -1) {
-        spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
-        spills.add(spillInfo);
-      }
+    final FileSegment committedSegment = writer.commitAndGet();
+    writer.close();
+    // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
+    // then the file might be empty. Note that it might be better to avoid calling
+    // writeSortedFile() in that case.
+    if (currentPartition != -1) {
+      spillInfo.partitionLengths[currentPartition] = committedSegment.length();
+      spills.add(spillInfo);
     }
 
     if (!isLastFile) {  // i.e. this is a spill file
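
The heart of this change is that writeSortedFile() now opens a single DiskBlockObjectWriter up front and reuses it across all partitions, using commitAndGet() to seal each partition's bytes as a FileSegment, instead of closing and reopening a writer at every partition boundary. The following is a minimal stand-alone sketch of that pattern; SegmentingWriter, PartitionedWriteDemo, and this commitAndGet() are hypothetical stand-ins written for illustration, not Spark's actual classes.

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for DiskBlockObjectWriter: one open stream whose
// commitAndGet() flushes and reports the length of the segment written since
// the previous commit, so no per-partition open/close is needed.
class SegmentingWriter implements AutoCloseable {
  private final FileOutputStream out;
  private long committedOffset = 0L;
  private long position = 0L;

  SegmentingWriter(String path) throws IOException {
    this.out = new FileOutputStream(path);
  }

  void write(byte[] buf, int off, int len) throws IOException {
    out.write(buf, off, len);
    position += len;
  }

  // Flush, then return the length of the segment committed by this call.
  long commitAndGet() throws IOException {
    out.flush();
    final long segmentLength = position - committedOffset;
    committedOffset = position;
    return segmentLength;
  }

  @Override
  public void close() throws IOException {
    out.close();
  }
}

public class PartitionedWriteDemo {
  public static void main(String[] args) throws IOException {
    final int[] partitionOf = {0, 0, 1, 2, 2};  // records pre-sorted by partition id
    final byte[] record = "payload".getBytes(StandardCharsets.UTF_8);
    final long[] partitionLengths = new long[3];

    try (SegmentingWriter writer = new SegmentingWriter("sorted-output.bin")) {
      int currentPartition = -1;
      for (int partition : partitionOf) {
        if (partition != currentPartition) {
          if (currentPartition != -1) {
            // Seal the previous partition's segment without reopening the file.
            partitionLengths[currentPartition] = writer.commitAndGet();
          }
          currentPartition = partition;
        }
        writer.write(record, 0, record.length);
      }
      if (currentPartition != -1) {
        partitionLengths[currentPartition] = writer.commitAndGet();  // last partition
      }
    }
    System.out.println(Arrays.toString(partitionLengths));  // [14, 7, 14]
  }
}

Tracking a committed offset inside one writer is what removes the per-partition open/close that the deleted SPARK-5581 comment referred to; the final commitAndGet()/close() pair after the loop plays the role of the old `if (writer != null)` block.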
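Separately, the retained comment about small writes describes copying each record out of a memory page in buffer-sized chunks before handing it to the writer. Below is a rough illustration of that loop under simplifying assumptions: the "page" is an ordinary byte[], the sink is a ByteArrayOutputStream, and the names ChunkedCopyDemo and DISK_WRITE_BUFFER_SIZE are illustrative; Spark itself copies with Platform.copyMemory from a base object plus offset.

import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class ChunkedCopyDemo {
  private static final int DISK_WRITE_BUFFER_SIZE = 4096;  // illustrative buffer size

  public static void main(String[] args) {
    final byte[] page = new byte[10_000];  // stand-in for a managed-memory page
    Arrays.fill(page, (byte) 7);
    int offsetInPage = 0;
    int dataRemaining = page.length;

    final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
    final ByteArrayOutputStream writer = new ByteArrayOutputStream();  // stand-in for the disk writer

    while (dataRemaining > 0) {
      // Stage at most one buffer's worth of the record, then pass it to the writer;
      // the buffer need not hold the whole record, only one chunk of it.
      final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, dataRemaining);
      System.arraycopy(page, offsetInPage, writeBuffer, 0, toTransfer);
      writer.write(writeBuffer, 0, toTransfer);
      offsetInPage += toTransfer;
      dataRemaining -= toTransfer;
    }
    System.out.println(writer.size());  // 10000: every byte reached the writer
  }
}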