Commit b781ef8 (1 parent: 762366f)

use StructType in CatalogTable and remove CatalogColumn

File tree: 17 files changed, +114 -175 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala (1 addition, 8 deletions)

@@ -259,14 +259,7 @@ class SessionCatalog(
         identifier = tid,
         tableType = CatalogTableType.VIEW,
         storage = CatalogStorageFormat.empty,
-        schema = tempTables(table).output.map { c =>
-          CatalogColumn(
-            name = c.name,
-            dataType = c.dataType.catalogString,
-            nullable = c.nullable,
-            comment = Option(c.name)
-          )
-        },
+        schema = tempTables(table).output.toStructType,
         properties = Map(),
         viewText = None)
     } else {
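The removed block assembled the view schema by hand from the plan's output attributes; toStructType (an implicit extension catalyst provides on a sequence of attributes) builds the same StructType directly. A minimal sketch of what that amounts to, with the attribute names invented for illustration:

    import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    // Two made-up output attributes, standing in for tempTables(table).output.
    val output: Seq[Attribute] = Seq(
      AttributeReference("id", LongType, nullable = false)(),
      AttributeReference("name", StringType)())

    // output.toStructType keeps each attribute's name, data type, and nullability,
    // roughly equivalent to this explicit construction:
    val schema = StructType(output.map(a => StructField(a.name, a.dataType, a.nullable)))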

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala (13 additions, 37 deletions)

@@ -18,14 +18,14 @@
 package org.apache.spark.sql.catalyst.catalog
 
 import java.util.Date
-import javax.annotation.Nullable
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -77,28 +77,6 @@ object CatalogStorageFormat {
     outputFormat = None, serde = None, compressed = false, properties = Map.empty)
 }
 
-/**
- * A column in a table.
- */
-case class CatalogColumn(
-    name: String,
-    // TODO: make this type-safe; this is left as a string due to issues in converting Hive
-    // varchars to and from SparkSQL strings.
-    dataType: String,
-    nullable: Boolean = true,
-    comment: Option[String] = None) {
-
-  override def toString: String = {
-    val output =
-      Seq(s"`$name`",
-        dataType,
-        if (!nullable) "NOT NULL" else "",
-        comment.map("(" + _ + ")").getOrElse(""))
-    output.filter(_.nonEmpty).mkString(" ")
-  }
-
-}
-
 /**
  * A partition (Hive style) defined in the catalog.
  *
@@ -141,7 +119,7 @@ case class CatalogTable(
     identifier: TableIdentifier,
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
-    schema: Seq[CatalogColumn],
+    schema: StructType,
     partitionColumnNames: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
     owner: String = "",
@@ -163,9 +141,10 @@
   requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
   requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")
 
-  /** Columns this table is partitioned by. */
-  def partitionColumns: Seq[CatalogColumn] =
-    schema.filter { c => partitionColumnNames.contains(c.name) }
+  /** schema of this table's partition columns */
+  def partitionSchema: StructType = StructType(schema.filter {
+    c => partitionColumnNames.contains(c.name)
+  })
 
   /** Return the database this table was specified to belong to, assuming it exists. */
   def database: String = identifier.database.getOrElse {
@@ -277,16 +256,13 @@ case class SimpleCatalogRelation(
   override lazy val resolved: Boolean = false
 
   override val output: Seq[Attribute] = {
-    val cols = catalogTable.schema
-      .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
-    (cols ++ catalogTable.partitionColumns).map { f =>
-      AttributeReference(
-        f.name,
-        CatalystSqlParser.parseDataType(f.dataType),
-        // Since data can be dumped in randomly with no validation, everything is nullable.
-        nullable = true
-      )(qualifier = Some(metadata.identifier.table))
-    }
+    val (partCols, dataCols) = metadata.schema.toAttributes
+      // Since data can be dumped in randomly with no validation, everything is nullable.
+      .map(_.withNullability(true).withQualifier(Some(metadata.identifier.table)))
+      .partition { a =>
+        metadata.partitionColumnNames.contains(a.name)
+      }
+    dataCols ++ partCols
   }
 
   require(
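CatalogTable's schema is now a StructType, and partitionSchema returns a StructType holding only the partition fields, replacing the old partitionColumns: Seq[CatalogColumn]. A minimal sketch of constructing and querying the reworked case class; the database, table, and column names here are invented:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
    import org.apache.spark.sql.types.StructType

    val table = CatalogTable(
      identifier = TableIdentifier("events", Some("db1")),
      tableType = CatalogTableType.MANAGED,
      storage = CatalogStorageFormat.empty,
      // the partition column also appears in the full schema
      schema = new StructType()
        .add("id", "bigint")
        .add("payload", "string")
        .add("day", "string"),
      partitionColumnNames = Seq("day"))

    table.partitionSchema.fieldNames.toSeq   // Seq(day)
    table.schema.fieldNames.toSeq            // Seq(id, payload, day)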

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala (13 additions, 12 deletions)

@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
 
@@ -551,7 +552,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
       identifier = TableIdentifier("my_table", Some("db1")),
       tableType = CatalogTableType.MANAGED,
       storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
-      schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))
+      schema = new StructType().add("a", "int").add("b", "string")
     )
 
     catalog.createTable("db1", table, ignoreIfExists = false)
@@ -570,7 +571,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
       storage = CatalogStorageFormat(
        Some(Utils.createTempDir().getAbsolutePath),
        None, None, None, false, Map.empty),
-      schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))
+      schema = new StructType().add("a", "int").add("b", "string")
     )
     catalog.createTable("db1", externalTable, ignoreIfExists = false)
     assert(!exists(db.locationUri, "external_table"))
@@ -583,11 +584,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
       storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
-      schema = Seq(
-        CatalogColumn("col1", "int"),
-        CatalogColumn("col2", "string"),
-        CatalogColumn("a", "int"),
-        CatalogColumn("b", "string")),
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("a", "int")
+        .add("b", "string"),
       partitionColumnNames = Seq("a", "b")
     )
     catalog.createTable("db1", table, ignoreIfExists = false)
@@ -686,11 +687,11 @@ abstract class CatalogTestUtils {
       identifier = TableIdentifier(name, database),
       tableType = CatalogTableType.EXTERNAL,
      storage = storageFormat,
-      schema = Seq(
-        CatalogColumn("col1", "int"),
-        CatalogColumn("col2", "string"),
-        CatalogColumn("a", "int"),
-        CatalogColumn("b", "string")),
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("a", "int")
+        .add("b", "string"),
       partitionColumnNames = Seq("a", "b"),
       bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
   }
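The tests now build schemas with StructType's add(name, dataType) overload that takes the type as a string and parses it. As a rough sketch of the equivalence (standard spark.sql.types imports assumed; nullable defaults to true in both forms):

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val viaAdd = new StructType().add("a", "int").add("b", "string")
    val explicit = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))
    // The two should compare equal: same field names, types, nullability, and metadata.
    assert(viaAdd == explicit)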

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala (5 additions, 22 deletions)

@@ -32,7 +32,7 @@
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, StructType}
 
 /**
  * Concrete parser for Spark SQL statements.
@@ -928,13 +928,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
     }
     val comment = Option(ctx.STRING).map(string)
-    val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
-    val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
+    val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
+    val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
     val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val selectQuery = Option(ctx.query).map(plan)
 
     // Ensuring whether no duplicate name is used in table definition
-    val colNames = cols.map(_.name)
+    val colNames = dataCols.map(_.name)
     if (colNames.length != colNames.distinct.length) {
       val duplicateColumns = colNames.groupBy(identity).collect {
         case (x, ys) if ys.length > 1 => "\"" + x + "\""
@@ -952,7 +952,7 @@
 
     // Note: Hive requires partition columns to be distinct from the schema, so we need
     // to include the partition columns here explicitly
-    val schema = cols ++ partitionCols
+    val schema = StructType(dataCols ++ partitionCols)
 
     // Storage format
     val defaultStorage: CatalogStorageFormat = {
@@ -1296,23 +1296,6 @@
       isTemporary = isTemporary)
   }
 
-  /**
-   * Create a sequence of [[CatalogColumn]]s from a column list
-   */
-  private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) {
-    ctx.colType.asScala.map { col =>
-      CatalogColumn(
-        col.identifier.getText.toLowerCase,
-        // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
-        // just convert the whole type string to lower case, otherwise the struct field names
-        // will no longer be case sensitive. Instead, we rely on our parser to get the proper
-        // case before passing it to Hive.
-        typedVisit[DataType](col.dataType).catalogString,
-        nullable = true,
-        Option(col.STRING).map(string))
-    }
-  }
-
   /**
   * Create a [[ScriptInputOutputSchema]].
   */
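For a Hive-style CREATE TABLE, the builder now parses both column lists with visitColTypeList (yielding StructFields) and folds them into one StructType, appending the partition columns after the data columns. A rough sketch of the resulting shape for a hypothetical statement, with the names and types invented:

    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    // CREATE TABLE logs (id BIGINT, msg STRING) PARTITIONED BY (day STRING)
    val dataCols = Seq(StructField("id", LongType), StructField("msg", StringType))
    val partitionCols = Seq(StructField("day", StringType))

    // Hive keeps partition columns out of the data column list, so they are appended explicitly.
    val schema = StructType(dataCols ++ partitionCols)   // fields: id, msg, day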

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala (2 additions, 4 deletions)

@@ -395,7 +395,7 @@ object CreateDataSourceTableUtils extends Logging {
     CatalogTable(
       identifier = tableIdent,
       tableType = tableType,
-      schema = Nil,
+      schema = new StructType,
       storage = CatalogStorageFormat(
         locationUri = None,
         inputFormat = None,
@@ -424,9 +424,7 @@
         compressed = false,
         properties = options
       ),
-      schema = relation.schema.map { f =>
-        CatalogColumn(f.name, f.dataType.catalogString)
-      },
+      schema = relation.schema,
       properties = tableProperties.toMap,
       viewText = None)
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala (1 addition, 1 deletion)

@@ -518,7 +518,7 @@ object DDLUtils {
   }
 
   def isTablePartitioned(table: CatalogTable): Boolean = {
-    table.partitionColumns.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
+    table.partitionColumnNames.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
   }
 
   // A persisted data source table always store its schema in the catalog.

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala (9 additions, 15 deletions)

@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -439,10 +439,10 @@ case class DescribeTableCommand(
         describeSchema(StructType(partColNames.map(userSpecifiedSchema(_))), buffer)
       }
     } else {
-      if (table.partitionColumns.nonEmpty) {
+      if (table.partitionColumnNames.nonEmpty) {
         append(buffer, "# Partition Information", "", "")
         append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
-        describeSchema(table.partitionColumns, buffer)
+        describeSchema(table.partitionSchema, buffer)
       }
     }
   }
@@ -521,12 +521,6 @@ case class DescribeTableCommand(
     }
   }
 
-  private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
-    schema.foreach { column =>
-      append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
-    }
-  }
-
   private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
     schema.foreach { column =>
       append(buffer, column.name, column.dataType.simpleString, column.getComment().orNull)
@@ -701,7 +695,7 @@ case class ShowPartitionsCommand(
      * thrown if the partitioning spec is invalid.
      */
     if (spec.isDefined) {
-      val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
+      val badColumns = spec.get.keySet.filterNot(tab.partitionColumnNames.contains)
       if (badColumns.nonEmpty) {
         val badCols = badColumns.mkString("[", ", ", "]")
         throw new AnalysisException(
@@ -799,14 +793,14 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand {
       .foreach(builder.append)
   }
 
-  private def columnToDDLFragment(column: CatalogColumn): String = {
-    val comment = column.comment.map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
-    s"${quoteIdentifier(column.name)} ${column.dataType}${comment.getOrElse("")}"
+  private def columnToDDLFragment(column: StructField): String = {
+    val comment = column.getComment().map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
+    s"${quoteIdentifier(column.name)} ${column.dataType.catalogString}${comment.getOrElse("")}"
   }
 
   private def showHiveTableNonDataColumns(metadata: CatalogTable, builder: StringBuilder): Unit = {
-    if (metadata.partitionColumns.nonEmpty) {
-      val partCols = metadata.partitionColumns.map(columnToDDLFragment)
+    if (metadata.partitionColumnNames.nonEmpty) {
+      val partCols = metadata.partitionSchema.map(columnToDDLFragment)
       builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
     }
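columnToDDLFragment now takes a StructField: the column comment comes from the field's metadata via getComment(), and the type is rendered with catalogString, the Hive-compatible type text. A small illustration with an invented column:

    import org.apache.spark.sql.types.{ArrayType, StringType, StructField}

    val col = StructField("tags", ArrayType(StringType)).withComment("free-form labels")

    col.dataType.catalogString   // "array<string>" - the type text used in the generated DDL
    col.getComment()             // Some("free-form labels") - read back from the field metadata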

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala (14 additions, 17 deletions)

@@ -21,10 +21,11 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -161,18 +162,17 @@
    * SQL based on the analyzed plan, and also creates the proper schema for the view.
    */
   private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
-    val viewSQL: String = {
-      val logicalPlan = if (userSpecifiedColumns.isEmpty) {
-        analyzedPlan
-      } else {
-        val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-          case (attr, (colName, _)) => Alias(attr, colName)()
-        }
-        sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
+    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+        case (attr, (colName, _)) => Alias(attr, colName)()
       }
-      new SQLBuilder(logicalPlan).toSQL
+      sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
     }
 
+    val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
+
     // Validate the view SQL - make sure we can parse it and analyze it.
     // If we cannot analyze the generated query, there is probably a bug in SQL generation.
     try {
@@ -184,14 +184,11 @@
     }
 
     val viewSchema = if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan.output.map { a =>
-        CatalogColumn(a.name, a.dataType.catalogString)
-      }
+      aliasedPlan.schema
     } else {
-      analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (a, (name, comment)) =>
-          CatalogColumn(name, a.dataType.catalogString, comment = comment)
-      }
+      StructType(aliasedPlan.schema.zip(userSpecifiedColumns).map {
+        case (field, (_, comment)) => comment.map(field.withComment).getOrElse(field)
+      })
     }
 
     CatalogTable(
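The view schema is now the aliased plan's StructType, with any user-supplied column comments folded in through withComment. A minimal sketch of that fold over a made-up two-column schema:

    import org.apache.spark.sql.types.{StringType, StructType}

    val planSchema = new StructType().add("c1", StringType).add("c2", StringType)
    // (name, optional comment) pairs as they come from the CREATE VIEW column list
    val userSpecifiedColumns = Seq("c1" -> Some("first column"), "c2" -> None)

    // Keep each field as-is unless the user supplied a comment for it.
    val viewSchema = StructType(planSchema.zip(userSpecifiedColumns).map {
      case (field, (_, comment)) => comment.map(field.withComment).getOrElse(field)
    })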

sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala (2 additions, 2 deletions)

@@ -157,8 +157,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     val columns = tableMetadata.schema.map { c =>
       new Column(
         name = c.name,
-        description = c.comment.orNull,
-        dataType = c.dataType,
+        description = c.getComment().orNull,
+        dataType = c.dataType.catalogString,
         nullable = c.nullable,
         isPartition = partitionColumnNames.contains(c.name),
         isBucket = bucketColumnNames.contains(c.name))
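With the schema stored as a StructType, Catalog.listColumns now reports each column's type as its catalogString (for example array<string>) and takes the description from the field's comment metadata. Assuming an active SparkSession named spark and an existing table named t, both hypothetical here:

    // Columns of table "t": name, description, dataType, nullable, isPartition, isBucket
    spark.catalog.listColumns("t").show()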
