Skip to content

Commit f26c08c

Browse files
committed
address comments
1 parent 31f2efa commit f26c08c

File tree

4 files changed

+69
-116
lines changed

4 files changed

+69
-116
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -291,9 +291,6 @@ class SessionCatalog(
291291
val table = formatTableName(tableDefinition.identifier.table)
292292
val tableIdentifier = TableIdentifier(table, Some(db))
293293
validateName(table)
294-
if (!ignoreIfExists) {
295-
validateTableLocation(tableDefinition, tableIdentifier)
296-
}
297294

298295
val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
299296
&& !tableDefinition.storage.locationUri.get.isAbsolute) {
@@ -308,17 +305,21 @@ class SessionCatalog(
308305
}
309306

310307
requireDbExists(db)
308+
if (!ignoreIfExists) {
309+
validateTableLocation(newTableDefinition)
310+
}
311311
externalCatalog.createTable(newTableDefinition, ignoreIfExists)
312312
}
313313

314-
def validateTableLocation(table: CatalogTable, tableIdentifier: TableIdentifier): Unit = {
314+
def validateTableLocation(table: CatalogTable): Unit = {
315315
// SPARK-19724: the default location of a managed table should be non-existent or empty.
316316
if (table.tableType == CatalogTableType.MANAGED) {
317-
val tableLocation = new Path(defaultTablePath(tableIdentifier))
317+
val tableLocation =
318+
new Path(table.storage.locationUri.getOrElse(defaultTablePath(table.identifier)))
318319
val fs = tableLocation.getFileSystem(hadoopConf)
319320

320321
if (fs.exists(tableLocation) && fs.listStatus(tableLocation).nonEmpty) {
321-
throw new AnalysisException(s"Can not create the managed table('${tableIdentifier}')" +
322+
throw new AnalysisException(s"Can not create the managed table('${table.identifier}')" +
322323
s". The associated location('${tableLocation.toString}') already exists.")
323324
}
324325
}

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ case class CreateDataSourceTableAsSelectCommand(
167167
sparkSession, table, table.storage.locationUri, child, SaveMode.Append, tableExists = true)
168168
} else {
169169
assert(table.schema.isEmpty)
170-
sparkSession.sessionState.catalog.validateTableLocation(table, table.identifier)
170+
sparkSession.sessionState.catalog.validateTableLocation(table)
171171
val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
172172
Some(sessionState.catalog.defaultTablePath(table.identifier))
173173
} else {

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 61 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -103,60 +103,6 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
103103
}
104104
}
105105

106-
test("CTAS a managed table with the existing empty directory") {
107-
val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
108-
try {
109-
tableLoc.mkdir()
110-
withTable("tab1") {
111-
sql("CREATE TABLE tab1 USING PARQUET AS SELECT 1, 'a'")
112-
checkAnswer(spark.table("tab1"), Row(1, "a"))
113-
}
114-
} finally {
115-
waitForTasksToFinish()
116-
Utils.deleteRecursively(tableLoc)
117-
}
118-
}
119-
120-
test("create a managed table with the existing empty directory") {
121-
val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
122-
try {
123-
tableLoc.mkdir()
124-
withTable("tab1") {
125-
sql("CREATE TABLE tab1 (col1 int, col2 string) USING PARQUET")
126-
sql("INSERT INTO tab1 VALUES (1, 'a')")
127-
checkAnswer(spark.table("tab1"), Row(1, "a"))
128-
}
129-
} finally {
130-
waitForTasksToFinish()
131-
Utils.deleteRecursively(tableLoc)
132-
}
133-
}
134-
135-
test("create a managed table with the existing non-empty directory") {
136-
withTable("tab1") {
137-
val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
138-
try {
139-
// create an empty hidden file
140-
tableLoc.mkdir()
141-
val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage")
142-
hiddenGarbageFile.createNewFile()
143-
var ex = intercept[AnalysisException] {
144-
sql("CREATE TABLE tab1 USING PARQUET AS SELECT 1, 'a'")
145-
}.getMessage
146-
assert(ex.contains("Can not create the managed table('`tab1`'). The associated location"))
147-
148-
ex = intercept[AnalysisException] {
149-
sql("CREATE TABLE tab1 (col1 int, col2 string) USING PARQUET")
150-
}.getMessage
151-
assert(ex.contains(
152-
"Can not create the managed table('`default`.`tab1`'). The associated location"))
153-
} finally {
154-
waitForTasksToFinish()
155-
Utils.deleteRecursively(tableLoc)
156-
}
157-
}
158-
}
159-
160106
test("Create Hive Table As Select") {
161107
import testImplicits._
162108
withTable("t", "t1") {
@@ -234,6 +180,13 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
234180

235181
private val escapedIdentifier = "`(.+)`".r
236182

183+
private def dataSource: String = {
184+
if (isUsingHiveMetastore) {
185+
"HIVE"
186+
} else {
187+
"PARQUET"
188+
}
189+
}
237190
protected def normalizeCatalogTable(table: CatalogTable): CatalogTable = table
238191

239192
private def normalizeSerdeProp(props: Map[String, String]): Map[String, String] = {
@@ -419,6 +372,60 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
419372
}
420373
}
421374

375+
test("CTAS a managed table with the existing empty directory") {
376+
val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
377+
try {
378+
tableLoc.mkdir()
379+
withTable("tab1") {
380+
sql(s"CREATE TABLE tab1 USING ${dataSource} AS SELECT 1, 'a'")
381+
checkAnswer(spark.table("tab1"), Row(1, "a"))
382+
}
383+
} finally {
384+
waitForTasksToFinish()
385+
Utils.deleteRecursively(tableLoc)
386+
}
387+
}
388+
389+
test("create a managed table with the existing empty directory") {
390+
val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
391+
try {
392+
tableLoc.mkdir()
393+
withTable("tab1") {
394+
sql(s"CREATE TABLE tab1 (col1 int, col2 string) USING ${dataSource}")
395+
sql("INSERT INTO tab1 VALUES (1, 'a')")
396+
checkAnswer(spark.table("tab1"), Row(1, "a"))
397+
}
398+
} finally {
399+
waitForTasksToFinish()
400+
Utils.deleteRecursively(tableLoc)
401+
}
402+
}
403+
404+
test("create a managed table with the existing non-empty directory") {
405+
withTable("tab1") {
406+
val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
407+
try {
408+
// create an empty hidden file
409+
tableLoc.mkdir()
410+
val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage")
411+
hiddenGarbageFile.createNewFile()
412+
var ex = intercept[AnalysisException] {
413+
sql(s"CREATE TABLE tab1 USING ${dataSource} AS SELECT 1, 'a'")
414+
}.getMessage
415+
assert(ex.contains("Can not create the managed table('`tab1`'). The associated location"))
416+
417+
ex = intercept[AnalysisException] {
418+
sql(s"CREATE TABLE tab1 (col1 int, col2 string) USING ${dataSource}")
419+
}.getMessage
420+
assert(ex.contains(
421+
"Can not create the managed table('`default`.`tab1`'). The associated location"))
422+
} finally {
423+
waitForTasksToFinish()
424+
Utils.deleteRecursively(tableLoc)
425+
}
426+
}
427+
}
428+
422429
private def checkSchemaInCreatedDataSourceTable(
423430
path: File,
424431
userSpecifiedSchema: Option[String],

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -437,61 +437,6 @@ class HiveDDLSuite
437437
}
438438
}
439439

440-
test("CTAS a managed table with the existing empty directory") {
441-
val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
442-
try {
443-
tableLoc.mkdir()
444-
withTable("tab1") {
445-
sql("CREATE TABLE tab1 USING HIVE AS SELECT 1, 'a'")
446-
checkAnswer(spark.table("tab1"), Row(1, "a"))
447-
}
448-
} finally {
449-
waitForTasksToFinish()
450-
Utils.deleteRecursively(tableLoc)
451-
}
452-
}
453-
454-
test("create a managed table with the existing empty directory") {
455-
val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
456-
try {
457-
tableLoc.mkdir()
458-
withTable("tab1") {
459-
sql("CREATE TABLE tab1 (col1 int, col2 string) USING HIVE")
460-
sql("INSERT INTO tab1 VALUES (1, 'a')")
461-
checkAnswer(spark.table("tab1"), Row(1, "a"))
462-
}
463-
} finally {
464-
waitForTasksToFinish()
465-
Utils.deleteRecursively(tableLoc)
466-
}
467-
}
468-
469-
test("create a managed table with the existing non-empty directory") {
470-
withTable("tab1") {
471-
val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
472-
try {
473-
// create an empty hidden file
474-
tableLoc.mkdir()
475-
val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage")
476-
hiddenGarbageFile.createNewFile()
477-
var ex = intercept[AnalysisException] {
478-
sql("CREATE TABLE tab1 USING HIVE AS SELECT 1, 'a'")
479-
}.getMessage
480-
assert(ex.contains(
481-
"Can not create the managed table('`default`.`tab1`'). The associated location"))
482-
483-
ex = intercept[AnalysisException] {
484-
sql("CREATE TABLE tab1 (col1 int, col2 string) USING HIVE")
485-
}.getMessage
486-
assert(ex.contains(
487-
"Can not create the managed table('`default`.`tab1`'). The associated location"))
488-
} finally {
489-
waitForTasksToFinish()
490-
Utils.deleteRecursively(tableLoc)
491-
}
492-
}
493-
}
494-
495440
test("create table: partition column names exist in table definition") {
496441
val e = intercept[AnalysisException] {
497442
sql("CREATE TABLE tbl(a int) PARTITIONED BY (a string)")

0 commit comments

Comments
 (0)