Skip to content

[VL] Use of columnar table cache causes error when data emitted from the cached plan is in vanilla columnar format #8497

@zhztheplayer

Description

@zhztheplayer

Edit: Plan to fix the issue:

  1. Add test case [GLUTEN-8497][VL] A bad test case that fails columnar table cache query #8498
  2. Add a query planer hook similar to AdaptiveContext.isAdaptiveContext, to determine whether the Gluten columnar rule is called for columnar table cache's inner query plan's optimization [GLUTEN-8497][CORE] A unified CallInfo API to replace AdaptiveContext #8551
  3. Unify the hook API for columnar table cache and AQE [GLUTEN-8497][CORE] A unified CallInfo API to replace AdaptiveContext #8551
  4. Fix test cases using the new hook API. Add a rule to make sure the output convention of the query plan conform to vanilla Spark's table cache optimization code. Specifically: ([GLUTEN-8497][VL] Fix columnar batch type mismatch in table cache #9230)
    • For query plan without a topmost ColumnarToRow, keep it as is;
    • For query plan with a topmost ColumnarToRow (which will be removed by Spark' code convertToColumnarIfPossible) and the second node is not in Velox columnar fomat, compare the costs between the following 2 query plan candidates, then choose the cheaper one:
      • The input plan unchanged, with a top fake unary node that prevents columnar cache to be used;
      • Velox columnar plan + a fake C2R to be removed;

The current code for determining whether to use columnar-based cache doesn't cover all possible cases. For example, the case when a vanilla Spark columnar scan fell back by unsupported data type.

If the case is met, the following error will be thrown:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10) (55f8ad28c275 executor driver): java.lang.IllegalStateException: Heavy batch should consist of arrow vectors
	at org.apache.gluten.columnarbatch.ColumnarBatches.identifyBatchType(ColumnarBatches.java:81)
	at org.apache.gluten.columnarbatch.ColumnarBatches.isLightBatch(ColumnarBatches.java:99)
	at org.apache.gluten.columnarbatch.ColumnarBatches.ensureOffloaded(ColumnarBatches.java:134)
	at org.apache.gluten.columnarbatch.VeloxColumnarBatches.ensureVeloxBatch(VeloxColumnarBatches.java:104)
	at org.apache.spark.sql.execution.ColumnarCachedBatchSerializer.$anonfun$convertColumnarBatchToCachedBatch$2(ColumnarCachedBatchSerializer.scala:177)
	at scala.collection.Iterator$$anon$9.next(Iterator.scala:577)
	at org.apache.spark.sql.execution.ColumnarCachedBatchSerializer$$anon$1.next(ColumnarCachedBatchSerializer.scala:183)
	at org.apache.spark.sql.execution.ColumnarCachedBatchSerializer$$anon$1.next(ColumnarCachedBatchSerializer.scala:179)
	at scala.collection.Iterator$$anon$9.next(Iterator.scala:577)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1531)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1458)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1522)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:378)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't workingtriage

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions