Skip to content

Commit a0ba419

Browse files
committed
[SPARK-19724][SQL]create a managed table with an existing default table path should throw an exception
1 parent 0a4d06a commit a0ba419

File tree

9 files changed

+165
-47
lines changed

9 files changed

+165
-47
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -187,38 +187,32 @@ class InMemoryCatalog(
187187
val db = tableDefinition.identifier.database.get
188188
requireDbExists(db)
189189
val table = tableDefinition.identifier.table
190-
if (tableExists(db, table)) {
191-
if (!ignoreIfExists) {
192-
throw new TableAlreadyExistsException(db = db, table = table)
190+
// Set the default table location if this is a managed table and its location is not
191+
// specified.
192+
// Ideally we should not create a managed table with location, but Hive serde table can
193+
// specify location for managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have
194+
// to create the table directory and write out data before we create this table, to avoid
195+
// exposing a partial written table.
196+
val needDefaultTableLocation =
197+
tableDefinition.tableType == CatalogTableType.MANAGED &&
198+
tableDefinition.storage.locationUri.isEmpty
199+
200+
val tableWithLocation = if (needDefaultTableLocation) {
201+
val defaultTableLocation = new Path(new Path(catalog(db).db.locationUri), table)
202+
try {
203+
val fs = defaultTableLocation.getFileSystem(hadoopConfig)
204+
fs.mkdirs(defaultTableLocation)
205+
} catch {
206+
case e: IOException =>
207+
throw new SparkException(s"Unable to create table $table as failed " +
208+
s"to create its directory $defaultTableLocation", e)
193209
}
210+
tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri))
194211
} else {
195-
// Set the default table location if this is a managed table and its location is not
196-
// specified.
197-
// Ideally we should not create a managed table with location, but Hive serde table can
198-
// specify location for managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have
199-
// to create the table directory and write out data before we create this table, to avoid
200-
// exposing a partial written table.
201-
val needDefaultTableLocation =
202-
tableDefinition.tableType == CatalogTableType.MANAGED &&
203-
tableDefinition.storage.locationUri.isEmpty
204-
205-
val tableWithLocation = if (needDefaultTableLocation) {
206-
val defaultTableLocation = new Path(new Path(catalog(db).db.locationUri), table)
207-
try {
208-
val fs = defaultTableLocation.getFileSystem(hadoopConfig)
209-
fs.mkdirs(defaultTableLocation)
210-
} catch {
211-
case e: IOException =>
212-
throw new SparkException(s"Unable to create table $table as failed " +
213-
s"to create its directory $defaultTableLocation", e)
214-
}
215-
tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri))
216-
} else {
217-
tableDefinition
218-
}
219-
220-
catalog(db).tables.put(table, new TableDesc(tableWithLocation))
212+
tableDefinition
221213
}
214+
215+
catalog(db).tables.put(table, new TableDesc(tableWithLocation))
222216
}
223217

224218
override def dropTable(

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,28 @@ class SessionCatalog(
238238
new Path(new Path(conf.warehousePath), database + ".db").toUri
239239
}
240240

241+
/**
242+
* Check if the table exists, and check if the path exists for a managed table
243+
*/
244+
def checkTableOrPathExists(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
245+
if (!ignoreIfExists) {
246+
val db = formatDatabaseName(table.identifier.database.getOrElse(getCurrentDatabase))
247+
val tbl = formatTableName(table.identifier.table)
248+
val tableIdentifier = TableIdentifier(tbl, Some(db))
249+
if (tableExists(tableIdentifier)) {
250+
throw new TableAlreadyExistsException(db = db, table = tbl)
251+
}
252+
// As discussed in SPARK-19583, the default location of a managed table should not exist
253+
if (table.tableType == CatalogTableType.MANAGED) {
254+
val tablePath = new Path(defaultTablePath(tableIdentifier))
255+
val fs = tablePath.getFileSystem(hadoopConf)
256+
if (fs.exists(tablePath)) {
257+
throw new AnalysisException(s"the location('${tablePath.toString}') " +
258+
s"of table('$tableIdentifier') already exists.")
259+
}
260+
}
261+
}
262+
}
241263
// ----------------------------------------------------------------------------
242264
// Tables
243265
// ----------------------------------------------------------------------------
@@ -259,6 +281,8 @@ class SessionCatalog(
259281
val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
260282
val table = formatTableName(tableDefinition.identifier.table)
261283
validateName(table)
284+
requireDbExists(db)
285+
checkTableOrPathExists(tableDefinition, ignoreIfExists)
262286

263287
val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
264288
&& !tableDefinition.storage.locationUri.get.isAbsolute) {
@@ -272,7 +296,6 @@ class SessionCatalog(
272296
tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
273297
}
274298

275-
requireDbExists(db)
276299
externalCatalog.createTable(newTableDefinition, ignoreIfExists)
277300
}
278301

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -164,15 +164,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
164164
assert(actual.tableType === CatalogTableType.EXTERNAL)
165165
}
166166

167-
test("create table when the table already exists") {
168-
val catalog = newBasicCatalog()
169-
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
170-
val table = newTable("tbl1", "db2")
171-
intercept[TableAlreadyExistsException] {
172-
catalog.createTable(table, ignoreIfExists = false)
173-
}
174-
}
175-
176167
test("drop table") {
177168
val catalog = newBasicCatalog()
178169
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,17 @@ class SessionCatalogSuite extends PlanTest {
6464
assert(!catalog.databaseExists("does_not_exist"))
6565
}
6666

67+
test("create table when the table already exists") {
68+
val catalog = new SessionCatalog(newEmptyCatalog())
69+
catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
70+
catalog.createTable(newTable("tbl1", "db1"), ignoreIfExists = false)
71+
72+
val table = newTable("tbl1", "db1")
73+
intercept[TableAlreadyExistsException] {
74+
catalog.createTable(table, ignoreIfExists = false)
75+
}.getMessage
76+
}
77+
6778
def testInvalidName(func: (String) => Unit) {
6879
// scalastyle:off
6980
// non ascii characters are not allowed in the source code, so we disable the scalastyle.

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command
1919

2020
import java.net.URI
2121

22-
import org.apache.hadoop.fs.Path
2322

2423
import org.apache.spark.sql._
2524
import org.apache.spark.sql.catalyst.catalog._
@@ -155,6 +154,8 @@ case class CreateDataSourceTableAsSelectCommand(
155154
} else {
156155
table.storage.locationUri
157156
}
157+
158+
sparkSession.sessionState.catalog.checkTableOrPathExists(table, ignoreIfExists = false)
158159
val result = saveDataIntoTable(
159160
sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
160161
val newTable = table.copy(
@@ -163,7 +164,9 @@ case class CreateDataSourceTableAsSelectCommand(
163164
// the schema of df). It is important since the nullability may be changed by the relation
164165
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
165166
schema = result.schema)
166-
sessionState.catalog.createTable(newTable, ignoreIfExists = false)
167+
// we have already checked above that the table/path does not exist, so here we
168+
// set ignoreIfExists to true
169+
sessionState.catalog.createTable(newTable, ignoreIfExists = true)
167170

168171
result match {
169172
case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2216,4 +2216,48 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
22162216
}
22172217
}
22182218
}
2219+
2220+
test("create table for managed datasource table with a created location throw an exception") {
2221+
withTable("t", "t1", "t2", "t3") {
2222+
val warehousePath = spark.sharedState.warehousePath
2223+
val qualifiedwarehousePath = CatalogUtils.URIToString(makeQualifiedPath(warehousePath))
2224+
val tPath = new Path(qualifiedwarehousePath, "t")
2225+
val fs = tPath.getFileSystem(spark.sessionState.newHadoopConf())
2226+
fs.mkdirs(tPath)
2227+
assert(fs.exists(tPath))
2228+
val e = intercept[AnalysisException] {
2229+
spark.sql("CREATE TABLE t(a string) USING parquet")
2230+
}.getMessage
2231+
assert(e.contains(s"the location('${tPath.toString}') of table" +
2232+
s"('`default`.`t`') already exists."))
2233+
// partition table(table path exists)
2234+
val t1Path = new Path(qualifiedwarehousePath, "t1")
2235+
fs.mkdirs(t1Path)
2236+
assert(fs.exists(t1Path))
2237+
val e1 = intercept[AnalysisException] {
2238+
spark.sql("CREATE TABLE t1(a string, b string) USING parquet PARTITIONED BY(a)")
2239+
}.getMessage
2240+
assert(e1.contains(s"the location('${t1Path.toString}') of table" +
2241+
s"('`default`.`t1`') already exists."))
2242+
2243+
val t2Path = new Path(qualifiedwarehousePath, "t2")
2244+
fs.mkdirs(t2Path)
2245+
assert(fs.exists(t2Path))
2246+
val e2 = intercept[AnalysisException] {
2247+
spark.sql("CREATE TABLE t2 USING parquet AS SELECT 1")
2248+
}.getMessage
2249+
assert(e2.contains(s"the location('${t2Path.toString}') of table" +
2250+
s"('`default`.`t2`') already exists."))
2251+
2252+
val t3Path = new Path(qualifiedwarehousePath, "t3")
2253+
val t3PartPath = new Path(t3Path, "a=1")
2254+
fs.mkdirs(t3PartPath)
2255+
assert(fs.exists(t3PartPath))
2256+
val e3 = intercept[AnalysisException] {
2257+
spark.sql("CREATE TABLE t3 USING parquet PARTITIONED BY(a) AS SELECT 1 a, 2 b")
2258+
}.getMessage
2259+
assert(e3.contains(s"the location('${t3Path.toString}') of table" +
2260+
s"('`default`.`t3`') already exists."))
2261+
}
2262+
}
22192263
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,10 +195,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
195195
requireDbExists(db)
196196
verifyTableProperties(tableDefinition)
197197

198-
if (tableExists(db, table) && !ignoreIfExists) {
199-
throw new TableAlreadyExistsException(db = db, table = table)
200-
}
201-
202198
if (tableDefinition.tableType == VIEW) {
203199
client.createTable(tableDefinition, ignoreIfExists)
204200
} else {

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
2020
import org.apache.hadoop.conf.Configuration
2121

2222
import org.apache.spark.SparkConf
23+
import org.apache.spark.sql.AnalysisException
2324
import org.apache.spark.sql.catalyst.TableIdentifier
2425
import org.apache.spark.sql.catalyst.catalog._
2526
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -50,6 +51,16 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
5051

5152
import utils._
5253

54+
test("create table when the table already exists") {
55+
val catalog = newBasicCatalog()
56+
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
57+
val table = newTable("tbl1", "db2")
58+
val e = intercept[AnalysisException] {
59+
catalog.createTable(table, ignoreIfExists = false)
60+
}.getMessage
61+
assert(e.contains("AlreadyExistsException(message:Table tbl1 already exists);"))
62+
}
63+
5364
test("list partitions by filter") {
5465
val catalog = newBasicCatalog()
5566
val selectedPartitions = catalog.listPartitionsByFilter("db2", "tbl2", Seq('a.int === 1), "GMT")

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1364,7 +1364,7 @@ class HiveDDLSuite
13641364
import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_PREFIX
13651365
import org.apache.spark.sql.hive.HiveExternalCatalog.STATISTICS_PREFIX
13661366

1367-
withTable("tbl") {
1367+
withTable("tbl", "tbl1") {
13681368
sql("CREATE TABLE tbl(a INT) STORED AS parquet")
13691369

13701370
Seq(DATASOURCE_PREFIX, STATISTICS_PREFIX).foreach { forbiddenPrefix =>
@@ -1379,7 +1379,7 @@ class HiveDDLSuite
13791379
assert(e2.getMessage.contains(forbiddenPrefix + "foo"))
13801380

13811381
val e3 = intercept[AnalysisException] {
1382-
sql(s"CREATE TABLE tbl (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
1382+
sql(s"CREATE TABLE tbl1 (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
13831383
}
13841384
assert(e3.getMessage.contains(forbiddenPrefix + "foo"))
13851385
}
@@ -1861,4 +1861,49 @@ class HiveDDLSuite
18611861
}
18621862
}
18631863
}
1864+
1865+
1866+
test("create table for managed hive table with a created location throw an exception") {
1867+
withTable("t", "t1", "t2", "t3") {
1868+
val warehousePath = spark.sharedState.warehousePath
1869+
val qualifiedwarehousePath = CatalogUtils.URIToString(makeQualifiedPath(warehousePath))
1870+
val tPath = new Path(qualifiedwarehousePath, "t")
1871+
val fs = tPath.getFileSystem(spark.sessionState.newHadoopConf())
1872+
fs.mkdirs(tPath)
1873+
assert(fs.exists(tPath))
1874+
val e = intercept[AnalysisException] {
1875+
spark.sql("CREATE TABLE t(a string) USING hive")
1876+
}.getMessage
1877+
assert(e.contains(s"the location('${tPath.toString}') of table" +
1878+
s"('`default`.`t`') already exists."))
1879+
// partition table(table path exists)
1880+
val t1Path = new Path(qualifiedwarehousePath, "t1")
1881+
fs.mkdirs(t1Path)
1882+
assert(fs.exists(t1Path))
1883+
val e1 = intercept[AnalysisException] {
1884+
spark.sql("CREATE TABLE t1(a string, b string) USING hive PARTITIONED BY(a)")
1885+
}.getMessage
1886+
assert(e1.contains(s"the location('${t1Path.toString}') of table" +
1887+
s"('`default`.`t1`') already exists."))
1888+
1889+
val t2Path = new Path(qualifiedwarehousePath, "t2")
1890+
fs.mkdirs(t2Path)
1891+
assert(fs.exists(t2Path))
1892+
val e2 = intercept[AnalysisException] {
1893+
spark.sql("CREATE TABLE t2 USING hive AS SELECT 1")
1894+
}.getMessage
1895+
assert(e2.contains(s"the location('${t2Path.toString}') of table" +
1896+
s"('`default`.`t2`') already exists."))
1897+
1898+
val t3Path = new Path(qualifiedwarehousePath, "t3")
1899+
val t3PartPath = new Path(t3Path, "a=1")
1900+
fs.mkdirs(t3PartPath)
1901+
assert(fs.exists(t3PartPath))
1902+
val e3 = intercept[AnalysisException] {
1903+
spark.sql("CREATE TABLE t3 USING hive PARTITIONED BY(a) AS SELECT 1 a, 2 b")
1904+
}.getMessage
1905+
assert(e3.contains(s"the location('${t3Path.toString}') of table" +
1906+
s"('`default`.`t3`') already exists."))
1907+
}
1908+
}
18641909
}

0 commit comments

Comments
 (0)