Skip to content

Commit a1846ab

Browse files
committed
Fix InMemoryRelation: take the cached LogicalPlan and derive both stats and outputOrdering from it
1 parent 930ef7b commit a1846ab

File tree

3 files changed

+15
-15
lines changed

3 files changed

+15
-15
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class CacheManager extends Logging {
9999
sparkSession.sessionState.conf.columnBatchSize, storageLevel,
100100
sparkSession.sessionState.executePlan(planToCache).executedPlan,
101101
tableName,
102-
planToCache.stats)
102+
planToCache)
103103
cachedData.add(CachedData(planToCache, inMemoryRelation))
104104
}
105105
}
@@ -148,7 +148,7 @@ class CacheManager extends Logging {
148148
storageLevel = cd.cachedRepresentation.storageLevel,
149149
child = spark.sessionState.executePlan(cd.plan).executedPlan,
150150
tableName = cd.cachedRepresentation.tableName,
151-
statsOfPlanToCache = cd.plan.stats)
151+
logicalPlan = cd.plan)
152152
needToRecache += cd.copy(cachedRepresentation = newCache)
153153
}
154154
}

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
2525
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
2626
import org.apache.spark.sql.catalyst.expressions._
2727
import org.apache.spark.sql.catalyst.plans.logical
28-
import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, Statistics}
28+
import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan, Statistics}
2929
import org.apache.spark.sql.execution.SparkPlan
3030
import org.apache.spark.storage.StorageLevel
3131
import org.apache.spark.util.LongAccumulator
@@ -38,9 +38,9 @@ object InMemoryRelation {
3838
storageLevel: StorageLevel,
3939
child: SparkPlan,
4040
tableName: Option[String],
41-
statsOfPlanToCache: Statistics): InMemoryRelation =
41+
logicalPlan: LogicalPlan): InMemoryRelation =
4242
new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)(
43-
statsOfPlanToCache = statsOfPlanToCache)
43+
statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering)
4444
}
4545

4646

@@ -63,7 +63,8 @@ case class InMemoryRelation(
6363
tableName: Option[String])(
6464
@transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
6565
val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
66-
statsOfPlanToCache: Statistics)
66+
statsOfPlanToCache: Statistics,
67+
override val outputOrdering: Seq[SortOrder])
6768
extends logical.LeafNode with MultiInstanceRelation {
6869

6970
override protected def innerChildren: Seq[SparkPlan] = Seq(child)
@@ -149,7 +150,7 @@ case class InMemoryRelation(
149150
def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
150151
InMemoryRelation(
151152
newOutput, useCompression, batchSize, storageLevel, child, tableName)(
152-
_cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache)
153+
_cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache, outputOrdering)
153154
}
154155

155156
override def newInstance(): this.type = {
@@ -162,13 +163,12 @@ case class InMemoryRelation(
162163
tableName)(
163164
_cachedColumnBuffers,
164165
sizeInBytesStats,
165-
statsOfPlanToCache).asInstanceOf[this.type]
166+
statsOfPlanToCache,
167+
outputOrdering).asInstanceOf[this.type]
166168
}
167169

168170
def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers
169171

170172
override protected def otherCopyArgs: Seq[AnyRef] =
171173
Seq(_cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache)
172-
173-
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
174174
}

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
4242
val storageLevel = MEMORY_ONLY
4343
val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
4444
val inMemoryRelation = InMemoryRelation(useCompression = true, 5, storageLevel, plan, None,
45-
data.logicalPlan.stats)
45+
data.logicalPlan)
4646

4747
assert(inMemoryRelation.cachedColumnBuffers.getStorageLevel == storageLevel)
4848
inMemoryRelation.cachedColumnBuffers.collect().head match {
@@ -119,7 +119,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
119119
test("simple columnar query") {
120120
val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan
121121
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None,
122-
testData.logicalPlan.stats)
122+
testData.logicalPlan)
123123

124124
checkAnswer(scan, testData.collect().toSeq)
125125
}
@@ -138,7 +138,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
138138
val logicalPlan = testData.select('value, 'key).logicalPlan
139139
val plan = spark.sessionState.executePlan(logicalPlan).sparkPlan
140140
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None,
141-
logicalPlan.stats)
141+
logicalPlan)
142142

143143
checkAnswer(scan, testData.collect().map {
144144
case Row(key: Int, value: String) => value -> key
@@ -155,7 +155,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
155155
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
156156
val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan
157157
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None,
158-
testData.logicalPlan.stats)
158+
testData.logicalPlan)
159159

160160
checkAnswer(scan, testData.collect().toSeq)
161161
checkAnswer(scan, testData.collect().toSeq)
@@ -329,7 +329,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
329329
test("SPARK-17549: cached table size should be correctly calculated") {
330330
val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
331331
val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
332-
val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None, data.logicalPlan.stats)
332+
val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None, data.logicalPlan)
333333

334334
// Materialize the data.
335335
val expectedAnswer = data.collect()

0 commit comments

Comments
 (0)