Commit 56dd20f

[SPARK-41708][SQL][FOLLOWUP] Do not insert columnar to row transition before write command
### What changes were proposed in this pull request?

This is a followup of apache#39277. With planned write, the write command requires neither columnar nor row-based execution from its child. It invokes a new API, `executeWrite`, which returns commit messages rather than columnar or row-based data. This PR updates `ApplyColumnarRulesAndInsertTransitions` to take this case into account.

### Why are the changes needed?

If people replace `WriteFilesExec` with a columnar version, the plan can't be executed because of an extra columnar-to-row transition inserted between `WriteFilesExec` and the write command.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New test.

Closes apache#39922 from cloud-fan/write.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent d5b0cb4 commit 56dd20f
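
For context, a minimal spark-shell style sketch of the plan shape this change fixes. It is illustrative only: the `ColumnarWriteFilesExec` name in the trees is hypothetical (the test below uses a class named `ColumnarWriteExec`), and `spark.sql.optimizer.plannedWrite.enabled` is assumed to be the config behind `conf.plannedWriteEnabled` referenced in the rule.

```scala
// Sketch only: assumes an extension has already replaced WriteFilesExec with a
// columnar-only implementation (hypothetically named ColumnarWriteFilesExec).
//
// Before this fix, the transition rule asked the write command's child for rows
// and inserted a transition the command never needs, since it calls executeWrite:
//
//   Execute InsertIntoHadoopFsRelationCommand
//   +- ColumnarToRowExec                 <- extra transition, breaks columnar-only writers
//      +- ColumnarWriteFilesExec
//         +- ...
//
// After this fix, the child keeps whichever format it supports:
//
//   Execute InsertIntoHadoopFsRelationCommand
//   +- ColumnarWriteFilesExec
//      +- ...
spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", "true")
spark.range(10).write.mode("overwrite").parquet("/tmp/planned_write_demo")
```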

2 files changed: +52 −3 lines
sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala

Lines changed: 14 additions & 3 deletions
@@ -28,6 +28,8 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.V1WriteCommand
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.types._
@@ -541,10 +543,19 @@ case class ApplyColumnarRulesAndInsertTransitions(
       // `outputsColumnar` is false but the plan only outputs columnar format, so add a
       // to-row transition here.
       ColumnarToRowExec(insertRowToColumnar(plan))
-    } else if (!plan.isInstanceOf[ColumnarToRowTransition]) {
-      plan.withNewChildren(plan.children.map(insertTransitions(_, outputsColumnar = false)))
-    } else {
+    } else if (plan.isInstanceOf[ColumnarToRowTransition]) {
       plan
+    } else {
+      val outputsColumnar = plan match {
+        // With planned write, the write command invokes child plan's `executeWrite` which is
+        // neither columnar nor row-based.
+        case write: DataWritingCommandExec
+            if write.cmd.isInstanceOf[V1WriteCommand] && conf.plannedWriteEnabled =>
+          write.child.supportsColumnar
+        case _ =>
+          false
+      }
+      plan.withNewChildren(plan.children.map(insertTransitions(_, outputsColumnar)))
     }
   }

sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala

Lines changed: 38 additions & 0 deletions
@@ -26,15 +26,19 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Limit, LocalRelation, LogicalPlan, Statistics, UnresolvedHint}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec, WriteFilesSpec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
@@ -301,6 +305,11 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper {
       assert(result(0).getLong(0) == 101L) // Check that broken columnar Add was used.
       assert(result(1).getLong(0) == 201L)
       assert(result(2).getLong(0) == 301L)
+
+      withTempPath { path =>
+        val e = intercept[Exception](df.write.parquet(path.getCanonicalPath))
+        assert(e.getMessage == "columnar write")
+      }
     }
   }
@@ -790,6 +799,27 @@ class ColumnarProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
     new ColumnarProjectExec(projectList, newChild)
 }

+class ColumnarWriteExec(
+    child: SparkPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    staticPartitions: TablePartitionSpec) extends WriteFilesExec(
+    child, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) {
+
+  override def supportsColumnar(): Boolean = true
+
+  override def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
+    assert(child.supportsColumnar)
+    throw new Exception("columnar write")
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec =
+    new ColumnarWriteExec(
+      newChild, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
+}
+
 /**
  * A version of add that supports columnar processing for longs. This version is broken
  * on purpose so it adds the numbers plus 1 so that the tests can show that it was replaced.
@@ -897,6 +927,14 @@ case class PreRuleReplaceAddWithBrokenVersion() extends Rule[SparkPlan] {
         new ColumnarProjectExec(plan.projectList.map((exp) =>
           replaceWithColumnarExpression(exp).asInstanceOf[NamedExpression]),
           replaceWithColumnarPlan(plan.child))
+      case write: WriteFilesExec =>
+        new ColumnarWriteExec(
+          replaceWithColumnarPlan(write.child),
+          write.fileFormat,
+          write.partitionColumns,
+          write.bucketSpec,
+          write.options,
+          write.staticPartitions)
       case p =>
         logWarning(s"Columnar processing for ${p.getClass} is not currently supported.")
         p.withNewChildren(p.children.map(replaceWithColumnarPlan))
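
The `WriteFilesExec` replacement above only takes effect when `PreRuleReplaceAddWithBrokenVersion` is installed as a columnar rule. Below is a minimal wiring sketch, assuming the suite's existing `MyColumnarRule` and `MyPostRule` helpers (defined elsewhere in this file, not shown in this diff).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSessionExtensions

// Sketch only: mirrors how SparkSessionExtensionSuite wires its columnar rules.
// MyColumnarRule and MyPostRule are the suite's existing helpers, assumed here.
class ColumnarWriteExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(e: SparkSessionExtensions): Unit = {
    // The pre-transition rule rewrites WriteFilesExec into ColumnarWriteExec, so a
    // V1 write with planned write enabled reaches the columnar doExecuteWrite path.
    e.injectColumnar(session =>
      MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
  }
}

// Usage: register the extension when building a session; with the transition fix
// in place, df.write.parquet(...) fails with the "columnar write" marker the test
// expects, instead of tripping over an unexpected ColumnarToRowExec.
val spark = SparkSession.builder()
  .master("local[1]")
  .config("spark.sql.extensions", classOf[ColumnarWriteExtensions].getName)
  .getOrCreate()
```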
