Commit 84241e0

aokolnychyi and rdblue committed
[SPARK-33808][SQL] DataSource V2: Build logical writes in the optimizer
Lead-authored-by: Anton Okolnychyi <[email protected]>
Co-authored-by: Ryan Blue <[email protected]>
1 parent 69aa727 commit 84241e0

13 files changed: +221 -191 lines changed


sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ public enum TableCapability {
   /**
    * Signals that the table supports append writes using the V1 InsertableRelation interface.
    * <p>
-   * Tables that return this capability must create a V1WriteBuilder and may also support additional
+   * Tables that return this capability must create a V1Write and may also support additional
    * write modes, like {@link #TRUNCATE}, and {@link #OVERWRITE_BY_FILTER}, but cannot support
    * {@link #OVERWRITE_DYNAMIC}.
    */
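
For context on the updated wording, here is a hedged sketch of what the documented contract looks like from a connector's side; the table, its name, and its schema handling below are hypothetical and not part of this commit:

// Hypothetical table advertising the V1 fallback write path described above.
import java.util.{EnumSet => JEnumSet, Set => JSet}

import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType

class LegacySinkTable(tableSchema: StructType) extends SupportsWrite {
  override def name(): String = "legacy_sink"
  override def schema(): StructType = tableSchema
  // V1_BATCH_WRITE may be combined with TRUNCATE and OVERWRITE_BY_FILTER,
  // but not with OVERWRITE_DYNAMIC, per the javadoc above.
  override def capabilities(): JSet[TableCapability] =
    JEnumSet.of(
      TableCapability.V1_BATCH_WRITE,
      TableCapability.TRUNCATE,
      TableCapability.OVERWRITE_BY_FILTER)
  // Returns a builder that produces a V1Write; a sketch of such a builder
  // follows the new V1Write.java file further down.
  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = ???
}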

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 7 additions & 3 deletions
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.Write
 import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType}
 
 /**
@@ -65,7 +66,8 @@ case class AppendData(
     table: NamedRelation,
     query: LogicalPlan,
     writeOptions: Map[String, String],
-    isByName: Boolean) extends V2WriteCommand {
+    isByName: Boolean,
+    write: Option[Write] = None) extends V2WriteCommand {
   override def withNewQuery(newQuery: LogicalPlan): AppendData = copy(query = newQuery)
   override def withNewTable(newTable: NamedRelation): AppendData = copy(table = newTable)
 }
@@ -94,7 +96,8 @@ case class OverwriteByExpression(
     deleteExpr: Expression,
     query: LogicalPlan,
     writeOptions: Map[String, String],
-    isByName: Boolean) extends V2WriteCommand {
+    isByName: Boolean,
+    write: Option[Write] = None) extends V2WriteCommand {
   override lazy val resolved: Boolean = {
     table.resolved && query.resolved && outputResolved && deleteExpr.resolved
   }
@@ -132,7 +135,8 @@ case class OverwritePartitionsDynamic(
     table: NamedRelation,
     query: LogicalPlan,
     writeOptions: Map[String, String],
-    isByName: Boolean) extends V2WriteCommand {
+    isByName: Boolean,
+    write: Option[Write] = None) extends V2WriteCommand {
   override def withNewQuery(newQuery: LogicalPlan): OverwritePartitionsDynamic = {
     copy(query = newQuery)
   }
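
The new optional write field is the hook that lets the optimizer attach a connector-built Write to the logical plan before physical planning. As a rough sketch only (this is not the V2Writes rule added by this commit, which also handles the overwrite commands), a rule could populate the field along these lines, assuming the WriteBuilder.build(): Write entry point and the LogicalWriteInfoImpl helper that the old fallback path used:

// Rough, hypothetical sketch of building a logical write for AppendData in the optimizer.
import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object BuildAppendWriteSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Only fire once: skip nodes whose write has already been built.
    case a @ AppendData(DataSourceV2Relation(table: SupportsWrite, _, _, _, _),
        query, options, _, None) =>
      val info = LogicalWriteInfoImpl(
        queryId = UUID.randomUUID().toString,
        schema = query.schema,
        options = new CaseInsensitiveStringMap(options.asJava))
      val write = table.newWriteBuilder(info).build()
      a.copy(write = Some(write))
  }
}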

sql/core/src/main/java/org/apache/spark/sql/connector/write/V1Write.java

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write;
+
+import org.apache.spark.annotation.Unstable;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.sources.InsertableRelation;
+
+/**
+ * A logical write that should be executed using V1 InsertableRelation interface.
+ * <p>
+ * Tables that have {@link TableCapability#V1_BATCH_WRITE} in the list of their capabilities
+ * must build {@link V1Write}.
+ */
+@Unstable
+public interface V1Write extends Write {
+  InsertableRelation toInsertableRelation();
+}
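
To make the new interface concrete, here is a hypothetical connector-side sketch: a WriteBuilder whose build() returns a V1Write that bridges back to a legacy InsertableRelation sink. The builder, the sink behaviour, and the assumption that the builder exposes a build(): Write entry point are illustrative, not part of this commit. With such a write attached to the logical command by the optimizer, the V1 fallback exec nodes further down simply call toInsertableRelation at execution time.

// Hypothetical WriteBuilder returning a V1Write for a table with the V1_BATCH_WRITE capability.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, V1Write, WriteBuilder}
import org.apache.spark.sql.sources.InsertableRelation

class LegacySinkWriteBuilder(info: LogicalWriteInfo) extends WriteBuilder {
  override def build(): V1Write = new V1Write {
    override def toInsertableRelation(): InsertableRelation = new InsertableRelation {
      override def insert(data: DataFrame, overwrite: Boolean): Unit = {
        // Hypothetical sink: a real connector would persist `data` to its external system here.
        println(s"legacy sink received ${data.count()} rows (overwrite = $overwrite)")
      }
    }
  }
}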

sql/core/src/main/java/org/apache/spark/sql/connector/write/V1WriteBuilder.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 4 additions & 3 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
 import org.apache.spark.sql.execution.datasources.SchemaPruning
-import org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown
+import org.apache.spark.sql.execution.datasources.v2.{V2ScanRelationPushDown, V2Writes}
 import org.apache.spark.sql.execution.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning}
 import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs}
@@ -37,7 +37,7 @@ class SparkOptimizer(
 
   override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
     // TODO: move SchemaPruning into catalyst
-    SchemaPruning :: V2ScanRelationPushDown :: PruneFileSourcePartitions :: Nil
+    SchemaPruning :: V2ScanRelationPushDown :: V2Writes :: PruneFileSourcePartitions :: Nil
 
   override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
@@ -70,7 +70,8 @@ class SparkOptimizer(
     ExtractPythonUDFFromJoinCondition.ruleName :+
     ExtractPythonUDFFromAggregate.ruleName :+ ExtractGroupingPythonUDFFromAggregate.ruleName :+
     ExtractPythonUDFs.ruleName :+
-    V2ScanRelationPushDown.ruleName
+    V2ScanRelationPushDown.ruleName :+
+    V2Writes.ruleName
 
   /**
    * Optimization batches that are executed before the regular optimization batches (also before

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 33 additions & 23 deletions
@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartit
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, TableCapability, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TableChange}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
+import org.apache.spark.sql.connector.write.V1Write
 import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -195,33 +196,42 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         orCreate = orCreate) :: Nil
     }
 
-    case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
-      r.table.asWritable match {
-        case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
-          AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r)) :: Nil
-        case v2 =>
-          AppendDataExec(v2, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil
+    case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), query, writeOptions,
+        _, Some(write)) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
+      write match {
+        case v1Write: V1Write =>
+          AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r), v1Write) :: Nil
+        case v2Write =>
+          throw new AnalysisException(
+            s"Table ${v1.name} declares ${TableCapability.V1_BATCH_WRITE} capability but " +
+            s"${v2Write.getClass} is not an instance of ${classOf[V1Write]}")
       }
 
-    case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) =>
-      // fail if any filter cannot be converted. correctness depends on removing all matching data.
-      val filters = splitConjunctivePredicates(deleteExpr).map {
-        filter => DataSourceStrategy.translateFilter(deleteExpr,
-          supportNestedPredicatePushdown = true).getOrElse(
-            throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
-      }.toArray
-      r.table.asWritable match {
-        case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
-          OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions,
-            query, refreshCache(r)) :: Nil
-        case v2 =>
-          OverwriteByExpressionExec(v2, filters,
-            writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil
+    case AppendData(r @ DataSourceV2Relation(v2: SupportsWrite, _, _, _, _), query, writeOptions,
+        _, Some(write)) =>
+      AppendDataExec(v2, writeOptions.asOptions, planLater(query), refreshCache(r), write) :: Nil
+
+    case OverwriteByExpression(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, query,
+        writeOptions, _, Some(write)) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
+      write match {
+        case v1Write: V1Write =>
+          OverwriteByExpressionExecV1(
+            v1, writeOptions.asOptions, query, refreshCache(r), v1Write) :: Nil
+        case v2Write =>
+          throw new AnalysisException(
+            s"Table ${v1.name} declares ${TableCapability.V1_BATCH_WRITE} capability but " +
+            s"${v2Write.getClass} is not an instance of ${classOf[V1Write]}")
       }
 
-    case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _) =>
+    case OverwriteByExpression(r @ DataSourceV2Relation(v2: SupportsWrite, _, _, _, _), _, query,
+        writeOptions, _, Some(write)) =>
+      OverwriteByExpressionExec(
+        v2, writeOptions.asOptions, planLater(query), refreshCache(r), write) :: Nil
+
+    case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _, Some(write)) =>
       OverwritePartitionsDynamicExec(
-        r.table.asWritable, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil
+        r.table.asWritable, writeOptions.asOptions, planLater(query),
+        refreshCache(r), write) :: Nil
 
     case DeleteFromTable(relation, condition) =>
       relation match {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala

Lines changed: 3 additions & 3 deletions
@@ -49,14 +49,14 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
 
     // TODO: check STREAMING_WRITE capability. It's not doable now because we don't have a
     // a logical plan for streaming write.
-    case AppendData(r: DataSourceV2Relation, _, _, _) if !supportsBatchWrite(r.table) =>
+    case AppendData(r: DataSourceV2Relation, _, _, _, _) if !supportsBatchWrite(r.table) =>
      failAnalysis(s"Table ${r.table.name()} does not support append in batch mode.")
 
-    case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _)
+    case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _, _)
         if !r.table.supports(BATCH_WRITE) || !r.table.supports(OVERWRITE_DYNAMIC) =>
       failAnalysis(s"Table ${r.table.name()} does not support dynamic overwrite in batch mode.")
 
-    case OverwriteByExpression(r: DataSourceV2Relation, expr, _, _, _) =>
+    case OverwriteByExpression(r: DataSourceV2Relation, expr, _, _, _, _) =>
       expr match {
         case Literal(true, BooleanType) =>
           if (!supportsBatchWrite(r.table) ||

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala

Lines changed: 13 additions & 53 deletions
@@ -17,17 +17,14 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.UUID
-
-import org.apache.spark.SparkException
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.SupportsWrite
-import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
+import org.apache.spark.sql.connector.write.V1Write
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
+import org.apache.spark.sql.sources.InsertableRelation
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
@@ -39,12 +36,8 @@ case class AppendDataExecV1(
     table: SupportsWrite,
     writeOptions: CaseInsensitiveStringMap,
     plan: LogicalPlan,
-    refreshCache: () => Unit) extends V1FallbackWriters {
-
-  override protected def run(): Seq[InternalRow] = {
-    writeWithV1(newWriteBuilder().buildForV1Write(), refreshCache = refreshCache)
-  }
-}
+    refreshCache: () => Unit,
+    write: V1Write) extends V1FallbackWriters
 
 /**
  * Physical plan node for overwrite into a v2 table with V1 write interfaces. Note that when this
@@ -59,29 +52,10 @@ case class AppendDataExecV1(
  */
 case class OverwriteByExpressionExecV1(
     table: SupportsWrite,
-    deleteWhere: Array[Filter],
     writeOptions: CaseInsensitiveStringMap,
     plan: LogicalPlan,
-    refreshCache: () => Unit) extends V1FallbackWriters {
-
-  private def isTruncate(filters: Array[Filter]): Boolean = {
-    filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
-  }
-
-  override protected def run(): Seq[InternalRow] = {
-    newWriteBuilder() match {
-      case builder: SupportsTruncate if isTruncate(deleteWhere) =>
-        writeWithV1(builder.truncate().asV1Builder.buildForV1Write(), refreshCache = refreshCache)
-
-      case builder: SupportsOverwrite =>
-        writeWithV1(builder.overwrite(deleteWhere).asV1Builder.buildForV1Write(),
-          refreshCache = refreshCache)
-
-      case _ =>
-        throw new SparkException(s"Table does not support overwrite by expression: $table")
-    }
-  }
-}
+    refreshCache: () => Unit,
+    write: V1Write) extends V1FallbackWriters
 
 /** Some helper interfaces that use V2 write semantics through the V1 writer interface. */
 sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {
@@ -90,23 +64,13 @@ sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {
 
   def table: SupportsWrite
   def writeOptions: CaseInsensitiveStringMap
+  def refreshCache: () => Unit
+  def write: V1Write
 
-  protected implicit class toV1WriteBuilder(builder: WriteBuilder) {
-    def asV1Builder: V1WriteBuilder = builder match {
-      case v1: V1WriteBuilder => v1
-      case other => throw new IllegalStateException(
-        s"The returned writer ${other} was no longer a V1WriteBuilder.")
-    }
-  }
-
-  protected def newWriteBuilder(): V1WriteBuilder = {
-    val info = LogicalWriteInfoImpl(
-      queryId = UUID.randomUUID().toString,
-      schema = plan.schema,
-      options = writeOptions)
-    val writeBuilder = table.newWriteBuilder(info)
-
-    writeBuilder.asV1Builder
+  override def run(): Seq[InternalRow] = {
+    val writtenRows = writeWithV1(write.toInsertableRelation)
+    refreshCache()
+    writtenRows
   }
 }
 
@@ -116,12 +80,8 @@ sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {
 trait SupportsV1Write extends SparkPlan {
   def plan: LogicalPlan
 
-  protected def writeWithV1(
-      relation: InsertableRelation,
-      refreshCache: () => Unit = () => ()): Seq[InternalRow] = {
+  protected def writeWithV1(relation: InsertableRelation): Seq[InternalRow] = {
     relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false)
-    refreshCache()
-
     Nil
   }
 }