
Commit 1dc12b3

fix.
1 parent 871f611 commit 1dc12b3

File tree

4 files changed, +109 -5 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 13 additions & 0 deletions

@@ -229,9 +229,22 @@ class InMemoryCatalog(
     if (tableExists(db, table)) {
       val tableMeta = getTable(db, table)
       if (tableMeta.tableType == CatalogTableType.MANAGED) {
+        // Delete the data/directory for each partition
+        val locationAllParts = catalog(db).tables(table).partitions.values.toSeq.map(_.location)
+        locationAllParts.foreach { loc =>
+          val partitionPath = new Path(loc)
+          try {
+            val fs = partitionPath.getFileSystem(hadoopConfig)
+            fs.delete(partitionPath, true)
+          } catch {
+            case e: IOException =>
+              throw new SparkException(s"Unable to delete partition path $partitionPath", e)
+          }
+        }
         assert(tableMeta.storage.locationUri.isDefined,
           "Managed table should always have table location, as we will assign a default location " +
           "to it if it doesn't have one.")
+        // Delete the data/directory of the table
         val dir = new Path(tableMeta.location)
         try {
           val fs = dir.getFileSystem(hadoopConfig)
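The partition cleanup added here reuses the Hadoop FileSystem pattern the method already applies to the table directory: resolve the FileSystem for the path, then delete it recursively. A minimal standalone sketch of that pattern (the configuration and partition locations below are placeholders, not values taken from InMemoryCatalog):

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException

object DeletePartitionDirsSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder Hadoop configuration and partition locations; InMemoryCatalog
    // takes these from `hadoopConfig` and the table's partition metadata.
    val hadoopConfig = new Configuration()
    val partitionLocations = Seq("/tmp/warehouse/tbl/part1", "/tmp/warehouse/tbl/part2")

    partitionLocations.foreach { loc =>
      val partitionPath = new Path(loc)
      try {
        // Resolve the file system for this path and delete the directory recursively.
        val fs = partitionPath.getFileSystem(hadoopConfig)
        fs.delete(partitionPath, true)
      } catch {
        case e: IOException =>
          throw new SparkException(s"Unable to delete partition path $partitionPath", e)
      }
    }
  }
}

The recursive flag matters because a partition directory may still contain data files when the managed table is dropped.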

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 40 additions & 0 deletions

@@ -346,6 +346,46 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(new Path(partitionLocation) == defaultPartitionLocation)
   }
 
+  test("create/drop partitions in managed tables with location") {
+    val catalog = newBasicCatalog()
+    val table = CatalogTable(
+      identifier = TableIdentifier("tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string"),
+      provider = Some("hive"),
+      partitionColumnNames = Seq("partCol1", "partCol2"))
+    catalog.createTable(table, ignoreIfExists = false)
+
+    val newLocationPart1 = newUriForDatabase()
+    val newLocationPart2 = newUriForDatabase()
+
+    val partition1 =
+      CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"),
+        storageFormat.copy(locationUri = Some(newLocationPart1)))
+    val partition2 =
+      CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"),
+        storageFormat.copy(locationUri = Some(newLocationPart2)))
+    catalog.createPartitions("db1", "tbl", Seq(partition1), ignoreIfExists = false)
+    catalog.createPartitions("db1", "tbl", Seq(partition2), ignoreIfExists = false)
+
+    assert(exists(newLocationPart1))
+    assert(exists(newLocationPart2))
+
+    // the corresponding directory is dropped.
+    catalog.dropPartitions("db1", "tbl", Seq(partition1.spec),
+      ignoreIfNotExists = false, purge = false, retainData = false)
+    assert(!exists(newLocationPart1))
+
+    // all the remaining directories are dropped.
+    catalog.dropTable("db1", "tbl", ignoreIfNotExists = false, purge = false)
+    assert(!exists(newLocationPart2))
+  }
+
   test("list partition names") {
     val catalog = newBasicCatalog()
     val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), storageFormat)
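The assertions above rely on the suite's `exists` helper to check whether a partition location is still present on disk after createPartitions, dropPartitions, and dropTable. The helper's implementation is not shown in this diff; a rough standalone equivalent under that assumption (names and defaults are illustrative) might be:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object LocationCheckSketch {
  // Hypothetical helper: true if the given location still exists on its file system,
  // similar in spirit to the `exists` check used by the test above.
  def locationExists(location: String, conf: Configuration = new Configuration()): Boolean = {
    val path = new Path(location)
    val fs = path.getFileSystem(conf)
    fs.exists(path)
  }

  def main(args: Array[String]): Unit = {
    println(locationExists("file:/tmp/some/partition/dir"))
  }
}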

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 2 additions & 3 deletions

@@ -400,13 +400,12 @@ case class AlterTableSerDePropertiesCommand(
 /**
  * Add Partition in ALTER TABLE: add the table partitions.
  *
- * 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE,
- * EXCEPT that it is ILLEGAL to specify a LOCATION clause.
  * An error message will be issued if the partition exists, unless 'ifNotExists' is true.
  *
  * The syntax of this command is:
  * {{{
- *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+ *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1']
+ *                                         PARTITION spec2 [LOCATION 'loc2']
  * }}}
  */
 case class AlterTableAddPartitionCommand(
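The corrected doc comment reflects that a single ALTER TABLE ... ADD statement may list several PARTITION clauses, each with its own optional LOCATION. A usage sketch of that syntax (table name, columns, and paths are made up for illustration, and a Hive-enabled session is assumed):

import org.apache.spark.sql.SparkSession

object AddPartitionsSketch {
  def main(args: Array[String]): Unit = {
    // Assumes Hive support is available on the classpath; all names are placeholders.
    val spark = SparkSession.builder()
      .appName("AddPartitionsSketch")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql(
      "CREATE TABLE IF NOT EXISTS logs (value STRING) PARTITIONED BY (ds STRING, hr STRING)")

    // One statement registers two partitions, each with its own LOCATION.
    spark.sql(
      """
        |ALTER TABLE logs ADD IF NOT EXISTS
        |PARTITION (ds='2008-04-08', hr='11') LOCATION '/tmp/logs/part1'
        |PARTITION (ds='2008-04-08', hr='12') LOCATION '/tmp/logs/part2'
      """.stripMargin)

    spark.stop()
  }
}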

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 54 additions & 2 deletions

@@ -199,6 +199,52 @@ class HiveDDLSuite
     assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
   }
 
+  test("add/drop partition with location - managed table") {
+    val tab = "tab_with_partitions"
+    withTempDir { tmpDir =>
+      val basePath = new File(tmpDir.getCanonicalPath)
+      val part1Path = new File(basePath + "/part1")
+      val part2Path = new File(basePath + "/part2")
+      val dirSet = part1Path :: part2Path :: Nil
+
+      // Before data insertion, all the directories are empty
+      assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+
+      withTable(tab) {
+        sql(
+          s"""
+             |CREATE TABLE $tab (key INT, value STRING)
+             |PARTITIONED BY (ds STRING, hr STRING)
+           """.stripMargin)
+        sql(
+          s"""
+             |ALTER TABLE $tab ADD
+             |PARTITION (ds='2008-04-08', hr=11) LOCATION '$part1Path'
+             |PARTITION (ds='2008-04-08', hr=12) LOCATION '$part2Path'
+           """.stripMargin)
+        assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+
+        sql(s"INSERT OVERWRITE TABLE $tab partition (ds='2008-04-08', hr=11) SELECT 1, 'a'")
+        sql(s"INSERT OVERWRITE TABLE $tab partition (ds='2008-04-08', hr=12) SELECT 2, 'b'")
+        // adding partitions does not delete the data
+        assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
+        checkAnswer(
+          spark.table(tab),
+          Row(1, "a", "2008-04-08", "11") :: Row(2, "b", "2008-04-08", "12") :: Nil
+        )
+
+        sql(s"ALTER TABLE $tab DROP PARTITION (ds='2008-04-08', hr=11)")
+        // dropping a partition deletes its data
+        assert(part1Path.listFiles == null || part1Path.listFiles.isEmpty)
+        assert(part2Path.listFiles.nonEmpty)
+
+        sql(s"DROP TABLE $tab")
+        // dropping the table deletes the data of the managed table
+        assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+      }
+    }
+  }
+
   test("add/drop partitions - external table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>

@@ -257,9 +303,15 @@
         // drop partition will not delete the data of external table
        assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
 
-        sql(s"ALTER TABLE $externalTab ADD PARTITION (ds='2008-04-08', hr='12')")
+        sql(
+          s"""
+             |ALTER TABLE $externalTab ADD PARTITION (ds='2008-04-08', hr='12')
+             |PARTITION (ds='2008-04-08', hr=11)
+           """.stripMargin)
         assert(catalog.listPartitions(TableIdentifier(externalTab)).map(_.spec).toSet ==
-          Set(Map("ds" -> "2008-04-08", "hr" -> "12"), Map("ds" -> "2008-04-09", "hr" -> "11")))
+          Set(Map("ds" -> "2008-04-08", "hr" -> "11"),
+            Map("ds" -> "2008-04-08", "hr" -> "12"),
+            Map("ds" -> "2008-04-09", "hr" -> "11")))
         // add partition will not delete the data
         assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
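The repeated `dir.listFiles == null || dir.listFiles.isEmpty` checks in these tests treat a missing directory and an empty one the same way, because `java.io.File.listFiles` returns null when the path does not denote an existing directory. A small standalone helper capturing that check (the helper name is just illustrative):

import java.io.File

object DirCheckSketch {
  // True when `dir` either does not exist as a directory (listFiles returns null)
  // or exists but contains no entries.
  def isEmptyOrMissing(dir: File): Boolean = {
    val entries = dir.listFiles
    entries == null || entries.isEmpty
  }

  def main(args: Array[String]): Unit = {
    println(isEmptyOrMissing(new File("/path/that/does/not/exist"))) // prints true
  }
}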

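The second hunk's updated assertion expects three partition specs to be registered for the external table. Outside the test suite, a SQL-level way to inspect what the catalog has registered is SHOW PARTITIONS; a hedged sketch (the table name is a placeholder and a Hive-enabled session is assumed):

import org.apache.spark.sql.SparkSession

object ShowPartitionsSketch {
  def main(args: Array[String]): Unit = {
    // Assumes an existing partitioned table; names are placeholders.
    val spark = SparkSession.builder()
      .appName("ShowPartitionsSketch")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Each returned row is one partition spec, e.g. "ds=2008-04-08/hr=11".
    spark.sql("SHOW PARTITIONS some_partitioned_table").show(truncate = false)

    spark.stop()
  }
}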