Commit f0bc357

Author: stczwd (committed)
redefine AlterTableAddPartitionExec and AlterTableDropPartitionExec
Change-Id: I10d4ff8d86fb70f195efa21156eed03dd0a74a32
1 parent 6efca68 commit f0bc357

6 files changed, +88 -91 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 24 additions & 14 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -30,6 +32,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
   extends Rule[LogicalPlan] with LookupCatalog {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
   import org.apache.spark.sql.connector.catalog.CatalogV2Util._
+  import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case AlterTableAddColumnsStatement(
@@ -229,23 +232,30 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     case ShowCurrentNamespaceStatement() =>
       ShowCurrentNamespace(catalogManager)
 
-    case AlterTableAddPartitionStatement(
-        NonSessionCatalogAndTable(catalog, tableName), partitionSpecsAndLocs, ifNotExists) =>
+    case c @ AlterTableAddPartitionStatement(
+        NonSessionCatalogAndTable(catalog, tableName), _, _) =>
+      val table = catalog.asTableCatalog.loadTable(tableName.asIdentifier).asPartitionable
+      val partitions = c.partitionSpecsAndLocs.map { case (spec, location) =>
+        val tableProperties = table.properties().asScala.toMap
+        val partParams =
+          location.map(locationUri => tableProperties + ("location" -> locationUri))
+            .getOrElse(tableProperties)
+        (spec.asPartitionIdentifier(table.partitionSchema()), partParams)
+      }
       AlterTableAddPartition(
-        catalog.asTableCatalog,
-        tableName.asIdentifier,
-        partitionSpecsAndLocs,
-        ifNotExists)
+        table,
+        partitions,
+        c.ifNotExists)
 
-    case AlterTableDropPartitionStatement(
-        NonSessionCatalogAndTable(catalog, tableName), specs, ifExists, purge, retainData) =>
+    case c @ AlterTableDropPartitionStatement(
+        NonSessionCatalogAndTable(catalog, tableName), _, _, _, _) =>
+      val table = catalog.asTableCatalog.loadTable(tableName.asIdentifier).asPartitionable
       AlterTableDropPartition(
-        catalog.asTableCatalog,
-        tableName.asIdentifier,
-        specs,
-        ifExists,
-        purge,
-        retainData)
+        table,
+        c.specs.map(_.asPartitionIdentifier(table.partitionSchema())),
+        c.ifExists,
+        c.purge,
+        c.retainData)
   }
 
   object NonSessionCatalogAndTable {
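
Note on the change above: resolution now does the conversion work at analysis time. The rule loads the table, turns each partition spec into an InternalRow ordered by the table's partition schema, and folds the optional LOCATION into the partition parameters. For context, a minimal sketch of the kind of statement this path resolves; the session setup, catalog name "testcat", namespace, table, and catalog class are all invented for illustration:

import org.apache.spark.sql.SparkSession

// Hypothetical setup: "testcat" must resolve to a TableCatalog whose tables
// implement SupportsPartitions, otherwise asPartitionable rejects the query.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.testcat", "org.example.DemoCatalog") // assumed class
  .getOrCreate()

// ResolveCatalogs rewrites the parsed AlterTableAddPartitionStatement into
// AlterTableAddPartition(table, partitions, ifNotExists), where partitions
// already holds (InternalRow, Map[String, String]) pairs.
spark.sql("ALTER TABLE testcat.ns.tbl ADD IF NOT EXISTS PARTITION (p1 = 1) LOCATION 'loc'")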

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 5 additions & 6 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Unevaluable}
@@ -562,9 +563,8 @@ case class ShowFunctions(
  * }}}
  */
 case class AlterTableAddPartition(
-    catalog: TableCatalog,
-    ident: Identifier,
-    partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
+    table: SupportsPartitions,
+    parts: Seq[(InternalRow, Map[String, String])],
     ignoreIfExists: Boolean) extends Command
 
 /**
@@ -577,9 +577,8 @@ case class AlterTableAddPartition(
  * }}}
  */
 case class AlterTableDropPartition(
-    catalog: TableCatalog,
-    ident: Identifier,
-    specs: Seq[TablePartitionSpec],
+    table: SupportsPartitions,
+    partIdents: Seq[InternalRow],
     ignoreIfNotExists: Boolean,
     purge: Boolean,
     retainData: Boolean) extends Command

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -75,30 +75,41 @@ object DataSourceV2Implicits {
7575
}
7676
}
7777

78-
def convertPartitionIndentifers(
79-
partSpec: TablePartitionSpec,
80-
partSchema: StructType): InternalRow = {
81-
val partValues = partSchema.map { part =>
82-
part.dataType match {
83-
case _: ByteType =>
84-
partSpec.getOrElse(part.name, "0").toByte
85-
case _: ShortType =>
86-
partSpec.getOrElse(part.name, "0").toShort
87-
case _: IntegerType =>
88-
partSpec.getOrElse(part.name, "0").toInt
89-
case _: LongType =>
90-
partSpec.getOrElse(part.name, "0").toLong
91-
case _: FloatType =>
92-
partSpec.getOrElse(part.name, "0").toFloat
93-
case _: DoubleType =>
94-
partSpec.getOrElse(part.name, "0").toDouble
95-
case _: StringType =>
96-
partSpec.getOrElse(part.name, "")
97-
case _ =>
98-
throw new AnalysisException(
99-
s"Type ${part.dataType.typeName} is not supported for partition.")
78+
implicit class TablePartitionSpecHelper(partSpec: TablePartitionSpec) {
79+
def asPartitionIdentifier(partSchema: StructType): InternalRow = {
80+
val conflictKeys = partSpec.keys.toSeq.diff(partSchema.map(_.name))
81+
if (conflictKeys.nonEmpty) {
82+
throw new AnalysisException(
83+
s"Partition key ${conflictKeys.mkString(",")} not exists")
84+
}
85+
86+
val partValues = partSchema.map { part =>
87+
val partValue = partSpec.get(part.name).orNull
88+
if (partValue == null) {
89+
null
90+
} else {
91+
part.dataType match {
92+
case _: ByteType =>
93+
partValue.toByte
94+
case _: ShortType =>
95+
partValue.toShort
96+
case _: IntegerType =>
97+
partValue.toInt
98+
case _: LongType =>
99+
partValue.toLong
100+
case _: FloatType =>
101+
partValue.toFloat
102+
case _: DoubleType =>
103+
partValue.toDouble
104+
case _: StringType =>
105+
partValue
106+
case _ =>
107+
throw new AnalysisException(
108+
s"Type ${part.dataType.typeName} is not supported for partition.")
109+
}
110+
}
100111
}
112+
InternalRow.fromSeq(partValues)
101113
}
102-
InternalRow.fromSeq(partValues)
103114
}
104115
}
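
A minimal sketch of the new implicit in use; the schema and spec values below are invented for illustration (TablePartitionSpec is an alias for Map[String, String]):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val partSchema = StructType(Seq(
  StructField("region", StringType),
  StructField("year", IntegerType)))

// Spec values arrive as strings from the parser and are cast per the schema.
// A key that is not a partition column raises AnalysisException; a missing
// key now yields null rather than the old "0"/"" defaults.
val spec: Map[String, String] = Map("region" -> "us", "year" -> "2020")
val row: InternalRow = spec.asPartitionIdentifier(partSchema) // [us,2020]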

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableAddPartitionExec.scala

Lines changed: 12 additions & 25 deletions
@@ -17,44 +17,31 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.AnalysisException
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.SupportsPartitions
 
 /**
  * Physical plan node for adding partitions of table.
  */
 case class AlterTableAddPartitionExec(
-    catalog: TableCatalog,
-    ident: Identifier,
-    partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
+    table: SupportsPartitions,
+    partitions: Seq[(InternalRow, Map[String, String])],
     ignoreIfExists: Boolean) extends V2CommandExec {
-  import DataSourceV2Implicits._
-  import CatalogV2Implicits._
 
   override def output: Seq[Attribute] = Seq.empty
 
   override protected def run(): Seq[InternalRow] = {
-    val table = catalog.loadTable(ident).asPartitionable
-    val partNames = table.partitionSchema().map(_.name)
-
-    partitionSpecsAndLocs.foreach { case (spec, location) =>
-      val partParams = new java.util.HashMap[String, String](table.properties())
-      location.foreach(locationUri =>
-        partParams.put("location", locationUri))
-      partParams.put("ignoreIfExists", ignoreIfExists.toString)
-
-      val conflictKeys = spec.keys.filterNot(partNames.contains)
-      if (conflictKeys.nonEmpty) {
-        throw new AnalysisException(
-          s"Partition key ${conflictKeys.mkString(",")} " +
-          s"not exists in ${ident.namespace().quoted}.${ident.name()}")
+    partitions.foreach { case (partIdent, properties) =>
+      try {
+        table.createPartition(partIdent, properties.asJava)
+      } catch {
+        case e: PartitionAlreadyExistsException if ignoreIfExists =>
+          logWarning(e.getMessage)
       }
-
-      val partIdent: InternalRow = convertPartitionIndentifers(spec, table.partitionSchema())
-      table.createPartition(partIdent, partParams)
     }
     Seq.empty
   }
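
With validation and conversion moved into analysis, the exec node reduces to the catalog call plus IF NOT EXISTS handling: createPartition is expected to signal duplicates with PartitionAlreadyExistsException, which the node downgrades to a logged warning when ignoreIfExists is set. A hedged usage sketch, continuing the hypothetical testcat setup from above:

// The first ADD creates the partition via table.createPartition(...).
spark.sql("ALTER TABLE testcat.ns.tbl ADD PARTITION (p1 = 1)")

// Repeating it without IF NOT EXISTS propagates PartitionAlreadyExistsException;
// with IF NOT EXISTS the exec only logs the message and continues.
spark.sql("ALTER TABLE testcat.ns.tbl ADD IF NOT EXISTS PARTITION (p1 = 1)")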

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableDropPartitionExec.scala

Lines changed: 9 additions & 19 deletions
@@ -17,41 +17,31 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.SupportsPartitions
 
 /**
  * Physical plan node for dropping partitions of table.
 */
 case class AlterTableDropPartitionExec(
-    catalog: TableCatalog,
-    ident: Identifier,
-    specs: Seq[TablePartitionSpec],
+    table: SupportsPartitions,
+    partIdents: Seq[InternalRow],
     ignoreIfNotExists: Boolean,
     purge: Boolean,
     retainData: Boolean) extends V2CommandExec {
-  import DataSourceV2Implicits._
-  import CatalogV2Implicits._
 
   override def output: Seq[Attribute] = Seq.empty
 
   override protected def run(): Seq[InternalRow] = {
-    val table = catalog.loadTable(ident).asPartitionable
-    val partNames = table.partitionSchema().map(_.name)
+    table.properties().put("purge", purge.toString)
+    table.properties().put("retainData", retainData.toString)
 
-    specs.foreach { partSpec =>
-      val conflictKeys = partSpec.keys.filterNot(partNames.contains)
-      if (conflictKeys.nonEmpty) {
-        throw new AnalysisException(
-          s"Partition key ${conflictKeys.mkString(",")} " +
-          s"not exists in ${ident.namespace().quoted}.${ident.name()}")
+    partIdents.foreach { partIdent =>
+      if (!table.dropPartition(partIdent) && !ignoreIfNotExists) {
+        throw new NoSuchPartitionException(table.name(), partIdent, table.partitionSchema())
       }
-
-      val partIdent: InternalRow = convertPartitionIndentifers(partSpec, table.partitionSchema())
-      table.dropPartition(partIdent)
     }
     Seq.empty
  }
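
The drop path mirrors this: dropPartition returns false for a missing partition, and the exec raises NoSuchPartitionException only when IF EXISTS was absent; purge and retainData are handed to the table through its properties map. Continuing the same hypothetical setup:

// Removes the partition; table.dropPartition(partIdent) returns true.
spark.sql("ALTER TABLE testcat.ns.tbl DROP PARTITION (p1 = 1)")

// A second DROP without IF EXISTS raises NoSuchPartitionException;
// with IF EXISTS the false return value is silently ignored.
spark.sql("ALTER TABLE testcat.ns.tbl DROP IF EXISTS PARTITION (p1 = 1)")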

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 4 additions & 4 deletions
@@ -276,12 +276,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case r @ ShowTableProperties(rt: ResolvedTable, propertyKey) =>
       ShowTablePropertiesExec(r.output, rt.table, propertyKey) :: Nil
 
-    case AlterTableAddPartition(catalog, ident, partitionSpecsAndLocs, ignoreIfExists) =>
-      AlterTableAddPartitionExec(catalog, ident, partitionSpecsAndLocs, ignoreIfExists) :: Nil
+    case AlterTableAddPartition(table, parts, ignoreIfExists) =>
+      AlterTableAddPartitionExec(table, parts, ignoreIfExists) :: Nil
 
-    case AlterTableDropPartition(catalog, ident, specs, ignoreIfNotExists, purge, retainData) =>
+    case AlterTableDropPartition(table, partIdents, ignoreIfNotExists, purge, retainData) =>
       AlterTableDropPartitionExec(
-        catalog, ident, specs, ignoreIfNotExists, purge, retainData) :: Nil
+        table, partIdents, ignoreIfNotExists, purge, retainData) :: Nil
 
     case _ => Nil
   }
