Skip to content

Commit 809b88a

Browse files
imback82cloud-fan
authored andcommitted
[SPARK-36006][SQL] Migrate ALTER TABLE ... ADD/REPLACE COLUMNS commands to use UnresolvedTable to resolve the identifier
### What changes were proposed in this pull request? This PR proposes to migrate the following `ALTER TABLE ... ADD/REPLACE COLUMNS` commands to use `UnresolvedTable` as a `child` to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing). ### Why are the changes needed? This is a part of effort to make the relation lookup behavior consistent: [SPARK-29900](https://issues.apache.org/jira/browse/SPARK-29900). ### Does this PR introduce _any_ user-facing change? After this PR, the above `ALTER TABLE ... ADD/REPLACE COLUMNS` commands will have a consistent resolution behavior. ### How was this patch tested? Updated existing tests. Closes #33200 from imback82/alter_add_cols. Authored-by: Terry Kim <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent c9a7ff3 commit 809b88a

File tree

15 files changed

+335
-453
lines changed

15 files changed

+335
-453
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 60 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
4444
import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils}
4545
import org.apache.spark.sql.connector.catalog._
4646
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
47-
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn}
47+
import org.apache.spark.sql.connector.catalog.TableChange.{After, ColumnPosition}
4848
import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => V2AggregateFunction, BoundFunction, ScalarFunction}
4949
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME
5050
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
@@ -269,7 +269,7 @@ class Analyzer(override val catalogManager: CatalogManager)
269269
ResolveRelations ::
270270
ResolveTables ::
271271
ResolvePartitionSpec ::
272-
ResolveAlterTableCommands ::
272+
ResolveAlterTableColumnCommands ::
273273
AddMetadataColumns ::
274274
DeduplicateRelations ::
275275
ResolveReferences ::
@@ -312,7 +312,6 @@ class Analyzer(override val catalogManager: CatalogManager)
312312
Batch("Post-Hoc Resolution", Once,
313313
Seq(ResolveCommandsWithIfExists) ++
314314
postHocResolutionRules: _*),
315-
Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
316315
Batch("Remove Unresolved Hints", Once,
317316
new ResolveHints.RemoveAllHints),
318317
Batch("Nondeterministic", Once,
@@ -1082,11 +1081,6 @@ class Analyzer(override val catalogManager: CatalogManager)
10821081
case _ => write
10831082
}
10841083

1085-
case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>
1086-
CatalogV2Util.loadRelation(u.catalog, u.tableName)
1087-
.map(rel => alter.copy(table = rel))
1088-
.getOrElse(alter)
1089-
10901084
case u: UnresolvedV2Relation =>
10911085
CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u)
10921086
}
@@ -3611,16 +3605,69 @@ class Analyzer(override val catalogManager: CatalogManager)
36113605

36123606
/**
36133607
* Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
3614-
* for alter table commands.
3608+
* for alter table column commands.
36153609
*/
3616-
object ResolveAlterTableCommands extends Rule[LogicalPlan] {
3610+
object ResolveAlterTableColumnCommands extends Rule[LogicalPlan] {
36173611
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
3618-
case a: AlterTableCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
3612+
case a: AlterTableColumnCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
36193613
val table = a.table.asInstanceOf[ResolvedTable]
36203614
a.transformExpressions {
36213615
case u: UnresolvedFieldName => resolveFieldNames(table, u.name, u)
36223616
}
36233617

3618+
case a @ AlterTableAddColumns(r: ResolvedTable, cols) if !a.resolved =>
3619+
// 'colsToAdd' keeps track of new columns being added. It stores a mapping from a
3620+
// normalized parent name of fields to field names that belong to the parent.
3621+
// For example, if we add columns "a.b.c", "a.b.d", and "a.c", 'colsToAdd' will become
3622+
// Map(Seq("a", "b") -> Seq("c", "d"), Seq("a") -> Seq("c")).
3623+
val colsToAdd = mutable.Map.empty[Seq[String], Seq[String]]
3624+
def resolvePosition(
3625+
col: QualifiedColType,
3626+
parentSchema: StructType,
3627+
resolvedParentName: Seq[String]): Option[FieldPosition] = {
3628+
val fieldsAdded = colsToAdd.getOrElse(resolvedParentName, Nil)
3629+
val resolvedPosition = col.position.map {
3630+
case u: UnresolvedFieldPosition => u.position match {
3631+
case after: After =>
3632+
val allFields = parentSchema.fieldNames ++ fieldsAdded
3633+
allFields.find(n => conf.resolver(n, after.column())) match {
3634+
case Some(colName) =>
3635+
ResolvedFieldPosition(ColumnPosition.after(colName))
3636+
case None =>
3637+
val name = if (resolvedParentName.isEmpty) "root" else resolvedParentName.quoted
3638+
throw QueryCompilationErrors.referenceColNotFoundForAlterTableChangesError(
3639+
after, name)
3640+
}
3641+
case _ => ResolvedFieldPosition(u.position)
3642+
}
3643+
case resolved => resolved
3644+
}
3645+
colsToAdd(resolvedParentName) = fieldsAdded :+ col.colName
3646+
resolvedPosition
3647+
}
3648+
val schema = r.table.schema
3649+
val resolvedCols = cols.map { col =>
3650+
col.path match {
3651+
case Some(parent: UnresolvedFieldName) =>
3652+
// Adding a nested field, need to resolve the parent column and position.
3653+
val resolvedParent = resolveFieldNames(r, parent.name, parent)
3654+
val parentSchema = resolvedParent.field.dataType match {
3655+
case s: StructType => s
3656+
case _ => throw QueryCompilationErrors.invalidFieldName(
3657+
col.name, parent.name, parent.origin)
3658+
}
3659+
val resolvedPosition = resolvePosition(col, parentSchema, resolvedParent.name)
3660+
col.copy(path = Some(resolvedParent), position = resolvedPosition)
3661+
case _ =>
3662+
// Adding to the root. Just need to resolve position.
3663+
val resolvedPosition = resolvePosition(col, schema, Nil)
3664+
col.copy(position = resolvedPosition)
3665+
}
3666+
}
3667+
val resolved = a.copy(columnsToAdd = resolvedCols)
3668+
resolved.copyTagsFrom(a)
3669+
resolved
3670+
36243671
case a @ AlterTableAlterColumn(
36253672
table: ResolvedTable, ResolvedFieldName(path, field), dataType, _, _, position) =>
36263673
val newDataType = dataType.flatMap { dt =>
@@ -3655,108 +3702,14 @@ class Analyzer(override val catalogManager: CatalogManager)
36553702
fieldName, includeCollections = true, conf.resolver, context.origin
36563703
).map {
36573704
case (path, field) => ResolvedFieldName(path, field)
3658-
}.getOrElse(throw QueryCompilationErrors.missingFieldError(fieldName, table, context))
3705+
}.getOrElse(throw QueryCompilationErrors.missingFieldError(fieldName, table, context.origin))
36593706
}
36603707

3661-
private def hasUnresolvedFieldName(a: AlterTableCommand): Boolean = {
3708+
private def hasUnresolvedFieldName(a: AlterTableColumnCommand): Boolean = {
36623709
a.expressions.exists(_.find(_.isInstanceOf[UnresolvedFieldName]).isDefined)
36633710
}
36643711
}
36653712

3666-
/** Rule to mostly resolve, normalize and rewrite column names based on case sensitivity. */
3667-
object ResolveAlterTableChanges extends Rule[LogicalPlan] {
3668-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
3669-
case a @ AlterTable(_, _, t: NamedRelation, changes) if t.resolved =>
3670-
// 'colsToAdd' keeps track of new columns being added. It stores a mapping from a
3671-
// normalized parent name of fields to field names that belong to the parent.
3672-
// For example, if we add columns "a.b.c", "a.b.d", and "a.c", 'colsToAdd' will become
3673-
// Map(Seq("a", "b") -> Seq("c", "d"), Seq("a") -> Seq("c")).
3674-
val colsToAdd = mutable.Map.empty[Seq[String], Seq[String]]
3675-
val schema = t.schema
3676-
val normalizedChanges = changes.flatMap {
3677-
case add: AddColumn =>
3678-
def addColumn(
3679-
parentSchema: StructType,
3680-
parentName: String,
3681-
normalizedParentName: Seq[String]): TableChange = {
3682-
val fieldsAdded = colsToAdd.getOrElse(normalizedParentName, Nil)
3683-
val pos = findColumnPosition(add.position(), parentName, parentSchema, fieldsAdded)
3684-
val field = add.fieldNames().last
3685-
colsToAdd(normalizedParentName) = fieldsAdded :+ field
3686-
TableChange.addColumn(
3687-
(normalizedParentName :+ field).toArray,
3688-
add.dataType(),
3689-
add.isNullable,
3690-
add.comment,
3691-
pos)
3692-
}
3693-
val parent = add.fieldNames().init
3694-
if (parent.nonEmpty) {
3695-
// Adding a nested field, need to normalize the parent column and position
3696-
val target = schema.findNestedField(parent, includeCollections = true, conf.resolver)
3697-
if (target.isEmpty) {
3698-
// Leave unresolved. Throws error in CheckAnalysis
3699-
Some(add)
3700-
} else {
3701-
val (normalizedName, sf) = target.get
3702-
sf.dataType match {
3703-
case struct: StructType =>
3704-
Some(addColumn(struct, parent.quoted, normalizedName :+ sf.name))
3705-
case other =>
3706-
Some(add)
3707-
}
3708-
}
3709-
} else {
3710-
// Adding to the root. Just need to normalize position
3711-
Some(addColumn(schema, "root", Nil))
3712-
}
3713-
3714-
case delete: DeleteColumn =>
3715-
resolveFieldNames(schema, delete.fieldNames(), TableChange.deleteColumn)
3716-
.orElse(Some(delete))
3717-
3718-
case column: ColumnChange =>
3719-
// This is informational for future developers
3720-
throw QueryExecutionErrors.columnChangeUnsupportedError
3721-
case other => Some(other)
3722-
}
3723-
3724-
a.copy(changes = normalizedChanges)
3725-
}
3726-
3727-
/**
3728-
* Returns the table change if the field can be resolved, returns None if the column is not
3729-
* found. An error will be thrown in CheckAnalysis for columns that can't be resolved.
3730-
*/
3731-
private def resolveFieldNames(
3732-
schema: StructType,
3733-
fieldNames: Array[String],
3734-
copy: Array[String] => TableChange): Option[TableChange] = {
3735-
val fieldOpt = schema.findNestedField(
3736-
fieldNames, includeCollections = true, conf.resolver)
3737-
fieldOpt.map { case (path, field) => copy((path :+ field.name).toArray) }
3738-
}
3739-
3740-
private def findColumnPosition(
3741-
position: ColumnPosition,
3742-
parentName: String,
3743-
struct: StructType,
3744-
fieldsAdded: Seq[String]): ColumnPosition = {
3745-
position match {
3746-
case null => null
3747-
case after: After =>
3748-
(struct.fieldNames ++ fieldsAdded).find(n => conf.resolver(n, after.column())) match {
3749-
case Some(colName) =>
3750-
ColumnPosition.after(colName)
3751-
case None =>
3752-
throw QueryCompilationErrors.referenceColNotFoundForAlterTableChangesError(after,
3753-
parentName)
3754-
}
3755-
case other => other
3756-
}
3757-
}
3758-
}
3759-
37603713
/**
37613714
* A rule that marks a command as analyzed so that its children are removed to avoid
37623715
* being optimized. This rule should run after all other analysis rules are run.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 12 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.plans._
2727
import org.apache.spark.sql.catalyst.plans.logical._
2828
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
2929
import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement}
30-
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn}
3130
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
3231
import org.apache.spark.sql.internal.SQLConf
3332
import org.apache.spark.sql.types._
@@ -140,13 +139,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
140139
case u: UnresolvedV2Relation =>
141140
u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")
142141

143-
case AlterTable(_, _, u: UnresolvedV2Relation, _) if isView(u.originalNameParts) =>
144-
u.failAnalysis(
145-
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")
146-
147-
case AlterTable(_, _, u: UnresolvedV2Relation, _) =>
148-
failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")
149-
150142
case command: V2PartitionCommand =>
151143
command.table match {
152144
case r @ ResolvedTable(_, _, table, _) => table match {
@@ -449,87 +441,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
449441
case write: V2WriteCommand if write.resolved =>
450442
write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
451443

452-
case alter: AlterTableCommand if alter.table.resolved =>
453-
checkAlterTableCommand(alter)
454-
455-
case alter: AlterTable if alter.table.resolved =>
456-
val table = alter.table
457-
def findField(operation: String, fieldName: Array[String]): StructField = {
458-
// include collections because structs nested in maps and arrays may be altered
459-
val field = table.schema.findNestedField(fieldName, includeCollections = true)
460-
if (field.isEmpty) {
461-
alter.failAnalysis(
462-
s"Cannot $operation missing field ${fieldName.quoted} in ${table.name} schema: " +
463-
table.schema.treeString)
464-
}
465-
field.get._2
466-
}
467-
def positionArgumentExists(
468-
position: ColumnPosition,
469-
struct: StructType,
470-
fieldsAdded: Seq[String]): Unit = {
471-
position match {
472-
case after: After =>
473-
val allFields = struct.fieldNames ++ fieldsAdded
474-
if (!allFields.contains(after.column())) {
475-
alter.failAnalysis(s"Couldn't resolve positional argument $position amongst " +
476-
s"${allFields.mkString("[", ", ", "]")}")
477-
}
478-
case _ =>
479-
}
480-
}
481-
def findParentStruct(operation: String, fieldNames: Array[String]): StructType = {
482-
val parent = fieldNames.init
483-
val field = if (parent.nonEmpty) {
484-
findField(operation, parent).dataType
485-
} else {
486-
table.schema
487-
}
488-
field match {
489-
case s: StructType => s
490-
case o => alter.failAnalysis(s"Cannot $operation ${fieldNames.quoted}, because " +
491-
s"its parent is not a StructType. Found $o")
492-
}
493-
}
494-
def checkColumnNotExists(
495-
operation: String,
496-
fieldNames: Array[String],
497-
struct: StructType): Unit = {
498-
if (struct.findNestedField(fieldNames, includeCollections = true).isDefined) {
499-
alter.failAnalysis(s"Cannot $operation column, because ${fieldNames.quoted} " +
500-
s"already exists in ${struct.treeString}")
501-
}
502-
}
503-
504-
val colsToDelete = mutable.Set.empty[Seq[String]]
505-
// 'colsToAdd' keeps track of new columns being added. It stores a mapping from a parent
506-
// name of fields to field names that belong to the parent. For example, if we add
507-
// columns "a.b.c", "a.b.d", and "a.c", 'colsToAdd' will become
508-
// Map(Seq("a", "b") -> Seq("c", "d"), Seq("a") -> Seq("c")).
509-
val colsToAdd = mutable.Map.empty[Seq[String], Seq[String]]
510-
511-
alter.changes.foreach {
512-
case add: AddColumn =>
513-
// If a column to add is a part of columns to delete, we don't need to check
514-
// if column already exists - applies to REPLACE COLUMNS scenario.
515-
if (!colsToDelete.contains(add.fieldNames())) {
516-
checkColumnNotExists("add", add.fieldNames(), table.schema)
517-
}
518-
val parent = findParentStruct("add", add.fieldNames())
519-
val parentName = add.fieldNames().init
520-
val fieldsAdded = colsToAdd.getOrElse(parentName, Nil)
521-
positionArgumentExists(add.position(), parent, fieldsAdded)
522-
TypeUtils.failWithIntervalType(add.dataType())
523-
colsToAdd(parentName) = fieldsAdded :+ add.fieldNames().last
524-
case delete: DeleteColumn =>
525-
findField("delete", delete.fieldNames)
526-
// REPLACE COLUMNS has deletes followed by adds. Remember the deleted columns
527-
// so that add operations do not fail when the columns to add exist and they
528-
// are to be deleted.
529-
colsToDelete += delete.fieldNames
530-
case _ =>
531-
// no validation needed for set and remove property
532-
}
444+
case alter: AlterTableColumnCommand if alter.table.resolved =>
445+
checkAlterTableColumnCommand(alter)
533446

534447
case _ => // Falls back to the following checks
535448
}
@@ -1025,17 +938,23 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
1025938
/**
1026939
* Validates the options used for alter table commands after table and columns are resolved.
1027940
*/
1028-
private def checkAlterTableCommand(alter: AlterTableCommand): Unit = {
1029-
def checkColumnNotExists(fieldNames: Seq[String], struct: StructType): Unit = {
941+
private def checkAlterTableColumnCommand(alter: AlterTableColumnCommand): Unit = {
942+
def checkColumnNotExists(op: String, fieldNames: Seq[String], struct: StructType): Unit = {
1030943
if (struct.findNestedField(fieldNames, includeCollections = true).isDefined) {
1031-
alter.failAnalysis(s"Cannot ${alter.operation} column, because ${fieldNames.quoted} " +
944+
alter.failAnalysis(s"Cannot $op column, because ${fieldNames.quoted} " +
1032945
s"already exists in ${struct.treeString}")
1033946
}
1034947
}
1035948

1036949
alter match {
950+
case AlterTableAddColumns(table: ResolvedTable, colsToAdd) =>
951+
colsToAdd.foreach { colToAdd =>
952+
checkColumnNotExists("add", colToAdd.name, table.schema)
953+
}
954+
1037955
case AlterTableRenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
1038-
checkColumnNotExists(col.path :+ newName, table.schema)
956+
checkColumnNotExists("rename", col.path :+ newName, table.schema)
957+
1039958
case a @ AlterTableAlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) =>
1040959
val fieldName = col.name.quoted
1041960
if (a.dataType.isDefined) {

0 commit comments

Comments
 (0)