Commit 6be846b

[SPARK-32480] Support insert overwrite to move data to trash
1 parent 065f173 commit 6be846b

6 files changed: +37 −19


core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 1 addition & 1 deletion
@@ -270,7 +270,7 @@ private[spark] object Utils extends Logging {
   }

   /**
-   * Move data to trash if 'spark.sql.truncate.trash.enabled' is true, else
+   * Move data to trash if 'spark.sql.trash.enabled' is true, else
    * delete the data permanently. If move data to trash failed fallback to hard deletion.
    */
   def moveToTrashOrDelete(
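
The body of moveToTrashOrDelete sits outside this hunk. Judging from its call site in InsertIntoHiveDirCommand.scala below and the inlined variant in InsertIntoHiveTable.scala, a minimal sketch could look like the following; the parameter names and Boolean return are assumptions, only the Hadoop Trash API and the fall-back-to-deletion behaviour are taken from the diff:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path, Trash}

    // Sketch only: signature details are inferred from the call site
    // Utils.moveToTrashOrDelete(fs, path, isTrashEnabled, hadoopConf).
    def moveToTrashOrDelete(
        fs: FileSystem,
        path: Path,
        isTrashEnabled: Boolean,
        hadoopConf: Configuration): Boolean = {
      if (isTrashEnabled) {
        // Trash.moveToAppropriateTrash respects 'fs.trash.interval' and
        // returns false when trash is disabled or the move fails.
        if (Trash.moveToAppropriateTrash(fs, path, hadoopConf)) {
          true
        } else {
          // Fall back to hard deletion, as the doc comment above promises.
          fs.delete(path, true)
        }
      } else {
        fs.delete(path, true)
      }
    }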

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 9 deletions
@@ -2732,14 +2732,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)

-  val TRUNCATE_TRASH_ENABLED =
-    buildConf("spark.sql.truncate.trash.enabled")
-      .doc("This configuration decides when truncating table, whether data files will be moved " +
-        "to trash directory or deleted permanently. The trash retention time is controlled by " +
-        "'fs.trash.interval', and in default, the server side configuration value takes " +
-        "precedence over the client-side one. Note that if 'fs.trash.interval' is non-positive, " +
-        "this will be a no-op and log a warning message. If the data fails to be moved to " +
-        "trash, Spark will turn to delete it permanently.")
+  val TRASH_ENABLED =
+    buildConf("spark.sql.trash.enabled")
+      .doc("This configuration decides when truncating table and insert overwrite, whether data " +
+        "files will be moved to trash directory or deleted permanently. The trash retention " +
+        "time is controlled by 'fs.trash.interval', and in default, the server side " +
+        "configuration value takes precedence over the client-side one. Note that if " +
+        "'fs.trash.interval' is non-positive, this will be a no-op and log a warning message. " +
+        "If the data fails to be moved to trash, Spark will turn to delete it permanently.")
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
@@ -3362,7 +3362,7 @@ class SQLConf extends Serializable with Logging {

   def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)

-  def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)
+  def trashEnabled: Boolean = getConf(SQLConf.TRASH_ENABLED)

   /** ********************** SQLConf functionality methods ************ */
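
For context, the renamed flag is an ordinary session conf, while the retention interval lives on the Hadoop side. A short usage sketch (the SparkSession value `spark` and the 1440-minute interval are illustrative):

    // Trash old files on TRUNCATE TABLE and INSERT OVERWRITE instead of
    // deleting them permanently (the default remains false).
    spark.conf.set("spark.sql.trash.enabled", "true")

    // Retention is governed by Hadoop's 'fs.trash.interval' (minutes); if
    // it is non-positive, the move is a no-op and Spark logs a warning.
    spark.sparkContext.hadoopConfiguration.setInt("fs.trash.interval", 1440)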

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 1 addition & 1 deletion
@@ -490,7 +490,7 @@ case class TruncateTableCommand(
     }
     val hadoopConf = spark.sessionState.newHadoopConf()
     val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
-    val isTrashEnabled = SQLConf.get.truncateTrashEnabled
+    val isTrashEnabled = SQLConf.get.trashEnabled
     locations.foreach { location =>
       if (location.isDefined) {
         val path = new Path(location.get)

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -3105,7 +3105,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   test("SPARK-32481 Move data to trash on truncate table if enabled") {
     val trashIntervalKey = "fs.trash.interval"
     withTable("tab1") {
-      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
+      withSQLConf(SQLConf.TRASH_ENABLED.key -> "true") {
         sql("CREATE TABLE tab1 (col INT) USING parquet")
         sql("INSERT INTO tab1 SELECT 1")
         // scalastyle:off hadoopconfiguration
@@ -3134,7 +3134,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   test("SPARK-32481 delete data permanently on truncate table if trash interval is non-positive") {
     val trashIntervalKey = "fs.trash.interval"
     withTable("tab1") {
-      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
+      withSQLConf(SQLConf.TRASH_ENABLED.key -> "true") {
         sql("CREATE TABLE tab1 (col INT) USING parquet")
         sql("INSERT INTO tab1 SELECT 1")
         // scalastyle:off hadoopconfiguration
@@ -3161,7 +3161,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {

   test("SPARK-32481 Do not move data to trash on truncate table if disabled") {
     withTable("tab1") {
-      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "false") {
+      withSQLConf(SQLConf.TRASH_ENABLED.key -> "false") {
         sql("CREATE TABLE tab1 (col INT) USING parquet")
         sql("INSERT INTO tab1 SELECT 1")
         val hadoopConf = spark.sessionState.newHadoopConf()
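
The tests above only exercise TRUNCATE TABLE (SPARK-32481); the commit adds no test for the new INSERT OVERWRITE path. A hypothetical companion test, following the same withSQLConf pattern, might look like this (it would need to target a partitioned Hive-format table, since only the Hive commands change in this commit):

    test("move data to trash on insert overwrite if enabled") {
      withTable("tab1") {
        withSQLConf(SQLConf.TRASH_ENABLED.key -> "true") {
          sql("CREATE TABLE tab1 (col INT) PARTITIONED BY (part INT) STORED AS parquet")
          sql("INSERT OVERWRITE TABLE tab1 PARTITION (part=1) SELECT 1")
          // With a positive 'fs.trash.interval', overwriting an existing
          // partition should move its old directory to the trash rather
          // than hard-delete it.
          sql("INSERT OVERWRITE TABLE tab1 PARTITION (part=1) SELECT 2")
          checkAnswer(spark.table("tab1"), Row(2, 1))
        }
      }
    }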

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala

Lines changed: 5 additions & 1 deletion
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.util.Utils

 /**
  * Command for writing the results of `query` to file system.
@@ -108,8 +109,11 @@ case class InsertIntoHiveDirCommand(
       outputLocation = tmpPath.toString)

     if (overwrite && fs.exists(writeToPath)) {
+      val isTrashEnabled = sparkSession.sessionState.conf.trashEnabled
       fs.listStatus(writeToPath).foreach { existFile =>
-        if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true)
+        if (Option(existFile.getPath) != createdTempDir) {
+          Utils.moveToTrashOrDelete(fs, existFile.getPath, isTrashEnabled, hadoopConf)
+        }
       }
     }
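
This command backs INSERT OVERWRITE [LOCAL] DIRECTORY: with the flag on, files already present under the target (other than Spark's own temp dir) are now routed through Utils.moveToTrashOrDelete instead of fs.delete. An illustrative trigger (the output path is hypothetical):

    spark.conf.set("spark.sql.trash.enabled", "true")

    // Pre-existing files under /tmp/spark-out are moved to trash instead
    // of being deleted before the new query results are written.
    spark.sql(
      """
        |INSERT OVERWRITE DIRECTORY '/tmp/spark-out'
        |STORED AS parquet
        |SELECT 1 AS col
      """.stripMargin)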

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 18 additions & 4 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
 import java.util.Locale

 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{Path, Trash}
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc

@@ -309,9 +309,23 @@ case class InsertIntoHiveTable(
           partitionPath.foreach { path =>
             val fs = path.getFileSystem(hadoopConf)
             if (fs.exists(path)) {
-              if (!fs.delete(path, true)) {
-                throw new RuntimeException(
-                  s"Cannot remove partition directory '$path'")
+              val isTrashEnabled = sparkSession.sessionState.conf.trashEnabled
+              if (!isTrashEnabled) {
+                if (!fs.delete(path, true)) {
+                  throw new RuntimeException(
+                    s"Cannot remove partition directory '$path'")
+                }
+              } else {
+                logDebug(s"Try to move data ${path.toString} to trash")
+                val isSuccess = Trash.moveToAppropriateTrash(fs, path, hadoopConf)
+                if (!isSuccess) {
+                  logWarning(s"Failed to move data ${path.toString} to trash " +
+                    "fallback to hard deletion")
+                  if (!fs.delete(path, true)) {
+                    throw new RuntimeException(
+                      s"Cannot remove partition directory '$path'")
+                  }
+                }
               }
               // Don't let Hive do overwrite operation since it is slower.
               doHiveOverwrite = false
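
The rewritten branch runs when an existing partition directory must be cleared before an overwrite. End to end, with the conf enabled (table name and data are illustrative):

    spark.conf.set("spark.sql.trash.enabled", "true")

    spark.sql("CREATE TABLE logs (msg STRING) PARTITIONED BY (day STRING) STORED AS parquet")
    spark.sql("INSERT OVERWRITE TABLE logs PARTITION (day = '2020-08-01') SELECT 'first'")

    // The second overwrite finds the partition directory on disk: with the
    // flag on it is moved to trash (falling back to hard deletion if the
    // move fails), and doHiveOverwrite is set to false so Hive skips its
    // slower overwrite cleanup.
    spark.sql("INSERT OVERWRITE TABLE logs PARTITION (day = '2020-08-01') SELECT 'second'")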
