|
20 | 20 | import java.io.IOException; |
21 | 21 | import java.util.Arrays; |
22 | 22 |
|
| 23 | +import org.apache.spark.sql.Row; |
23 | 24 | import scala.collection.Iterator; |
24 | 25 | import scala.math.Ordering; |
25 | 26 |
|
@@ -150,28 +151,34 @@ public boolean hasNext() { |
150 | 151 | return sortedIterator.hasNext(); |
151 | 152 | } |
152 | 153 |
|
| 154 | + /** |
| 155 | + * Called prior to returning this iterator's last row. This copies the row's data into an |
| 156 | + * on-heap byte array so that the pointer to the row data will not be dangling after the |
| 157 | + * sorter's memory pages are freed. |
| 158 | + */ |
| 159 | + private void detachRowFromPage(UnsafeRow row, int rowLength) { |
| 160 | + final byte[] rowDataCopy = new byte[rowLength]; |
| 161 | + PlatformDependent.copyMemory( |
| 162 | + row.getBaseObject(), |
| 163 | + row.getBaseOffset(), |
| 164 | + rowDataCopy, |
| 165 | + PlatformDependent.BYTE_ARRAY_OFFSET, |
| 166 | + rowLength |
| 167 | + ); |
| 168 | + row.pointTo(rowDataCopy, PlatformDependent.BYTE_ARRAY_OFFSET, numFields, row.getPool()); |
| 169 | + } |
| 170 | + |
153 | 171 | @Override |
154 | 172 | public InternalRow next() { |
155 | 173 | try { |
156 | 174 | sortedIterator.loadNext(); |
157 | | - if (hasNext()) { |
158 | | - row.pointTo( |
159 | | - sortedIterator.getBaseObject(), sortedIterator.getBaseOffset(), numFields, objPool); |
160 | | - return row; |
161 | | - } else { |
162 | | - final byte[] rowDataCopy = new byte[sortedIterator.getRecordLength()]; |
163 | | - PlatformDependent.copyMemory( |
164 | | - sortedIterator.getBaseObject(), |
165 | | - sortedIterator.getBaseOffset(), |
166 | | - rowDataCopy, |
167 | | - PlatformDependent.BYTE_ARRAY_OFFSET, |
168 | | - sortedIterator.getRecordLength() |
169 | | - ); |
170 | | - row.backingArray = rowDataCopy; |
171 | | - row.pointTo(rowDataCopy, PlatformDependent.BYTE_ARRAY_OFFSET, numFields, objPool); |
| 175 | + row.pointTo( |
| 176 | + sortedIterator.getBaseObject(), sortedIterator.getBaseOffset(), numFields, objPool); |
| 177 | + if (!hasNext()) { |
| 178 | + detachRowFromPage(row, sortedIterator.getRecordLength()); |
172 | 179 | cleanupResources(); |
173 | | - return row; |
174 | 180 | } |
| 181 | + return row; |
175 | 182 | } catch (IOException e) { |
176 | 183 | cleanupResources(); |
177 | 184 | // Scala iterators don't declare any checked exceptions, so we need to use this hack |
|
0 commit comments