
Commit 991b526

[SPARK-24166][SQL] InMemoryTableScanExec should not access SQLConf at executor side
## What changes were proposed in this pull request?

This PR is extracted from #21190, to make it easier to backport.

`InMemoryTableScanExec#createAndDecompressColumn` is executed inside `rdd.map`, so we cannot access `conf.offHeapColumnVectorEnabled` there.

## How was this patch tested?

It is tested in #21190.

Author: Wenchen Fan <[email protected]>

Closes #21223 from cloud-fan/minor1.
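As a minimal, hypothetical sketch of the pattern this fix applies (not code from this commit), the idea is to evaluate the configuration flag once on the driver and let the RDD closure capture only that primitive, instead of reading driver-side configuration from executor code. The object name, app name, and the use of `spark.memory.offHeap.enabled` through `SparkConf` below are illustrative assumptions; the actual change reads `conf.offHeapColumnVectorEnabled` from the plan's `SQLConf` before building the RDD.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone example, not part of the Spark source tree.
object CaptureConfOnDriver {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("capture-conf-demo").setMaster("local[2]"))

    // Driver side: read the flag here, outside of any RDD closure.
    val offHeapEnabled: Boolean =
      sc.getConf.getBoolean("spark.memory.offHeap.enabled", false)

    val labelled = sc.parallelize(1 to 4)
      // Executor side: only the captured Boolean travels with the task closure,
      // so executors never touch driver-side configuration objects.
      .map(i => if (offHeapEnabled) s"$i: off-heap" else s"$i: on-heap")
      .collect()

    labelled.foreach(println)
    sc.stop()
  }
}
```

The diff below applies the same idea: `inputRDD` evaluates `conf.offHeapColumnVectorEnabled` into a local `val` and passes it to `createAndDecompressColumn`, so that method no longer reads `SQLConf` inside `rdd.map`.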
1 parent: 417ad92

1 file changed (+8 −3 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala

Lines changed: 8 additions & 3 deletions
@@ -78,10 +78,12 @@ case class InMemoryTableScanExec(
 
   private lazy val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i)))
 
-  private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
+  private def createAndDecompressColumn(
+      cachedColumnarBatch: CachedBatch,
+      offHeapColumnVectorEnabled: Boolean): ColumnarBatch = {
     val rowCount = cachedColumnarBatch.numRows
     val taskContext = Option(TaskContext.get())
-    val columnVectors = if (!conf.offHeapColumnVectorEnabled || taskContext.isEmpty) {
+    val columnVectors = if (!offHeapColumnVectorEnabled || taskContext.isEmpty) {
       OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
     } else {
       OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
@@ -101,10 +103,13 @@
 
   private lazy val inputRDD: RDD[InternalRow] = {
     val buffers = filteredCachedBatches()
+    val offHeapColumnVectorEnabled = conf.offHeapColumnVectorEnabled
     if (supportsBatch) {
       // HACK ALERT: This is actually an RDD[ColumnarBatch].
       // We're taking advantage of Scala's type erasure here to pass these batches along.
-      buffers.map(createAndDecompressColumn).asInstanceOf[RDD[InternalRow]]
+      buffers
+        .map(createAndDecompressColumn(_, offHeapColumnVectorEnabled))
+        .asInstanceOf[RDD[InternalRow]]
     } else {
       val numOutputRows = longMetric("numOutputRows")
 
0 commit comments
