Skip to content

Commit 52f8bcd

Browse files
committed
[SPARK-19724][SQL]create a managed hive table with an existing default location should throw an exception
1 parent 8f33731 commit 52f8bcd

File tree

3 files changed

+171
-3
lines changed

3 files changed

+171
-3
lines changed

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1952,4 +1952,54 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
19521952
}
19531953
}
19541954
}
1955+
1956+
test("CTAS for data source table with a created default location throw an exception") {
  withTable("t", "t1", "t2") {
    val warehousePath = spark.sharedState.warehousePath.stripPrefix("file:")

    // Runs the given CTAS statement and verifies it is rejected because the
    // table's default location `existingDir` is already present on disk.
    def assertCtasRejected(existingDir: File, ctas: String): Unit = {
      val errorMessage = intercept[AnalysisException] {
        spark.sql(ctas)
      }.getMessage
      assert(errorMessage.contains(s"path file:${existingDir.getAbsolutePath} already exists."))
    }

    // Case 1: non-partitioned table whose default location already exists.
    val dirT = new File(warehousePath, "t")
    dirT.mkdirs()
    assert(dirT.exists)
    assertCtasRejected(dirT,
      s"""
         |CREATE TABLE t
         |USING parquet
         |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
       """.stripMargin)

    // Case 2: partitioned table whose table path already exists.
    val dirT1 = new File(warehousePath, "t1")
    dirT1.mkdirs()
    assert(dirT1.exists)
    assertCtasRejected(dirT1,
      s"""
         |CREATE TABLE t1
         |USING parquet
         |PARTITIONED BY(a, b)
         |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
       """.stripMargin)

    // Case 3: partitioned table where only a partition directory (a=3/b=4)
    // pre-exists under the table path; the error still reports the table path.
    val dirT2 = new File(warehousePath, "t2")
    val partitionDir = new File(dirT2, "a=3/b=4")
    partitionDir.mkdirs()
    assert(partitionDir.exists)
    assertCtasRejected(dirT2,
      s"""
         |CREATE TABLE t2
         |USING parquet
         |PARTITIONED BY(a, b)
         |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
       """.stripMargin)
  }
}
19552005
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ package org.apache.spark.sql.hive.execution
1919

2020
import scala.util.control.NonFatal
2121

22+
import org.apache.hadoop.fs.Path
23+
2224
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
2325
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
24-
import org.apache.spark.sql.catalyst.catalog.CatalogTable
26+
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
2527
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
2628
import org.apache.spark.sql.execution.command.RunnableCommand
2729

@@ -68,9 +70,22 @@ case class CreateHiveTableAsSelectCommand(
6870
// add the relation into catalog, just in case of failure occurs while data
6971
// processing.
7072
assert(tableDesc.schema.isEmpty)
71-
sparkSession.sessionState.catalog.createTable(
72-
tableDesc.copy(schema = query.schema), ignoreIfExists = false)
7373

74+
// As discussed in SPARK-19583, in CTAS the default location of a managed
75+
// table should not exist
76+
if (mode == SaveMode.ErrorIfExists && tableDesc.tableType == CatalogTableType.MANAGED) {
77+
val hadoopConf = sparkSession.sessionState.newHadoopConf()
78+
val tblLocationPath =
79+
new Path(sparkSession.sessionState.catalog.defaultTablePath(tableIdentifier))
80+
val fs = tblLocationPath.getFileSystem(hadoopConf)
81+
if (fs.exists(tblLocationPath)) {
82+
throw new AnalysisException(s"the location('$tblLocationPath') of table" +
83+
s"('$tableIdentifier') already exists.")
84+
}
85+
}
86+
87+
sparkSession.sessionState.catalog.createTable(
88+
tableDesc.copy(schema = query.schema), ignoreIfExists = false)
7489
try {
7590
sparkSession.sessionState.executePlan(
7691
InsertIntoTable(

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1587,4 +1587,107 @@ class HiveDDLSuite
15871587
}
15881588
}
15891589
}
1590+
1591+
test("CTAS for data source table with a created default location throw an exception") {
  withTable("t", "t1", "t2") {
    val warehousePath = spark.sharedState.warehousePath

    // Issues a CTAS and checks that it fails because the default location
    // `existingDir` for the target table already exists.
    def expectPathExistsError(existingDir: File, ctas: String): Unit = {
      val message = intercept[AnalysisException] {
        spark.sql(ctas)
      }.getMessage
      assert(message.contains(s"path file:${existingDir.getAbsolutePath} already exists."))
    }

    // Non-partitioned table: default location pre-created.
    val dirT = new File(warehousePath, "t")
    dirT.mkdirs()
    assert(dirT.exists)
    expectPathExistsError(dirT,
      s"""
         |CREATE TABLE t
         |USING parquet
         |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
       """.stripMargin)

    // Partitioned table: the table path itself pre-created.
    val dirT1 = new File(warehousePath, "t1")
    dirT1.mkdirs()
    assert(dirT1.exists)
    expectPathExistsError(dirT1,
      s"""
         |CREATE TABLE t1
         |USING parquet
         |PARTITIONED BY(a, b)
         |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
       """.stripMargin)

    // Partitioned table: only a partition directory (a=3/b=4) pre-exists
    // under the table path; the error still names the table path.
    val dirT2 = new File(warehousePath, "t2")
    val partitionDir = new File(dirT2, "a=3/b=4")
    partitionDir.mkdirs()
    assert(partitionDir.exists)
    expectPathExistsError(dirT2,
      s"""
         |CREATE TABLE t2
         |USING parquet
         |PARTITIONED BY(a, b)
         |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
       """.stripMargin)
  }
}
1640+
1641+
test("CTAS for hive table with a created default location throw an exception") {
  withTable("t", "t1", "t2") {
    val warehousePath = spark.sharedState.warehousePath

    // Non-partitioned hive table: pre-create the default location so the
    // CTAS must fail instead of silently reusing the directory.
    val tFile = new File(warehousePath, "t")
    tFile.mkdirs()
    assert(tFile.exists)

    val e = intercept[AnalysisException] {
      spark.sql(
        s"""
           |CREATE TABLE t
           |USING hive
           |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
         """.stripMargin)
    }.getMessage
    assert(e.contains(s"the location('file:${tFile.getAbsolutePath}') of table" +
      s"('`default`.`t`') already exists."))

    // Partitioned hive table whose table path already exists.
    val tFile1 = new File(warehousePath, "t1")
    tFile1.mkdirs()
    assert(tFile1.exists)
    val e1 = intercept[AnalysisException] {
      spark.sql(
        s"""
           |CREATE TABLE t1
           |USING hive
           |PARTITIONED BY(a, b)
           |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
         """.stripMargin)
    }.getMessage
    assert(e1.contains(s"the location('file:${tFile1.getAbsolutePath}') of table" +
      s"('`default`.`t1`') already exists."))

    // Partitioned hive table where only a partition directory (a=3/b=4)
    // pre-exists under the table path.
    val tFile2 = new File(warehousePath, "t2")
    val tPartFile = new File(tFile2, "a=3/b=4")
    tPartFile.mkdirs()
    assert(tPartFile.exists)
    val e2 = intercept[AnalysisException] {
      spark.sql(
        s"""
           |CREATE TABLE t2
           |USING hive
           |PARTITIONED BY(a, b)
           |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
         """.stripMargin)
    }.getMessage
    // Bug fix: the original re-asserted `e1`/`tFile1`/`t1` here, leaving the
    // third intercepted exception (`e2`) completely unchecked.
    assert(e2.contains(s"the location('file:${tFile2.getAbsolutePath}') of table" +
      s"('`default`.`t2`') already exists."))
  }
}
15901693
}

0 commit comments

Comments
 (0)