Commit 8ae8f18
fix
1 parent ea0a5ee commit 8ae8f18

6 files changed: +288 -83 lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
Lines changed: 14 additions & 10 deletions

@@ -73,18 +73,22 @@ statement
     | ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList    #setDatabaseProperties
     | DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)?     #dropDatabase
     | createTableHeader ('(' colTypeList ')')? tableProvider
-        (OPTIONS options=tablePropertyList)?
-        (PARTITIONED BY partitionColumnNames=identifierList)?
-        bucketSpec? locationSpec?
-        (COMMENT comment=STRING)?
-        (TBLPROPERTIES tableProps=tablePropertyList)?
+        ((OPTIONS options=tablePropertyList) |
+        (PARTITIONED BY partitionColumnNames=identifierList) |
+        bucketSpec |
+        locationSpec |
+        (COMMENT comment=STRING) |
+        (TBLPROPERTIES tableProps=tablePropertyList))*
         (AS? query)?                                                  #createTable
     | createTableHeader ('(' columns=colTypeList ')')?
-        (COMMENT comment=STRING)?
-        (PARTITIONED BY '(' partitionColumns=colTypeList ')')?
-        bucketSpec? skewSpec?
-        rowFormat? createFileFormat? locationSpec?
-        (TBLPROPERTIES tablePropertyList)?
+        ((COMMENT comment=STRING) |
+        (PARTITIONED BY '(' partitionColumns=colTypeList ')') |
+        bucketSpec |
+        skewSpec |
+        rowFormat |
+        createFileFormat |
+        locationSpec |
+        (TBLPROPERTIES tableProps=tablePropertyList))*
         (AS? query)?                                                  #createHiveTable
     | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
         LIKE source=tableIdentifier locationSpec?                     #createTableLike
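
The practical effect of the star-closure rewrite: the clauses after the column list may now appear in any order, each at most once (the at-most-once part is enforced in the Scala builder below, since the grammar alone would accept repeats). A minimal sketch of the new behavior, assuming a local SparkSession; the session setup is illustrative, not part of this commit:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local").getOrCreate()

    // Before this change the parser required a fixed clause order; now
    // COMMENT may precede OPTIONS (or any other permutation).
    spark.sql(
      """CREATE TABLE t1 (a INT) USING parquet
        |COMMENT 'reordered clauses parse fine'
        |OPTIONS (path '/tmp/t1')""".stripMargin)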

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
Lines changed: 13 additions & 0 deletions

@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.catalyst.parser

+import java.util
+
 import scala.collection.mutable.StringBuilder

 import org.antlr.v4.runtime.{ParserRuleContext, Token}

@@ -39,6 +41,17 @@ object ParserUtils {
     throw new ParseException(s"Operation not allowed: $message", ctx)
   }

+  def duplicateClausesNotAllowed(message: String, ctx: ParserRuleContext): Nothing = {
+    throw new ParseException(s"Found duplicate clauses: $message", ctx)
+  }
+
+  def checkDuplicateClauses(
+      nodes: util.List[TerminalNode], clauseName: String, ctx: ParserRuleContext): Unit = {
+    if (nodes.size() > 1) {
+      throw new ParseException(s"Found duplicate clauses: $clauseName", ctx)
+    }
+  }
+
   /** Check if duplicate keys exist in a set of key-value pairs. */
   def checkDuplicateKeys[T](keyPairs: Seq[(String, T)], ctx: ParserRuleContext): Unit = {
     keyPairs.groupBy(_._1).filter(_._2.size > 1).foreach { case (key, _) =>
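
Because each clause is now a starred alternative, the ANTLR-generated context exposes a java.util.List accessor per clause token, with one entry per occurrence; that list is what checkDuplicateClauses inspects. A minimal end-to-end sketch of the failure mode, reusing the illustrative local session from the sketch above:

    import scala.util.Try
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local").getOrCreate()

    // Repeating a clause now fails fast in the parser, with the message
    // built by checkDuplicateClauses/duplicateClausesNotAllowed.
    val err = Try(spark.sql(
      """CREATE TABLE dup (a INT) USING parquet
        |TBLPROPERTIES ('k' = '1') TBLPROPERTIES ('k' = '2')""".stripMargin))

    // org.apache.spark.sql.catalyst.parser.ParseException:
    //   Found duplicate clauses: TBLPROPERTIES
    println(err.failed.get.getMessage)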

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
Lines changed: 74 additions & 25 deletions

@@ -384,22 +384,31 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
    * USING table_provider
    * [OPTIONS table_property_list]
-   * [PARTITIONED BY (col_name, col_name, ...)]
-   * [CLUSTERED BY (col_name, col_name, ...)
-   *  [SORTED BY (col_name [ASC|DESC], ...)]
-   *  INTO num_buckets BUCKETS
-   * ]
-   * [LOCATION path]
-   * [COMMENT table_comment]
-   * [TBLPROPERTIES (property_name=property_value, ...)]
+   * create_table_clauses
    * [[AS] select_statement];
+   *
+   * create_table_clauses (order insensitive):
+   * [PARTITIONED BY (col_name, col_name, ...)]
+   * [CLUSTERED BY (col_name, col_name, ...)
+   *  [SORTED BY (col_name [ASC|DESC], ...)]
+   *  INTO num_buckets BUCKETS
+   * ]
+   * [LOCATION path]
+   * [COMMENT table_comment]
+   * [TBLPROPERTIES (property_name=property_value, ...)]
    * }}}
    */
   override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
     if (external) {
       operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
     }
+
+    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
+    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
+
     val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
     val schema = Option(ctx.colTypeList()).map(createSchema)
@@ -408,9 +417,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       .map(visitIdentifierList(_).toArray)
       .getOrElse(Array.empty[String])
     val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
+    val bucketSpec = if (ctx.bucketSpec().size > 1) {
+      duplicateClausesNotAllowed("CLUSTERED BY", ctx)
+    } else {
+      ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
+    }

-    val location = Option(ctx.locationSpec).map(visitLocationSpec)
+    val location = if (ctx.locationSpec.size > 1) {
+      duplicateClausesNotAllowed("LOCATION", ctx)
+    } else {
+      ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
+    }
     val storage = DataSource.buildStorageFormatFromOptions(options)

     if (location.isDefined && storage.locationUri.isDefined) {
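
For subrules such as bucketSpec and locationSpec, the generated accessors return lists of context objects rather than TerminalNodes, so the duplicate check is written inline with duplicateClausesNotAllowed instead of going through checkDuplicateClauses. The observable behavior, again assuming the illustrative spark session from the first sketch:

    // Duplicate CLUSTERED BY on a datasource table:
    // ParseException: Found duplicate clauses: CLUSTERED BY
    spark.sql(
      """CREATE TABLE b (a INT) USING parquet
        |CLUSTERED BY (a) INTO 2 BUCKETS
        |CLUSTERED BY (a) INTO 4 BUCKETS""".stripMargin)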
@@ -1087,13 +1104,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * {{{
    * CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
    * [(col1[:] data_type [COMMENT col_comment], ...)]
-   * [COMMENT table_comment]
-   * [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
-   * [ROW FORMAT row_format]
-   * [STORED AS file_format]
-   * [LOCATION path]
-   * [TBLPROPERTIES (property_name=property_value, ...)]
+   * create_table_clauses
    * [AS select_statement];
+   *
+   * create_table_clauses (order insensitive):
+   * [COMMENT table_comment]
+   * [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
+   * [ROW FORMAT row_format]
+   * [STORED AS file_format]
+   * [LOCATION path]
+   * [TBLPROPERTIES (property_name=property_value, ...)]
    * }}}
    */
   override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = withOrigin(ctx) {
@@ -1104,28 +1124,48 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
         "CREATE TEMPORARY TABLE is not supported yet. " +
           "Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
     }
-    if (ctx.skewSpec != null) {
+    if (ctx.skewSpec.size > 0) {
       operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
     }

+    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
+
     val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
     val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
-    val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val selectQuery = Option(ctx.query).map(plan)
-    val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
+    val bucketSpec = if (ctx.bucketSpec().size > 1) {
+      duplicateClausesNotAllowed("CLUSTERED BY", ctx)
+    } else {
+      ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
+    }

     // Note: Hive requires partition columns to be distinct from the schema, so we need
     // to include the partition columns here explicitly
     val schema = StructType(dataCols ++ partitionCols)

     // Storage format
     val defaultStorage = HiveSerDe.getDefaultStorage(conf)
-    validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
-    val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
-      .getOrElse(CatalogStorageFormat.empty)
-    val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
-      .getOrElse(CatalogStorageFormat.empty)
-    val location = Option(ctx.locationSpec).map(visitLocationSpec)
+    validateRowFormatFileFormat(ctx.rowFormat.asScala, ctx.createFileFormat.asScala, ctx)
+    val fileStorage = if (ctx.createFileFormat.size > 1) {
+      duplicateClausesNotAllowed("STORED AS/BY", ctx)
+    } else {
+      ctx.createFileFormat.asScala.headOption.map(visitCreateFileFormat)
+        .getOrElse(CatalogStorageFormat.empty)
+    }
+    val rowStorage = if (ctx.rowFormat.size > 1) {
+      duplicateClausesNotAllowed("ROW FORMAT", ctx)
+    } else {
+      ctx.rowFormat.asScala.headOption.map(visitRowFormat)
+        .getOrElse(CatalogStorageFormat.empty)
+    }
+    val location = if (ctx.locationSpec.size > 1) {
+      duplicateClausesNotAllowed("LOCATION", ctx)
+    } else {
+      ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
+    }
     // If we are creating an EXTERNAL table, then the LOCATION field is required
     if (external && location.isEmpty) {
       operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
@@ -1366,6 +1406,15 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     }
   }

+  private def validateRowFormatFileFormat(
+      rowFormatCtx: Seq[RowFormatContext],
+      createFileFormatCtx: Seq[CreateFileFormatContext],
+      parentCtx: ParserRuleContext): Unit = {
+    if (rowFormatCtx.size == 1 && createFileFormatCtx.size == 1) {
+      validateRowFormatFileFormat(rowFormatCtx.head, createFileFormatCtx.head, parentCtx)
+    }
+  }
+
   /**
    * Create or replace a view. This creates a [[CreateViewCommand]] command.
    *
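
A note on this overload's design: it runs the pre-existing pairwise ROW FORMAT/STORED AS compatibility validation only when exactly one of each clause is present. When either clause is duplicated, the overload is a deliberate no-op, so the size checks that follow in visitCreateHiveTable report the clearer "Found duplicate clauses" error instead of a spurious format-compatibility one.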
