@@ -729,6 +729,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) })
   }
 
+  // Reorder table schema to put partition columns at the end. Before Spark 2.2, the partition
+  // columns are not put at the end of schema. We need to reorder it when reading the schema
+  // from the table properties.
+  private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
+    val partitionFields = partColumnNames.map { partCol =>
+      schema.find(_.name == partCol).getOrElse {
+        throw new AnalysisException("The metadata is corrupted. Unable to find the " +
+          s"partition column names from the schema. schema: ${schema.catalogString}. " +
+          s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
+      }
+    }
+    StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
+  }
+
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
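
For illustration only (not part of the patch): a minimal, self-contained Scala sketch of what the new reorderSchema helper does. A partition column that a pre-2.2 writer stored before the data columns is moved to the end, while the relative order of the remaining fields is preserved. The ReorderSchemaDemo object and its main method are hypothetical scaffolding; only the reordering logic mirrors the hunk above, with sys.error standing in for AnalysisException.

    import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}

    object ReorderSchemaDemo {
      // Mirrors the patched helper; sys.error replaces Spark's AnalysisException.
      def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
        val partitionFields = partColumnNames.map { partCol =>
          schema.find(_.name == partCol).getOrElse(
            sys.error(s"Partition column $partCol not found in ${schema.catalogString}"))
        }
        // Non-partition fields keep their original order; partition fields go last.
        StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
      }

      def main(args: Array[String]): Unit = {
        // Pre-2.2 layout: partition column "year" was stored before the data columns.
        val fromProps = new StructType()
          .add("year", IntegerType)
          .add("id", LongType)
          .add("name", StringType)
        println(reorderSchema(fromProps, Seq("year")).fieldNames.mkString(", "))
        // prints: id, name, year
      }
    }

Appending rather than sorting keeps the data-column order stable, so only the position of the partition columns changes.
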
@@ -738,10 +752,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // schema from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
       val schemaFromTableProps = getSchemaFromTableProperties(table)
-      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+      val partColumnNames = getPartitionColumnsFromTableProperties(table)
+      val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
         hiveTable.copy(
-          schema = schemaFromTableProps,
-          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          schema = reorderedSchema,
+          partitionColumnNames = partColumnNames,
           bucketSpec = getBucketSpecFromTableProperties(table))
       } else {
         // Hive metastore may change the table schema, e.g. schema inference. If the table
@@ -771,11 +788,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
+    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val partColumnNames = getPartitionColumnsFromTableProperties(table)
+    val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
     table.copy(
       provider = Some(provider),
       storage = storageWithLocation,
-      schema = getSchemaFromTableProperties(table),
-      partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+      schema = reorderedSchema,
+      partitionColumnNames = partColumnNames,
       bucketSpec = getBucketSpecFromTableProperties(table),
       tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG))
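
A second illustrative sketch (REPL-style, e.g. for spark-shell; not part of the patch) of why the reordering must happen before the schema comparison in restoreHiveSerdeTable. The property-stored schema is only trusted if it matches the schema the Hive metastore returned, and Hive always lists partition columns last (and lower-cases field names). looselyEqual below is a hypothetical, simplified stand-in for Spark's private DataType.equalsIgnoreCaseAndNullability, handling only flat structs of atomic types.

    import org.apache.spark.sql.types.{IntegerType, LongType, StructType}

    // Hypothetical stand-in: names compared ignoring case, types compared exactly,
    // nullability ignored (it lives on StructField and is not inspected here).
    def looselyEqual(a: StructType, b: StructType): Boolean =
      a.length == b.length && a.zip(b).forall { case (fa, fb) =>
        fa.name.equalsIgnoreCase(fb.name) && fa.dataType == fb.dataType
      }

    // Schema written to table properties by pre-2.2 Spark (partition column first,
    // original casing) versus the schema reported by the Hive metastore.
    val fromProps = new StructType().add("Year", IntegerType).add("ID", LongType)
    val fromHive  = new StructType().add("id", LongType).add("year", IntegerType)

    looselyEqual(fromProps, fromHive)  // false: partition column is in the wrong place
    val reordered = StructType(fromProps.filterNot(_.name == "Year") :+ fromProps("Year"))
    looselyEqual(reordered, fromHive)  // true: the case-preserving schema can be kept

Without the reordering, a pre-2.2 table would fail this check and fall back to the metastore schema, losing the original field casing saved in the table properties.
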