@@ -157,7 +157,14 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
157157 final File outputFile = shuffleBlockManager .getDataFile (shuffleId , mapId );
158158 final int numPartitions = partitioner .numPartitions ();
159159 final long [] partitionLengths = new long [numPartitions ];
160+
161+ if (spills .length == 0 ) {
162+ new FileOutputStream (outputFile ).close ();
163+ return partitionLengths ;
164+ }
165+
160166 final FileChannel [] spillInputChannels = new FileChannel [spills .length ];
167+ final long [] spillInputChannelPositions = new long [spills .length ];
161168
162169 // TODO: We need to add an option to bypass transferTo here since older Linux kernels are
163170 // affected by a bug here that can lead to data truncation; see the comments Utils.scala,
@@ -173,24 +180,29 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
173180
174181 final FileChannel mergedFileOutputChannel = new FileOutputStream (outputFile ).getChannel ();
175182
176- for (int partition = 0 ; partition < numPartitions ; partition ++ ) {
183+ for (int partition = 0 ; partition < numPartitions ; partition ++) {
177184 for (int i = 0 ; i < spills .length ; i ++) {
178- final long bytesToTransfer = spills [i ].partitionLengths [partition ];
179- long bytesRemainingToBeTransferred = bytesToTransfer ;
185+ System .out .println ("In partition " + partition + " and spill " + i );
186+ final long partitionLengthInSpill = spills [i ].partitionLengths [partition ];
187+ System .out .println ("Partition length in spill is " + partitionLengthInSpill );
188+ System .out .println ("input channel position is " + spillInputChannels [i ].position ());
189+ long bytesRemainingToBeTransferred = partitionLengthInSpill ;
180190 final FileChannel spillInputChannel = spillInputChannels [i ];
181- long fromPosition = spillInputChannel .position ();
182191 while (bytesRemainingToBeTransferred > 0 ) {
183- bytesRemainingToBeTransferred - = spillInputChannel .transferTo (
184- fromPosition ,
192+ final long actualBytesTransferred = spillInputChannel .transferTo (
193+ spillInputChannelPositions [ i ] ,
185194 bytesRemainingToBeTransferred ,
186195 mergedFileOutputChannel );
196+ spillInputChannelPositions [i ] += actualBytesTransferred ;
197+ bytesRemainingToBeTransferred -= actualBytesTransferred ;
187198 }
188- partitionLengths [partition ] += bytesToTransfer ;
199+ partitionLengths [partition ] += partitionLengthInSpill ;
189200 }
190201 }
191202
192203 // TODO: should this be in a finally block?
193204 for (int i = 0 ; i < spills .length ; i ++) {
205+ assert (spillInputChannelPositions [i ] == spills [i ].file .length ());
194206 spillInputChannels [i ].close ();
195207 }
196208 mergedFileOutputChannel .close ();
0 commit comments