Skip to content

Commit 14a3bb3

Browse files
Sumedh Walecloud-fan
authored andcommitted
[SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream
## What changes were proposed in this pull request? Corrects offsetInBytes calculation in UnsafeRow.writeToStream. Known failures include writes to some DataSources that have own SparkPlan implementations and cause EXCHANGE in writes. ## How was this patch tested? Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over byte array having non-zero offset. Author: Sumedh Wale <[email protected]> Closes #18535 from sumwale/SPARK-21312.
1 parent 75b168f commit 14a3bb3

File tree

2 files changed

+14
-1
lines changed

2 files changed

+14
-1
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ public void copyFrom(UnsafeRow row) {
550550
*/
551551
public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException {
552552
if (baseObject instanceof byte[]) {
553-
int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset);
553+
int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET);
554554
out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
555555
} else {
556556
int dataRemaining = sizeInBytes;

sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite {
101101
MemoryAllocator.UNSAFE.free(offheapRowPage)
102102
}
103103
}
104+
val (bytesFromArrayBackedRowWithOffset, field0StringFromArrayBackedRowWithOffset) = {
105+
val baos = new ByteArrayOutputStream()
106+
val numBytes = arrayBackedUnsafeRow.getSizeInBytes
107+
val bytesWithOffset = new Array[Byte](numBytes + 100)
108+
System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 0,
109+
bytesWithOffset, 100, numBytes)
110+
val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields())
111+
arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 100, numBytes)
112+
arrayBackedRow.writeToStream(baos, null)
113+
(baos.toByteArray, arrayBackedRow.getString(0))
114+
}
104115

105116
assert(bytesFromArrayBackedRow === bytesFromOffheapRow)
106117
assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow)
118+
assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset)
119+
assert(field0StringFromArrayBackedRow === field0StringFromArrayBackedRowWithOffset)
107120
}
108121

109122
test("calling getDouble() and getFloat() on null columns") {

0 commit comments

Comments
 (0)