@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
4444import org .apache .spark .sql .catalyst .util .{toPrettySQL , CharVarcharUtils }
4545import org .apache .spark .sql .connector .catalog ._
4646import org .apache .spark .sql .connector .catalog .CatalogV2Implicits ._
47- import org .apache .spark .sql .connector .catalog .TableChange .{AddColumn , After , ColumnChange , ColumnPosition , DeleteColumn }
47+ import org .apache .spark .sql .connector .catalog .TableChange .{After , ColumnPosition }
4848import org .apache .spark .sql .connector .catalog .functions .{AggregateFunction => V2AggregateFunction , BoundFunction , ScalarFunction }
4949import org .apache .spark .sql .connector .catalog .functions .ScalarFunction .MAGIC_METHOD_NAME
5050import org .apache .spark .sql .connector .expressions .{FieldReference , IdentityTransform , Transform }
@@ -269,7 +269,7 @@ class Analyzer(override val catalogManager: CatalogManager)
269269 ResolveRelations ::
270270 ResolveTables ::
271271 ResolvePartitionSpec ::
272- ResolveAlterTableCommands ::
272+ ResolveAlterTableColumnCommands ::
273273 AddMetadataColumns ::
274274 DeduplicateRelations ::
275275 ResolveReferences ::
@@ -312,7 +312,6 @@ class Analyzer(override val catalogManager: CatalogManager)
312312 Batch (" Post-Hoc Resolution" , Once ,
313313 Seq (ResolveCommandsWithIfExists ) ++
314314 postHocResolutionRules : _* ),
315- Batch (" Normalize Alter Table" , Once , ResolveAlterTableChanges ),
316315 Batch (" Remove Unresolved Hints" , Once ,
317316 new ResolveHints .RemoveAllHints ),
318317 Batch (" Nondeterministic" , Once ,
@@ -1082,11 +1081,6 @@ class Analyzer(override val catalogManager: CatalogManager)
10821081 case _ => write
10831082 }
10841083
1085- case alter @ AlterTable (_, _, u : UnresolvedV2Relation , _) =>
1086- CatalogV2Util .loadRelation(u.catalog, u.tableName)
1087- .map(rel => alter.copy(table = rel))
1088- .getOrElse(alter)
1089-
10901084 case u : UnresolvedV2Relation =>
10911085 CatalogV2Util .loadRelation(u.catalog, u.tableName).getOrElse(u)
10921086 }
@@ -3611,16 +3605,69 @@ class Analyzer(override val catalogManager: CatalogManager)
36113605
36123606 /**
36133607 * Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
3614- * for alter table commands.
3608+ * for alter table column commands.
36153609 */
3616- object ResolveAlterTableCommands extends Rule [LogicalPlan ] {
3610+ object ResolveAlterTableColumnCommands extends Rule [LogicalPlan ] {
36173611 def apply (plan : LogicalPlan ): LogicalPlan = plan.resolveOperatorsUp {
3618- case a : AlterTableCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
3612+ case a : AlterTableColumnCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
36193613 val table = a.table.asInstanceOf [ResolvedTable ]
36203614 a.transformExpressions {
36213615 case u : UnresolvedFieldName => resolveFieldNames(table, u.name, u)
36223616 }
36233617
// Resolves an ADD COLUMNS command whose table is already a ResolvedTable but whose
// columns are not yet resolved: each column's parent path (if any) and position are
// resolved, and the command is rebuilt with the resolved columns.
// The columns are processed in order because 'colsToAdd' below is updated as each
// column is handled, so a later column can be positioned AFTER a column added earlier
// in the same command.
3618+ case a @ AlterTableAddColumns (r : ResolvedTable , cols) if ! a.resolved =>
3619+ // 'colsToAdd' keeps track of new columns being added. It stores a mapping from a
3620+ // normalized parent name of fields to field names that belong to the parent.
3621+ // For example, if we add columns "a.b.c", "a.b.d", and "a.c", 'colsToAdd' will become
3622+ // Map(Seq("a", "b") -> Seq("c", "d"), Seq("a") -> Seq("c")).
3623+ val colsToAdd = mutable.Map .empty[Seq [String ], Seq [String ]]
// Resolves the optional position of `col` within its parent struct.
//   col                - the column being added
//   parentSchema       - schema of the struct the column is added to
//   resolvedParentName - resolved name parts of the parent (Nil for the table root)
// Returns the position with any UnresolvedFieldPosition replaced by a
// ResolvedFieldPosition; a column without a position yields None.
// Throws the "reference column not found" compilation error when an AFTER position
// names a column that is neither in `parentSchema` nor already added under this parent.
// Side effect: always records `col.colName` under `resolvedParentName` in 'colsToAdd'.
3624+ def resolvePosition (
3625+ col : QualifiedColType ,
3626+ parentSchema : StructType ,
3627+ resolvedParentName : Seq [String ]): Option [FieldPosition ] = {
// Fields already added under this parent by earlier columns of the same command.
3628+ val fieldsAdded = colsToAdd.getOrElse(resolvedParentName, Nil )
3629+ val resolvedPosition = col.position.map {
3630+ case u : UnresolvedFieldPosition => u.position match {
3631+ case after : After =>
// Candidate reference columns: existing fields plus the ones added so far.
3632+ val allFields = parentSchema.fieldNames ++ fieldsAdded
// Names are compared with conf.resolver; the position is rebuilt from the matched
// candidate's own name rather than from the name the user typed.
3633+ allFields.find(n => conf.resolver(n, after.column())) match {
3634+ case Some (colName) =>
3635+ ResolvedFieldPosition (ColumnPosition .after(colName))
3636+ case None =>
// An empty parent name means the table root, which gets a fixed label in the error.
3637+ val name = if (resolvedParentName.isEmpty) " root" else resolvedParentName.quoted
3638+ throw QueryCompilationErrors .referenceColNotFoundForAlterTableChangesError(
3639+ after, name)
3640+ }
// Any other unresolved position is wrapped unchanged.
3641+ case _ => ResolvedFieldPosition (u.position)
3642+ }
// Positions that are not UnresolvedFieldPosition are passed through untouched.
3643+ case resolved => resolved
3644+ }
// Recorded after the lookup above, so a column cannot be positioned after itself.
3645+ colsToAdd(resolvedParentName) = fieldsAdded :+ col.colName
3646+ resolvedPosition
3647+ }
3648+ val schema = r.table.schema
3649+ val resolvedCols = cols.map { col =>
3650+ col.path match {
3651+ case Some (parent : UnresolvedFieldName ) =>
3652+ // Adding a nested field, need to resolve the parent column and position.
3653+ val resolvedParent = resolveFieldNames(r, parent.name, parent)
// The parent must be a struct; any other data type is reported as an invalid field name.
3654+ val parentSchema = resolvedParent.field.dataType match {
3655+ case s : StructType => s
3656+ case _ => throw QueryCompilationErrors .invalidFieldName(
3657+ col.name, parent.name, parent.origin)
3658+ }
3659+ val resolvedPosition = resolvePosition(col, parentSchema, resolvedParent.name)
3660+ col.copy(path = Some (resolvedParent), position = resolvedPosition)
3661+ case _ =>
3662+ // Adding to the root. Just need to resolve position.
3663+ val resolvedPosition = resolvePosition(col, schema, Nil )
3664+ col.copy(position = resolvedPosition)
3665+ }
3666+ }
// Rebuild the command with the resolved columns and carry over the tags of the
// original node onto the copy.
3667+ val resolved = a.copy(columnsToAdd = resolvedCols)
3668+ resolved.copyTagsFrom(a)
3669+ resolved
3670+
36243671 case a @ AlterTableAlterColumn (
36253672 table : ResolvedTable , ResolvedFieldName (path, field), dataType, _, _, position) =>
36263673 val newDataType = dataType.flatMap { dt =>
@@ -3655,108 +3702,14 @@ class Analyzer(override val catalogManager: CatalogManager)
36553702 fieldName, includeCollections = true , conf.resolver, context.origin
36563703 ).map {
36573704 case (path, field) => ResolvedFieldName (path, field)
3658- }.getOrElse(throw QueryCompilationErrors .missingFieldError(fieldName, table, context))
3705+ }.getOrElse(throw QueryCompilationErrors .missingFieldError(fieldName, table, context.origin ))
36593706 }
36603707
// Returns true when at least one expression of the given command still contains an
// UnresolvedFieldName node (searched with `find` over each expression). It is used as
// the guard of the field-name resolution case in `apply`, so that case only matches
// commands that still have field names to resolve.
3661- private def hasUnresolvedFieldName (a : AlterTableCommand ): Boolean = {
3708+ private def hasUnresolvedFieldName (a : AlterTableColumnCommand ): Boolean = {
36623709 a.expressions.exists(_.find(_.isInstanceOf [UnresolvedFieldName ]).isDefined)
36633710 }
36643711 }
36653712
3666- /** Rule to mostly resolve, normalize and rewrite column names based on case sensitivity. */
3667- object ResolveAlterTableChanges extends Rule [LogicalPlan ] {
3668- def apply (plan : LogicalPlan ): LogicalPlan = plan.resolveOperatorsUp {
3669- case a @ AlterTable (_, _, t : NamedRelation , changes) if t.resolved =>
3670- // 'colsToAdd' keeps track of new columns being added. It stores a mapping from a
3671- // normalized parent name of fields to field names that belong to the parent.
3672- // For example, if we add columns "a.b.c", "a.b.d", and "a.c", 'colsToAdd' will become
3673- // Map(Seq("a", "b") -> Seq("c", "d"), Seq("a") -> Seq("c")).
3674- val colsToAdd = mutable.Map .empty[Seq [String ], Seq [String ]]
3675- val schema = t.schema
3676- val normalizedChanges = changes.flatMap {
3677- case add : AddColumn =>
3678- def addColumn (
3679- parentSchema : StructType ,
3680- parentName : String ,
3681- normalizedParentName : Seq [String ]): TableChange = {
3682- val fieldsAdded = colsToAdd.getOrElse(normalizedParentName, Nil )
3683- val pos = findColumnPosition(add.position(), parentName, parentSchema, fieldsAdded)
3684- val field = add.fieldNames().last
3685- colsToAdd(normalizedParentName) = fieldsAdded :+ field
3686- TableChange .addColumn(
3687- (normalizedParentName :+ field).toArray,
3688- add.dataType(),
3689- add.isNullable,
3690- add.comment,
3691- pos)
3692- }
3693- val parent = add.fieldNames().init
3694- if (parent.nonEmpty) {
3695- // Adding a nested field, need to normalize the parent column and position
3696- val target = schema.findNestedField(parent, includeCollections = true , conf.resolver)
3697- if (target.isEmpty) {
3698- // Leave unresolved. Throws error in CheckAnalysis
3699- Some (add)
3700- } else {
3701- val (normalizedName, sf) = target.get
3702- sf.dataType match {
3703- case struct : StructType =>
3704- Some (addColumn(struct, parent.quoted, normalizedName :+ sf.name))
3705- case other =>
3706- Some (add)
3707- }
3708- }
3709- } else {
3710- // Adding to the root. Just need to normalize position
3711- Some (addColumn(schema, " root" , Nil ))
3712- }
3713-
3714- case delete : DeleteColumn =>
3715- resolveFieldNames(schema, delete.fieldNames(), TableChange .deleteColumn)
3716- .orElse(Some (delete))
3717-
3718- case column : ColumnChange =>
3719- // This is informational for future developers
3720- throw QueryExecutionErrors .columnChangeUnsupportedError
3721- case other => Some (other)
3722- }
3723-
3724- a.copy(changes = normalizedChanges)
3725- }
3726-
3727- /**
3728- * Returns the table change if the field can be resolved, returns None if the column is not
3729- * found. An error will be thrown in CheckAnalysis for columns that can't be resolved.
3730- */
3731- private def resolveFieldNames (
3732- schema : StructType ,
3733- fieldNames : Array [String ],
3734- copy : Array [String ] => TableChange ): Option [TableChange ] = {
3735- val fieldOpt = schema.findNestedField(
3736- fieldNames, includeCollections = true , conf.resolver)
3737- fieldOpt.map { case (path, field) => copy((path :+ field.name).toArray) }
3738- }
3739-
3740- private def findColumnPosition (
3741- position : ColumnPosition ,
3742- parentName : String ,
3743- struct : StructType ,
3744- fieldsAdded : Seq [String ]): ColumnPosition = {
3745- position match {
3746- case null => null
3747- case after : After =>
3748- (struct.fieldNames ++ fieldsAdded).find(n => conf.resolver(n, after.column())) match {
3749- case Some (colName) =>
3750- ColumnPosition .after(colName)
3751- case None =>
3752- throw QueryCompilationErrors .referenceColNotFoundForAlterTableChangesError(after,
3753- parentName)
3754- }
3755- case other => other
3756- }
3757- }
3758- }
3759-
37603713 /**
37613714 * A rule that marks a command as analyzed so that its children are removed to avoid
37623715 * being optimized. This rule should run after all other analysis rules are run.
0 commit comments