
Commit c7b56a4

[SPARK-32480] Support insert overwrite to move data to trash

1 parent a8b5688

File tree: 6 files changed, +35 −18 lines

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 1 addition & 1 deletion

@@ -270,7 +270,7 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Move data to trash if 'spark.sql.truncate.trash.enabled' is true
+   * Move data to trash if 'spark.sql.trash.enabled' is true
    */
   def moveToTrashIfEnabled(
       fs: FileSystem,
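
The body of moveToTrashIfEnabled falls outside the hunk shown, but the call sites later in this commit pass four arguments and fall back to a permanent delete on failure. A minimal sketch under those assumptions (every parameter after fs is a guess, not the committed code):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, Trash}

// Sketch only, reconstructed from the call sites in this diff.
// Tries the Hadoop trash first when enabled; otherwise, or when the
// move fails (e.g. fs.trash.interval <= 0), deletes the path permanently.
def moveToTrashIfEnabled(
    fs: FileSystem,
    path: Path,
    isTrashEnabled: Boolean,
    hadoopConf: Configuration): Unit = {
  if (!isTrashEnabled || !Trash.moveToAppropriateTrash(fs, path, hadoopConf)) {
    fs.delete(path, true)
  }
}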

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 8 deletions

@@ -2722,13 +2722,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val TRUNCATE_TRASH_ENABLED =
-    buildConf("spark.sql.truncate.trash.enabled")
-      .doc("This configuration decides when truncating table, whether data files will be moved " +
-        "to trash directory or deleted permanently. The trash retention time is controlled by " +
-        "fs.trash.interval, and in default, the server side configuration value takes " +
-        "precedence over the client-side one. Note that if fs.trash.interval is non-positive, " +
-        "this will be a no-op and log a warning message.")
+  val TRASH_ENABLED =
+    buildConf("spark.sql.trash.enabled")
+      .doc("This configuration decides whether, on truncate table or insert overwrite, data " +
+        "files are moved to the trash directory or deleted permanently. The trash retention " +
+        "time is controlled by fs.trash.interval, and by default the server-side " +
+        "configuration value takes precedence over the client-side one. Note that if " +
+        "fs.trash.interval is non-positive, this will be a no-op and log a warning message.")
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)

@@ -3345,7 +3345,7 @@ class SQLConf extends Serializable with Logging {
 
   def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)
 
-  def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)
+  def trashEnabled: Boolean = getConf(SQLConf.TRASH_ENABLED)
 
   /** ********************** SQLConf functionality methods ************ */
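
For reference, the renamed flag is a plain session setting; an illustrative snippet (the spark session and the tab1 table are made up for the example):

// Illustrative usage of the renamed config key from the hunk above.
spark.conf.set("spark.sql.trash.enabled", "true")
spark.sql("TRUNCATE TABLE tab1")                  // data files move to trash
spark.sql("INSERT OVERWRITE TABLE tab1 SELECT 1") // overwritten files move too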

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 1 addition & 1 deletion

@@ -490,7 +490,7 @@ case class TruncateTableCommand(
     }
     val hadoopConf = spark.sessionState.newHadoopConf()
     val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
-    val isTrashEnabled = SQLConf.get.truncateTrashEnabled
+    val isTrashEnabled = SQLConf.get.trashEnabled
     locations.foreach { location =>
       if (location.isDefined) {
         val path = new Path(location.get)
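
Below this hunk, isTrashEnabled is presumably threaded into the Utils.moveToTrashIfEnabled helper renamed earlier; a hedged sketch of how the loop likely continues (only the renamed line above is in the commit, the rest is reconstruction from the helper's call sites):

// Hedged reconstruction, not the committed code: each table or
// partition location is trashed or deleted depending on the flag.
locations.foreach { location =>
  if (location.isDefined) {
    val path = new Path(location.get)
    val fs = path.getFileSystem(hadoopConf)
    Utils.moveToTrashIfEnabled(fs, path, isTrashEnabled, hadoopConf)
  }
}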

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 3 additions & 3 deletions

@@ -3105,7 +3105,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   test("SPARK-32481 Move data to trash on truncate table if enabled") {
     val trashIntervalKey = "fs.trash.interval"
     withTable("tab1") {
-      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
+      withSQLConf(SQLConf.TRASH_ENABLED.key -> "true") {
         sql("CREATE TABLE tab1 (col INT) USING parquet")
         sql("INSERT INTO tab1 SELECT 1")
         // scalastyle:off hadoopconfiguration

@@ -3133,7 +3133,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   test("SPARK-32481 delete data permanently on truncate table if trash interval is non-positive") {
     val trashIntervalKey = "fs.trash.interval"
     withTable("tab1") {
-      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
+      withSQLConf(SQLConf.TRASH_ENABLED.key -> "true") {
         sql("CREATE TABLE tab1 (col INT) USING parquet")
         sql("INSERT INTO tab1 SELECT 1")
         // scalastyle:off hadoopconfiguration

@@ -3159,7 +3159,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
 
   test("SPARK-32481 Do not move data to trash on truncate table if disabled") {
     withTable("tab1") {
-      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "false") {
+      withSQLConf(SQLConf.TRASH_ENABLED.key -> "false") {
         sql("CREATE TABLE tab1 (col INT) USING parquet")
         sql("INSERT INTO tab1 SELECT 1")
         val hadoopConf = spark.sessionState.newHadoopConf()
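
These tests assert against the Hadoop trash location; a plausible shape for that check, assuming Hadoop's default trash layout under the user's home directory (not the committed test body; hadoopConf comes from the context line above):

// Plausible assertion shape, assuming the standard <home>/.Trash root.
val fs = new Path("/").getFileSystem(hadoopConf)
val trashRoot = new Path(fs.getHomeDirectory, ".Trash")
assert(fs.exists(trashRoot)) // truncated files should land under here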

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala

Lines changed: 5 additions & 1 deletion

@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.util.Utils
 
 /**
  * Command for writing the results of `query` to file system.

@@ -108,8 +109,11 @@ case class InsertIntoHiveDirCommand(
       outputLocation = tmpPath.toString)
 
     if (overwrite && fs.exists(writeToPath)) {
+      val isTrashEnabled = sparkSession.sessionState.conf.trashEnabled
       fs.listStatus(writeToPath).foreach { existFile =>
-        if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true)
+        if (Option(existFile.getPath) != createdTempDir) {
+          Utils.moveToTrashIfEnabled(fs, existFile.getPath, isTrashEnabled, hadoopConf)
+        }
       }
     }
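
This code path serves directory-targeted overwrites; an illustrative query that reaches it (the output path is made up):

// Illustrative only: with the flag on, files already present under the
// target directory are moved to trash rather than deleted outright.
spark.conf.set("spark.sql.trash.enabled", "true")
spark.sql(
  """INSERT OVERWRITE DIRECTORY '/tmp/demo_out'
    |STORED AS parquet
    |SELECT 1 AS col""".stripMargin)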

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 17 additions & 4 deletions

@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
 import java.util.Locale
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{Path, Trash}
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc

@@ -309,9 +309,22 @@ case class InsertIntoHiveTable(
         partitionPath.foreach { path =>
           val fs = path.getFileSystem(hadoopConf)
           if (fs.exists(path)) {
-            if (!fs.delete(path, true)) {
-              throw new RuntimeException(
-                s"Cannot remove partition directory '$path'")
+            val isTrashEnabled = sparkSession.sessionState.conf.trashEnabled
+            if (!isTrashEnabled) {
+              if (!fs.delete(path, true)) {
+                throw new RuntimeException(
+                  s"Cannot remove partition directory '$path'")
+              }
+            } else {
+              logDebug(s"will move data $path to trash")
+              val isSuccess = Trash.moveToAppropriateTrash(fs, path, hadoopConf)
+              if (!isSuccess) {
+                logWarning(s"Failed to move data $path to trash")
+                if (!fs.delete(path, true)) {
+                  throw new RuntimeException(
+                    s"Cannot remove partition directory '$path'")
+                }
+              }
+            }
           }
           // Don't let Hive do overwrite operation since it is slower.
           doHiveOverwrite = false
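
One operational note, per the config's doc string: the trash move only takes effect when fs.trash.interval is positive; otherwise the hunk above logs a warning and falls back to a permanent delete. A hedged setup sketch (part_tab is a made-up partitioned table):

// Illustrative session setup; fs.trash.interval is the retention in minutes.
spark.sparkContext.hadoopConfiguration.setInt("fs.trash.interval", 1440)
spark.conf.set("spark.sql.trash.enabled", "true")
spark.sql("INSERT OVERWRITE TABLE part_tab PARTITION (p = 1) SELECT 42")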
