
Commit f923c84

cloud-fan authored and gatorsmile committed
[SPARK-18899][SPARK-18912][SPARK-18913][SQL] refactor the error checking when append data to an existing table
## What changes were proposed in this pull request?

When we append data to an existing table with `DataFrameWriter.saveAsTable`, we run various checks to make sure the appended data is consistent with the existing data. However, we obtain the information about the existing table by matching the table relation instead of looking at the table metadata. This is error-prone: for example, we only check the number of columns for `HadoopFsRelation`, and we forget to check bucketing entirely.

This PR refactors the error checking to look at the metadata of the existing table, and fixes several bugs:

* SPARK-18899: we forgot to check whether the specified bucketing matches that of the existing table, which may lead to a problematic table with different bucketing in different data files.
* SPARK-18912: we forgot to check the number of columns for non-file-based data source tables.
* SPARK-18913: we didn't support appending data to a table with special characters in its column names.

## How was this patch tested?

New regression tests.

Author: Wenchen Fan <[email protected]>

Closes #16313 from cloud-fan/bug1.
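For illustration, the user-visible effect of the SPARK-18899 fix looks roughly like this (a minimal sketch assuming an active `SparkSession` named `spark`; it mirrors the new regression test added to `DataFrameReaderWriterSuite` below):

```scala
import spark.implicits._

// Create a table bucketed by `i` into 2 buckets.
Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.bucketBy(2, "i").saveAsTable("t")

// Appending with a different bucket count is now rejected with an AnalysisException:
//   "Specified bucketing does not match that of the existing table ..."
Seq(3 -> "c").toDF("i", "j").write.bucketBy(3, "i").mode("append").saveAsTable("t")
```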
1 parent fa829ce commit f923c84

8 files changed: +180 −91 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala

Lines changed: 37 additions & 0 deletions
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.catalog
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.util.Shell
 
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 
 object ExternalCatalogUtils {
@@ -133,4 +135,39 @@ object CatalogUtils {
       case o => o
     }
   }
+
+  def normalizePartCols(
+      tableName: String,
+      tableCols: Seq[String],
+      partCols: Seq[String],
+      resolver: Resolver): Seq[String] = {
+    partCols.map(normalizeColumnName(tableName, tableCols, _, "partition", resolver))
+  }
+
+  def normalizeBucketSpec(
+      tableName: String,
+      tableCols: Seq[String],
+      bucketSpec: BucketSpec,
+      resolver: Resolver): BucketSpec = {
+    val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec
+    val normalizedBucketCols = bucketColumnNames.map { colName =>
+      normalizeColumnName(tableName, tableCols, colName, "bucket", resolver)
+    }
+    val normalizedSortCols = sortColumnNames.map { colName =>
+      normalizeColumnName(tableName, tableCols, colName, "sort", resolver)
+    }
+    BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols)
+  }
+
+  private def normalizeColumnName(
+      tableName: String,
+      tableCols: Seq[String],
+      colName: String,
+      colType: String,
+      resolver: Resolver): String = {
+    tableCols.find(resolver(_, colName)).getOrElse {
+      throw new AnalysisException(s"$colType column $colName is not defined in table $tableName, " +
+        s"defined table columns are: ${tableCols.mkString(", ")}")
+    }
+  }
 }
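As a rough usage sketch (hypothetical, not part of the commit), the new helpers resolve user-specified column names against the table's columns and raise an `AnalysisException` for unknown columns; `caseInsensitiveResolution` below stands in for the session's `conf.resolver`:

```scala
import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogUtils}

val tableCols = Seq("ID", "name", "ts")

// "id" is matched case-insensitively and normalized to the table's "ID".
val normalized = CatalogUtils.normalizeBucketSpec(
  tableName = "t",
  tableCols = tableCols,
  bucketSpec = BucketSpec(4, Seq("id"), Seq("ts")),
  resolver = caseInsensitiveResolution)
// normalized == BucketSpec(4, Seq("ID"), Seq("ts"))

// An undefined column fails with:
//   "bucket column foo is not defined in table t, defined table columns are: ID, name, ts"
CatalogUtils.normalizeBucketSpec("t", tableCols, BucketSpec(4, Seq("foo"), Nil), caseInsensitiveResolution)
```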

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 10 additions & 0 deletions
@@ -133,6 +133,16 @@ case class BucketSpec(
   if (numBuckets <= 0) {
     throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
   }
+
+  override def toString: String = {
+    val bucketString = s"bucket columns: [${bucketColumnNames.mkString(", ")}]"
+    val sortString = if (sortColumnNames.nonEmpty) {
+      s", sort columns: [${sortColumnNames.mkString(", ")}]"
+    } else {
+      ""
+    }
+    s"$numBuckets buckets, $bucketString$sortString"
+  }
 }
 
 /**
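For reference, the new `toString` produces the human-readable form used by the bucketing-mismatch error message added below, roughly:

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec

BucketSpec(8, Seq("a"), Seq("b")).toString  // "8 buckets, bucket columns: [a], sort columns: [b]"
BucketSpec(8, Seq("a"), Nil).toString       // "8 buckets, bucket columns: [a]"
```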

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 73 additions & 37 deletions
@@ -18,13 +18,11 @@
 package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.sources.BaseRelation
 
 /**
  * A command used to create a data source table.
@@ -143,8 +141,9 @@ case class CreateDataSourceTableAsSelectCommand(
     val tableName = tableIdentWithDB.unquotedString
 
     var createMetastoreTable = false
-    var existingSchema = Option.empty[StructType]
-    if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
+    // We may need to reorder the columns of the query to match the existing table.
+    var reorderedColumns = Option.empty[Seq[NamedExpression]]
+    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
       // Check if we need to throw an exception or just return.
       mode match {
         case SaveMode.ErrorIfExists =>
@@ -157,39 +156,76 @@ case class CreateDataSourceTableAsSelectCommand(
           // Since the table already exists and the save mode is Ignore, we will just return.
           return Seq.empty[Row]
         case SaveMode.Append =>
+          val existingTable = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+
+          if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
+            throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
+              "not supported yet. Please use the insertInto() API as an alternative.")
+          }
+
           // Check if the specified data source match the data source of the existing table.
-          val existingProvider = DataSource.lookupDataSource(provider)
+          val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
+          val specifiedProvider = DataSource.lookupDataSource(table.provider.get)
           // TODO: Check that options from the resolved relation match the relation that we are
           // inserting into (i.e. using the same compression).
+          if (existingProvider != specifiedProvider) {
+            throw new AnalysisException(s"The format of the existing table $tableName is " +
+              s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
+              s"`${specifiedProvider.getSimpleName}`.")
+          }
 
-          // Pass a table identifier with database part, so that `lookupRelation` won't get temp
-          // views unexpectedly.
-          EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
-            case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
-              // check if the file formats match
-              l.relation match {
-                case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
-                  throw new AnalysisException(
-                    s"The file format of the existing table $tableName is " +
-                    s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " +
-                    s"format `$provider`")
-                case _ =>
-              }
-              if (query.schema.size != l.schema.size) {
-                throw new AnalysisException(
-                  s"The column number of the existing schema[${l.schema}] " +
-                  s"doesn't match the data schema[${query.schema}]'s")
-              }
-              existingSchema = Some(l.schema)
-            case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-              existingSchema = Some(s.metadata.schema)
-            case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) =>
-              throw new AnalysisException("Saving data in the Hive serde table " +
-                s"${c.catalogTable.identifier} is not supported yet. Please use the " +
-                "insertInto() API as an alternative..")
-            case o =>
-              throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
+          if (query.schema.length != existingTable.schema.length) {
+            throw new AnalysisException(
+              s"The column number of the existing table $tableName" +
+                s"(${existingTable.schema.catalogString}) doesn't match the data schema" +
+                s"(${query.schema.catalogString})")
           }
+
+          val resolver = sessionState.conf.resolver
+          val tableCols = existingTable.schema.map(_.name)
+
+          reorderedColumns = Some(existingTable.schema.map { f =>
+            query.resolve(Seq(f.name), resolver).getOrElse {
+              val inputColumns = query.schema.map(_.name).mkString(", ")
+              throw new AnalysisException(
+                s"cannot resolve '${f.name}' given input columns: [$inputColumns]")
+            }
+          })
+
+          // In `AnalyzeCreateTable`, we verified the consistency between the user-specified table
+          // definition(partition columns, bucketing) and the SELECT query, here we also need to
+          // verify the the consistency between the user-specified table definition and the existing
+          // table definition.
+
+          // Check if the specified partition columns match the existing table.
+          val specifiedPartCols = CatalogUtils.normalizePartCols(
+            tableName, tableCols, table.partitionColumnNames, resolver)
+          if (specifiedPartCols != existingTable.partitionColumnNames) {
+            throw new AnalysisException(
+              s"""
+                |Specified partitioning does not match that of the existing table $tableName.
+                |Specified partition columns: [${specifiedPartCols.mkString(", ")}]
+                |Existing partition columns: [${existingTable.partitionColumnNames.mkString(", ")}]
+              """.stripMargin)
+          }
+
+          // Check if the specified bucketing match the existing table.
+          val specifiedBucketSpec = table.bucketSpec.map { bucketSpec =>
+            CatalogUtils.normalizeBucketSpec(tableName, tableCols, bucketSpec, resolver)
+          }
+          if (specifiedBucketSpec != existingTable.bucketSpec) {
+            val specifiedBucketString =
+              specifiedBucketSpec.map(_.toString).getOrElse("not bucketed")
+            val existingBucketString =
+              existingTable.bucketSpec.map(_.toString).getOrElse("not bucketed")
+            throw new AnalysisException(
+              s"""
                |Specified bucketing does not match that of the existing table $tableName.
+                |Specified bucketing: $specifiedBucketString
+                |Existing bucketing: $existingBucketString
+              """.stripMargin)
+          }
+
         case SaveMode.Overwrite =>
           sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
           // Need to create the table again.
@@ -201,9 +237,9 @@ case class CreateDataSourceTableAsSelectCommand(
     }
 
     val data = Dataset.ofRows(sparkSession, query)
-    val df = existingSchema match {
-      // If we are inserting into an existing table, just use the existing schema.
-      case Some(s) => data.selectExpr(s.fieldNames: _*)
+    val df = reorderedColumns match {
+      // Reorder the columns of the query to match the existing table.
+      case Some(cols) => data.select(cols.map(Column(_)): _*)
       case None => data
     }
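Resolving the query output field by field (instead of going through `selectExpr`, which parses column names as expressions) is what makes the SPARK-18913 case work; a short sketch assuming an active `SparkSession` named `spark`, mirroring the regression test below:

```scala
import spark.implicits._

// Column names containing dots used to break the append path.
Seq(1 -> "a").toDF("x.x", "y.y").write.saveAsTable("t")
Seq(2 -> "b").toDF("x.x", "y.y").write.mode("append").saveAsTable("t")
spark.table("t").show()  // two rows: (1, "a") and (2, "b")
```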

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 18 additions & 35 deletions
@@ -17,14 +17,11 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.util.regex.Pattern
-
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogUtils, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -122,9 +119,12 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   }
 
   private def checkPartitionColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
-    val normalizedPartitionCols = tableDesc.partitionColumnNames.map { colName =>
-      normalizeColumnName(tableDesc.identifier, schema, colName, "partition")
-    }
+    val normalizedPartitionCols = CatalogUtils.normalizePartCols(
+      tableName = tableDesc.identifier.unquotedString,
+      tableCols = schema.map(_.name),
+      partCols = tableDesc.partitionColumnNames,
+      resolver = sparkSession.sessionState.conf.resolver)
+
     checkDuplication(normalizedPartitionCols, "partition")
 
     if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
@@ -149,25 +149,21 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
 
   private def checkBucketColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
     tableDesc.bucketSpec match {
-      case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
-        val normalizedBucketCols = bucketColumnNames.map { colName =>
-          normalizeColumnName(tableDesc.identifier, schema, colName, "bucket")
-        }
-        checkDuplication(normalizedBucketCols, "bucket")
-
-        val normalizedSortCols = sortColumnNames.map { colName =>
-          normalizeColumnName(tableDesc.identifier, schema, colName, "sort")
-        }
-        checkDuplication(normalizedSortCols, "sort")
-
-        schema.filter(f => normalizedSortCols.contains(f.name)).map(_.dataType).foreach {
+      case Some(bucketSpec) =>
+        val normalizedBucketing = CatalogUtils.normalizeBucketSpec(
+          tableName = tableDesc.identifier.unquotedString,
+          tableCols = schema.map(_.name),
+          bucketSpec = bucketSpec,
+          resolver = sparkSession.sessionState.conf.resolver)
+        checkDuplication(normalizedBucketing.bucketColumnNames, "bucket")
+        checkDuplication(normalizedBucketing.sortColumnNames, "sort")
+
+        normalizedBucketing.sortColumnNames.map(schema(_)).map(_.dataType).foreach {
           case dt if RowOrdering.isOrderable(dt) => // OK
           case other => failAnalysis(s"Cannot use ${other.simpleString} for sorting column")
         }
 
-        tableDesc.copy(
-          bucketSpec = Some(BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols))
-        )
+        tableDesc.copy(bucketSpec = Some(normalizedBucketing))
 
       case None => tableDesc
     }
@@ -182,19 +178,6 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
     }
   }
 
-  private def normalizeColumnName(
-      tableIdent: TableIdentifier,
-      schema: StructType,
-      colName: String,
-      colType: String): String = {
-    val tableCols = schema.map(_.name)
-    val resolver = sparkSession.sessionState.conf.resolver
-    tableCols.find(resolver(_, colName)).getOrElse {
-      failAnalysis(s"$colType column $colName is not defined in table $tableIdent, " +
-        s"defined table columns are: ${tableCols.mkString(", ")}")
-    }
-  }
-
   private def failAnalysis(msg: String) = throw new AnalysisException(msg)
 }

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -345,15 +345,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val e = intercept[AnalysisException] {
       sql("CREATE TABLE tbl(a int, b string) USING json PARTITIONED BY (c)")
     }
-    assert(e.message == "partition column c is not defined in table `tbl`, " +
+    assert(e.message == "partition column c is not defined in table tbl, " +
       "defined table columns are: a, b")
   }
 
   test("create table - bucket column names not in table definition") {
     val e = intercept[AnalysisException] {
       sql("CREATE TABLE tbl(a int, b string) USING json CLUSTERED BY (c) INTO 4 BUCKETS")
     }
-    assert(e.message == "bucket column c is not defined in table `tbl`, " +
+    assert(e.message == "bucket column c is not defined in table tbl, " +
       "defined table columns are: a, b")
   }

sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala

Lines changed: 32 additions & 6 deletions
@@ -108,16 +108,14 @@ class DefaultSourceWithoutUserSpecifiedSchema
 }
 
 class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
-
+  import testImplicits._
 
   private val userSchema = new StructType().add("s", StringType)
   private val textSchema = new StructType().add("value", StringType)
   private val data = Seq("1", "2", "3")
   private val dir = Utils.createTempDir(namePrefix = "input").getCanonicalPath
-  private implicit var enc: Encoder[String] = _
 
   before {
-    enc = spark.implicits.newStringEncoder
     Utils.deleteRecursively(new File(dir))
   }
 
@@ -459,8 +457,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
   }
 
   test("column nullability and comment - write and then read") {
-    import testImplicits._
-
     Seq("json", "parquet", "csv").foreach { format =>
       val schema = StructType(
         StructField("cl1", IntegerType, nullable = false).withComment("test") ::
@@ -576,7 +572,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
 
   test("SPARK-18510: use user specified types for partition columns in file sources") {
     import org.apache.spark.sql.functions.udf
-    import testImplicits._
     withTempDir { src =>
       val createArray = udf { (length: Long) =>
         for (i <- 1 to length.toInt) yield i.toString
@@ -609,4 +604,35 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
       )
     }
   }
+
+  test("SPARK-18899: append to a bucketed table using DataFrameWriter with mismatched bucketing") {
+    withTable("t") {
+      Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.bucketBy(2, "i").saveAsTable("t")
+      val e = intercept[AnalysisException] {
+        Seq(3 -> "c").toDF("i", "j").write.bucketBy(3, "i").mode("append").saveAsTable("t")
+      }
+      assert(e.message.contains("Specified bucketing does not match that of the existing table"))
+    }
+  }
+
+  test("SPARK-18912: number of columns mismatch for non-file-based data source table") {
+    withTable("t") {
+      sql("CREATE TABLE t USING org.apache.spark.sql.test.DefaultSource")
+
+      val e = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("a", "b").write
+          .format("org.apache.spark.sql.test.DefaultSource")
+          .mode("append").saveAsTable("t")
+      }
+      assert(e.message.contains("The column number of the existing table"))
+    }
+  }
+
+  test("SPARK-18913: append to a table with special column names") {
+    withTable("t") {
+      Seq(1 -> "a").toDF("x.x", "y.y").write.saveAsTable("t")
+      Seq(2 -> "b").toDF("x.x", "y.y").write.mode("append").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+    }
+  }
 }

0 commit comments
