
Commit a7eef21

[SPARK-43438][SQL] Error on missing input columns in INSERT
### What changes were proposed in this pull request?

In this PR, I propose to raise an error when a user uses V1 `INSERT` without a list of columns and the number of inserted columns doesn't match the number of actual table columns. At the moment, Spark inserts data successfully in such a case after PR #41262, which changed the behaviour of Spark 3.4.x.

### Why are the changes needed?

1. To conform to the SQL standard, which requires the number of columns to be the same:

![Screenshot 2023-08-07 at 11 01 27 AM](https://github.com/apache/spark/assets/1580697/c55badec-5716-490f-a83a-0bb6b22c84c7)

Clearly, the insertion below must not succeed:
```sql
spark-sql (default)> CREATE TABLE tabtest(c1 INT, c2 INT);
spark-sql (default)> INSERT INTO tabtest SELECT 1;
```

2. To have the same behaviour as **Spark 3.4**:
```sql
spark-sql (default)> INSERT INTO tabtest SELECT 1;
`spark_catalog`.`default`.`tabtest` requires that the data to be inserted have the same number of columns as the target table: target table has 2 column(s) but the inserted data has 1 column(s), including 0 partition column(s) having constant value(s).
```

### Does this PR introduce _any_ user-facing change?

Yes. After the changes:
```sql
spark-sql (default)> INSERT INTO tabtest SELECT 1;
[INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS] Cannot write to `spark_catalog`.`default`.`tabtest`, the reason is not enough data columns:
Table columns: `c1`, `c2`.
Data columns: `1`.
```

### How was this patch tested?

By running the modified tests:
```
$ build/sbt "test:testOnly *InsertSuite"
$ build/sbt "test:testOnly *ResolveDefaultColumnsSuite"
$ build/sbt -Phive "test:testOnly *HiveQuerySuite"
```

Closes #42393 from MaxGekk/fix-num-cols-insert.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
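For illustration, a minimal sketch of the user-facing change against an assumed local `SparkSession` named `spark`; the table `tabtest` follows the example above, and the explicit-column-list workaround mirrors the updated tests (`USING parquet` is added here only to make the sketch self-contained):

```scala
// Minimal sketch of the new behaviour, assuming a SparkSession `spark`.
spark.sql("CREATE TABLE tabtest(c1 INT, c2 INT) USING parquet")

// After this change this fails with
// INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS, because no column
// list is given and only 1 of the 2 table columns is supplied:
// spark.sql("INSERT INTO tabtest SELECT 1")

// Still allowed: an explicit column list, as in the updated tests; the
// remaining column is filled from its default value (NULL when no DEFAULT
// is defined for it).
spark.sql("INSERT INTO tabtest (c1) SELECT 1")
```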
1 parent 8505084 · commit a7eef21

6 files changed: +69 additions, −37 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala

Lines changed: 2 additions & 13 deletions

@@ -103,22 +103,11 @@ object TableOutputResolver {
         errors += _,
         fillDefaultValue = supportColDefaultValue)
     } else {
-      // If the target table needs more columns than the input query, fill them with
-      // the columns' default values, if the `supportColDefaultValue` parameter is true.
-      val fillDefaultValue = supportColDefaultValue && actualExpectedCols.size > query.output.size
-      val queryOutputCols = if (fillDefaultValue) {
-        query.output ++ actualExpectedCols.drop(query.output.size).flatMap { expectedCol =>
-          getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues)
-        }
-      } else {
-        query.output
-      }
-      if (actualExpectedCols.size > queryOutputCols.size) {
+      if (actualExpectedCols.size > query.output.size) {
         throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
           tableName, actualExpectedCols.map(_.name), query)
       }
-
-      resolveColumnsByPosition(tableName, queryOutputCols, actualExpectedCols, conf, errors += _)
+      resolveColumnsByPosition(tableName, query.output, actualExpectedCols, conf, errors += _)
     }

     if (errors.nonEmpty) {
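The rule this hunk leaves behind is a plain positional arity comparison between the target table's columns and the query output, with no default-value back-fill. A simplified, self-contained sketch of that check follows; the object, method name, and exception are illustrative stand-ins, not the real `TableOutputResolver` or `QueryCompilationErrors` internals:

```scala
// Simplified sketch of the arity check kept by this hunk; names are illustrative.
object InsertArityCheck {
  def checkInsertArity(
      tableName: String,
      tableCols: Seq[String],
      dataCols: Seq[String]): Unit = {
    // More target columns than data columns is now always an error; defaults
    // are no longer injected when the INSERT has no explicit column list.
    if (tableCols.size > dataCols.size) {
      throw new IllegalArgumentException(
        s"[INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS] Cannot write to $tableName: " +
          s"table columns: ${tableCols.mkString(", ")}; data columns: ${dataCols.mkString(", ")}")
    }
  }
}
```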

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 5 additions & 1 deletion

@@ -404,7 +404,11 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
     }
     val newQuery = try {
       TableOutputResolver.resolveOutputColumns(
-        tblName, expectedColumns, query, byName = hasColumnList || insert.byName, conf,
+        tblName,
+        expectedColumns,
+        query,
+        byName = hasColumnList || insert.byName,
+        conf,
         supportColDefaultValue = true)
     } catch {
       case e: AnalysisException if staticPartCols.nonEmpty &&

sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala

Lines changed: 47 additions & 12 deletions

@@ -35,9 +35,15 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {

       // INSERT without user-defined columns
       sql("truncate table t")
-      sql("insert into t values (timestamp'2020-12-31')")
-      checkAnswer(spark.table("t"),
-        sql("select timestamp'2020-12-31', null").collect().head)
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t values (timestamp'2020-12-31')")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`t`",
+          "tableColumns" -> "`c1`, `c2`",
+          "dataColumns" -> "`col1`"))
     }
   }

@@ -57,18 +63,31 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {

       // INSERT without user-defined columns
       sql("truncate table t")
-      sql("insert into t values (timestamp'2020-12-31')")
-      checkAnswer(spark.table("t"),
-        sql("select timestamp'2020-12-31', timestamp'2020-01-01'").collect().head)
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t values (timestamp'2020-12-31')")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`t`",
+          "tableColumns" -> "`c1`, `c2`",
+          "dataColumns" -> "`col1`"))
     }
   }

   test("INSERT into partitioned tables") {
     sql("create table t(c1 int, c2 int, c3 int, c4 int) using parquet partitioned by (c3, c4)")

     // INSERT without static partitions
-    sql("insert into t values (1, 2, 3)")
-    checkAnswer(spark.table("t"), Row(1, 2, 3, null))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("insert into t values (1, 2, 3)")
+      },
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "tableColumns" -> "`c1`, `c2`, `c3`, `c4`",
+        "dataColumns" -> "`col1`, `col2`, `col3`"))

     // INSERT without static partitions but with column list
     sql("truncate table t")
@@ -77,8 +96,16 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {

     // INSERT with static partitions
     sql("truncate table t")
-    sql("insert into t partition(c3=3, c4=4) values (1)")
-    checkAnswer(spark.table("t"), Row(1, null, 3, 4))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("insert into t partition(c3=3, c4=4) values (1)")
+      },
+      errorClass = "INSERT_PARTITION_COLUMN_ARITY_MISMATCH",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "tableColumns" -> "`c1`, `c2`, `c3`, `c4`",
+        "dataColumns" -> "`col1`",
+        "staticPartCols" -> "`c3`, `c4`"))

     // INSERT with static partitions and with column list
     sql("truncate table t")
@@ -87,8 +114,16 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {

     // INSERT with partial static partitions
     sql("truncate table t")
-    sql("insert into t partition(c3=3, c4) values (1, 2)")
-    checkAnswer(spark.table("t"), Row(1, 2, 3, null))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("insert into t partition(c3=3, c4) values (1, 2)")
+      },
+      errorClass = "INSERT_PARTITION_COLUMN_ARITY_MISMATCH",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "tableColumns" -> "`c1`, `c2`, `c3`, `c4`",
+        "dataColumns" -> "`col1`, `col2`",
+        "staticPartCols" -> "`c3`"))

     // INSERT with partial static partitions and with column list is not allowed
     intercept[AnalysisException](sql("insert into t partition(c3=3, c4) (c1) values (1, 4)"))
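The partitioned-table cases above follow the same rule: static partition values count toward the target arity, so the VALUES list must still cover every remaining column. A hedged sketch of what now fails versus what still passes, reusing the table layout from the test and assuming a `SparkSession` named `spark`:

```scala
// Sketch mirroring the partitioned-table test above; assumes a SparkSession `spark`.
spark.sql(
  "CREATE TABLE t(c1 INT, c2 INT, c3 INT, c4 INT) USING parquet PARTITIONED BY (c3, c4)")

// Fails after this change: c3 and c4 are fixed by the PARTITION clause, so the
// VALUES list must supply both c1 and c2; a single value raises
// INSERT_PARTITION_COLUMN_ARITY_MISMATCH.
// spark.sql("INSERT INTO t PARTITION (c3 = 3, c4 = 4) VALUES (1)")

// Satisfies the new arity rule: two values cover the two non-static columns.
spark.sql("INSERT INTO t PARTITION (c3 = 3, c4 = 4) VALUES (1, 2)")
```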

sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala

Lines changed: 11 additions & 7 deletions

@@ -962,11 +962,15 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       (1 to 10).map(i => Row(i, null))
     )

-    sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
-    checkAnswer(
-      sql("SELECT a, b FROM jsonTable"),
-      (1 to 10).map(i => Row(i, null))
-    )
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
+      },
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+      parameters = Map(
+        "tableName" -> "`unknown`",
+        "tableColumns" -> "`a`, `b`",
+        "dataColumns" -> "`a`"))

     sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt")
     checkAnswer(
@@ -1027,7 +1031,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
     withTable("t") {
       sql("create table t(i int, s bigint default 42, x bigint) using parquet")
-      sql("insert into t values(1)")
+      sql("insert into t(i) values(1)")
       checkAnswer(spark.table("t"), Row(1, 42L, null))
     }
     // The table has a partitioning column and a default value is injected.
@@ -1495,7 +1499,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       sql(createTableIntCol)
       sql("alter table t add column s bigint default 42")
       sql("alter table t add column x bigint")
-      sql("insert into t values(1)")
+      sql("insert into t(i) values(1)")
       checkAnswer(spark.table("t"), Row(1, 42, null))
     }
     // The table has a partitioning column and a default value is injected.

sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -391,7 +391,7 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     sql(s"INSERT INTO TABLE $tableName PARTITION (c=11, b=10) SELECT 9, 12")

     // The data is missing a column. The default value for the missing column is null.
-    sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) SELECT 13")
+    sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) (a) SELECT 13")

     // c is defined twice. Analyzer will complain.
     intercept[ParseException] {

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 3 additions & 3 deletions

@@ -1258,11 +1258,11 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
           """INSERT INTO TABLE dp_test PARTITION(dp)
             |SELECT key, value, key % 5 FROM src""".stripMargin)
       },
-      errorClass = "_LEGACY_ERROR_TEMP_1169",
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
       parameters = Map(
         "tableName" -> "`spark_catalog`.`default`.`dp_test`",
-        "normalizedPartSpec" -> "dp",
-        "partColNames" -> "dp,sp"))
+        "tableColumns" -> "`key`, `value`, `dp`, `sp`",
+        "dataColumns" -> "`key`, `value`, `(key % 5)`"))

     sql("SET hive.exec.dynamic.partition.mode=nonstrict")
