
Commit 719ecf2

fix.

1 parent dccc0aa

File tree: 2 files changed (+52, -5 lines)


sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 26 additions & 5 deletions
@@ -729,6 +729,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
       properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) })
   }
 
+  // Reorder table schema to put partition columns at the end. Before Spark 2.2, the partition
+  // columns are not put at the end of schema. We need to reorder it when reading the schema
+  // from the table properties.
+  private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
+    val partitionFields = partColumnNames.map { partCol =>
+      schema.find(_.name == partCol).getOrElse {
+        throw new AnalysisException("The metadata is corrupted. Unable to find the " +
+          s"partition column names from the schema. schema: ${schema.catalogString}. " +
+          s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
+      }
+    }
+    StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
+  }
+
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
@@ -738,10 +752,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     // schema from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
       val schemaFromTableProps = getSchemaFromTableProperties(table)
-      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+      val partColumnNames = getPartitionColumnsFromTableProperties(table)
+      val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
         hiveTable.copy(
-          schema = schemaFromTableProps,
-          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          schema = reorderedSchema,
+          partitionColumnNames = partColumnNames,
           bucketSpec = getBucketSpecFromTableProperties(table))
       } else {
         // Hive metastore may change the table schema, e.g. schema inference. If the table
@@ -771,11 +788,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     }
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
+    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val partColumnNames = getPartitionColumnsFromTableProperties(table)
+    val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
     table.copy(
       provider = Some(provider),
       storage = storageWithLocation,
-      schema = getSchemaFromTableProperties(table),
-      partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+      schema = reorderedSchema,
+      partitionColumnNames = partColumnNames,
       bucketSpec = getBucketSpecFromTableProperties(table),
       tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG))
   }
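To see the mechanics of the new reorderSchema method in isolation, here is a minimal standalone sketch (not part of the commit): the local reorder helper mirrors the method's logic but substitutes sys.error for AnalysisException so it runs outside the catalog class, and ReorderSketch is a name invented here.

import org.apache.spark.sql.types._

object ReorderSketch {
  // Mirrors reorderSchema: look up each partition column by name, then move
  // those fields to the end of the schema, preserving their relative order.
  def reorder(schema: StructType, partColumnNames: Seq[String]): StructType = {
    val partitionFields = partColumnNames.map { partCol =>
      schema.find(_.name == partCol).getOrElse {
        // The real method throws AnalysisException("The metadata is corrupted. ...")
        sys.error(s"partition column $partCol not found in ${schema.catalogString}")
      }
    }
    StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
  }

  def main(args: Array[String]): Unit = {
    // A pre-2.2 layout: partition columns interleaved with data columns.
    val stored = new StructType()
      .add("col1", "int")
      .add("partCol1", "int")
      .add("partCol2", "string")
      .add("col2", "string")
    println(reorder(stored, Seq("partCol1", "partCol2")).fieldNames.mkString(", "))
    // prints: col1, col2, partCol1, partCol2
  }
}

Note that filterNot(partitionFields.contains) removes the partition fields from their original positions via structural equality on StructField, which is safe because the fields were just looked up from the same schema.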

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala

Lines changed: 26 additions & 0 deletions
@@ -63,4 +63,30 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
     assert(DDLUtils.isHiveTable(externalCatalog.getTable("db1", "hive_tbl")))
   }
+
+  Seq("parquet", "hive").foreach { format =>
+    test(s"Partition columns should be put at the end of table schema for the format $format") {
+      val catalog = newBasicCatalog()
+      val newSchema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string")
+      val table = CatalogTable(
+        identifier = TableIdentifier("tbl", Some("db1")),
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType()
+          .add("col1", "int")
+          .add("partCol1", "int")
+          .add("partCol2", "string")
+          .add("col2", "string"),
+        provider = Some(format),
+        partitionColumnNames = Seq("partCol1", "partCol2"))
+      catalog.createTable(table, ignoreIfExists = false)
+
+      val restoredTable = externalCatalog.getTable("db1", "tbl")
+      assert(restoredTable.schema == newSchema)
+    }
+  }
 }
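As a sanity check on the test's expectation, pushing the stored layout through the same reordering rule reproduces newSchema exactly (a sketch reusing the hypothetical ReorderSketch.reorder helper from above):

val stored = new StructType()
  .add("col1", "int")
  .add("partCol1", "int") // partition columns interleaved, pre-2.2 style
  .add("partCol2", "string")
  .add("col2", "string")

// Data columns keep their relative order up front; partition columns go last.
val reordered = ReorderSketch.reorder(stored, Seq("partCol1", "partCol2"))
assert(reordered.fieldNames.sameElements(Array("col1", "col2", "partCol1", "partCol2")))

The test loops over both "parquet" and "hive" providers because restore takes different code paths: Hive serde tables go through restoreHiveSerdeTable (second hunk), while data source tables take the separate branch patched in the third hunk, and both must now reorder.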
