Skip to content

Commit c56ec18

Browse files
committed
Clean up final row copying code.
1 parent d31f180 commit c56ec18

File tree

2 files changed

+26
-23
lines changed

2 files changed

+26
-23
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions;
1919

20-
import javax.annotation.Nullable;
21-
2220
import org.apache.spark.sql.catalyst.InternalRow;
2321
import org.apache.spark.sql.catalyst.util.ObjectPool;
2422
import org.apache.spark.unsafe.PlatformDependent;
@@ -57,17 +55,15 @@
5755
*/
5856
public final class UnsafeRow extends MutableRow {
5957

60-
/** Hack for if we want to pass around an UnsafeRow which also carries around its backing data */
61-
@Nullable public byte[] backingArray;
6258
private Object baseObject;
6359
private long baseOffset;
6460

6561
/** A pool to hold non-primitive objects */
6662
private ObjectPool pool;
6763

68-
Object getBaseObject() { return baseObject; }
69-
long getBaseOffset() { return baseOffset; }
70-
ObjectPool getPool() { return pool; }
64+
public Object getBaseObject() { return baseObject; }
65+
public long getBaseOffset() { return baseOffset; }
66+
public ObjectPool getPool() { return pool; }
7167

7268
/** The number of fields in this row, used for calculating the bitset width (and in assertions) */
7369
private int numFields;

sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.IOException;
2121
import java.util.Arrays;
2222

23+
import org.apache.spark.sql.Row;
2324
import scala.collection.Iterator;
2425
import scala.math.Ordering;
2526

@@ -150,28 +151,34 @@ public boolean hasNext() {
150151
return sortedIterator.hasNext();
151152
}
152153

154+
/**
155+
* Called prior to returning this iterator's last row. This copies the row's data into an
156+
* on-heap byte array so that the pointer to the row data will not be dangling after the
157+
* sorter's memory pages are freed.
158+
*/
159+
private void detachRowFromPage(UnsafeRow row, int rowLength) {
160+
final byte[] rowDataCopy = new byte[rowLength];
161+
PlatformDependent.copyMemory(
162+
row.getBaseObject(),
163+
row.getBaseOffset(),
164+
rowDataCopy,
165+
PlatformDependent.BYTE_ARRAY_OFFSET,
166+
rowLength
167+
);
168+
row.pointTo(rowDataCopy, PlatformDependent.BYTE_ARRAY_OFFSET, numFields, row.getPool());
169+
}
170+
153171
@Override
154172
public InternalRow next() {
155173
try {
156174
sortedIterator.loadNext();
157-
if (hasNext()) {
158-
row.pointTo(
159-
sortedIterator.getBaseObject(), sortedIterator.getBaseOffset(), numFields, objPool);
160-
return row;
161-
} else {
162-
final byte[] rowDataCopy = new byte[sortedIterator.getRecordLength()];
163-
PlatformDependent.copyMemory(
164-
sortedIterator.getBaseObject(),
165-
sortedIterator.getBaseOffset(),
166-
rowDataCopy,
167-
PlatformDependent.BYTE_ARRAY_OFFSET,
168-
sortedIterator.getRecordLength()
169-
);
170-
row.backingArray = rowDataCopy;
171-
row.pointTo(rowDataCopy, PlatformDependent.BYTE_ARRAY_OFFSET, numFields, objPool);
175+
row.pointTo(
176+
sortedIterator.getBaseObject(), sortedIterator.getBaseOffset(), numFields, objPool);
177+
if (!hasNext()) {
178+
detachRowFromPage(row, sortedIterator.getRecordLength());
172179
cleanupResources();
173-
return row;
174180
}
181+
return row;
175182
} catch (IOException e) {
176183
cleanupResources();
177184
// Scala iterators don't declare any checked exceptions, so we need to use this hack

0 commit comments

Comments
 (0)