@@ -78,10 +78,12 @@ case class InMemoryTableScanExec(
7878
7979 private lazy val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i)))
8080
81- private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
81+ private def createAndDecompressColumn(
82+     cachedColumnarBatch: CachedBatch,
83+     offHeapColumnVectorEnabled: Boolean): ColumnarBatch = {
8284 val rowCount = cachedColumnarBatch.numRows
8385 val taskContext = Option(TaskContext.get())
84- val columnVectors = if (!conf.offHeapColumnVectorEnabled || taskContext.isEmpty) {
86+ val columnVectors = if (!offHeapColumnVectorEnabled || taskContext.isEmpty) {
8587 OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
8688 } else {
8789 OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
@@ -101,10 +103,13 @@ case class InMemoryTableScanExec(
101103
102104 private lazy val inputRDD: RDD[InternalRow] = {
103105 val buffers = filteredCachedBatches()
106+ val offHeapColumnVectorEnabled = conf.offHeapColumnVectorEnabled
104107 if (supportsBatch) {
105108 // HACK ALERT: This is actually an RDD[ColumnarBatch].
106109 // We're taking advantage of Scala's type erasure here to pass these batches along.
107- buffers.map(createAndDecompressColumn).asInstanceOf[RDD[InternalRow]]
110+ buffers
111+   .map(createAndDecompressColumn(_, offHeapColumnVectorEnabled))
112+   .asInstanceOf[RDD[InternalRow]]
108113 } else {
109114 val numOutputRows = longMetric("numOutputRows")
110115