
Commit 2303887

Mingjie Tang authored and gatorsmile committed
[SPARK-18372][SQL][BRANCH-1.6] Staging directory fail to be removed
## What changes were proposed in this pull request?

This fix addresses the bug reported in https://issues.apache.org/jira/browse/SPARK-18372. InsertIntoHiveTable generates a .staging directory, but that directory fails to be removed when the insert finishes. This is a backport of the Spark 2.0.x code and is related to PR apache#12770.

## How was this patch tested?

Manual tests.

Author: Mingjie Tang <mtanghortonworks.com>
Author: Mingjie Tang <[email protected]>
Author: Mingjie Tang <[email protected]>

Closes apache#15819 from merlintang/branch-1.6.
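The heart of the fix is a create-register-delete lifecycle for the staging directory. Below is a minimal sketch of that pattern, independent of the patch (the helper name withStagingDir is illustrative, not part of the actual change): create the directory, register it with FileSystem.deleteOnExit as a safety net, then delete it explicitly once the work completes, ignoring only non-fatal failures.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import scala.util.control.NonFatal

object StagingDirSketch {
  // Illustrative helper: run `body`, guaranteeing the staging directory is
  // cleaned up afterwards. deleteOnExit registers the path for removal when
  // the FileSystem is closed at JVM shutdown; the explicit recursive delete
  // handles the normal completion case this bug hit.
  def withStagingDir(stagingPath: Path, conf: Configuration)(body: => Unit): Unit = {
    val fs = stagingPath.getFileSystem(conf)
    fs.mkdirs(stagingPath)
    fs.deleteOnExit(stagingPath)          // fallback: removed at JVM termination
    try body
    finally {
      try fs.delete(stagingPath, true)    // recursive delete once the work is done
      catch { case NonFatal(e) => Console.err.println(s"Unable to delete $stagingPath: $e") }
    }
  }
}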
1 parent 70f271b commit 2303887

2 files changed, +118 -8 lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

81 additions, 5 deletions
@@ -17,24 +17,33 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.io.IOException
+import java.net.URI
+import java.text.SimpleDateFormat
 import java.util
+import java.util.{Date, Random}
 
-import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+
+import scala.util.control.NonFatal
 
+import scala.collection.JavaConverters._
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.types.DataType
@@ -54,6 +63,63 @@ case class InsertIntoHiveTable(
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
   @transient private lazy val catalog = sc.catalog
 
+  @transient var createdTempDir: Option[Path] = None
+  val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
+
+  private def executionId: String = {
+    val rand: Random = new Random
+    val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
+    val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+    executionId
+  }
+
+  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+    val inputPathUri: URI = inputPath.toUri
+    val inputPathName: String = inputPathUri.getPath
+    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+    val stagingPathName: String =
+      if (inputPathName.indexOf(stagingDir) == -1) {
+        new Path(inputPathName, stagingDir).toString
+      } else {
+        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+      }
+    val dir: Path =
+      fs.makeQualified(
+        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+    try {
+      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
+      }
+      createdTempDir = Some(dir)
+      fs.deleteOnExit(dir)
+    }
+    catch {
+      case e: IOException =>
+        throw new RuntimeException(
+          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
+
+    }
+    return dir
+  }
+
+  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  }
+
+  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+    val extURI: URI = path.toUri
+    if (extURI.getScheme == "viewfs") {
+      getExtTmpPathRelTo(path.getParent, hadoopConf)
+    } else {
+      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+    }
+  }
+
+  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
+    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  }
+
   private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
     serializer.initialize(null, tableDesc.getProperties)
@@ -129,7 +195,9 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
+    val jobConf = new JobConf(sc.hiveconf)
+    val tmpLocation = getExternalTmpPath(tableLocation, jobConf)
+
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed = sc.hiveconf.getBoolean(
       ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
@@ -175,7 +243,6 @@ case class InsertIntoHiveTable(
       }
     }
 
-    val jobConf = new JobConf(sc.hiveconf)
     val jobConfSer = new SerializableJobConf(jobConf)
 
     // When speculation is on and output committer class name contains "Direct", we should warn
@@ -260,6 +327,15 @@ case class InsertIntoHiveTable(
         holdDDLTime)
     }
 
+    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
+    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
+    try {
+      createdTempDir.foreach { path => path.getFileSystem(jobConf).delete(path, true) }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+
     // Invalidate the cache.
     sqlContext.cacheManager.invalidateCache(table)
 
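For orientation, the paths these helpers produce sit directly under the table location. A REPL-style sketch of the derivation, mirroring getStagingDir and getExternalTmpPath above, assuming Hive's default hive.exec.stagingdir value of ".hive-staging" (the timestamp, random suffix, and task-runner ID below are illustrative):

import org.apache.hadoop.fs.Path

// Hypothetical table location and IDs.
val tableLocation = new Path("/warehouse/tab")
val stagingDir = ".hive-staging"                       // HiveConf.ConfVars.STAGINGDIR default
val executionId = "hive_2016-11-09_12-00-00_000_42"    // from executionId: date format + random
val taskRunnerId = 1                                   // TaskRunner.getTaskRunnerID

val staging = new Path(tableLocation, stagingDir).toString + "_" + executionId + "-" + taskRunnerId
// staging == "/warehouse/tab/.hive-staging_hive_2016-11-09_12-00-00_000_42-1"
val tmpOutput = new Path(staging, "-ext-10000")        // what getExternalTmpPath returns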

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

37 additions, 3 deletions
@@ -20,14 +20,17 @@ package org.apache.spark.sql.hive.client
 import java.io.File
 
 import org.apache.hadoop.util.VersionInfo
-
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, AttributeReference, EqualTo}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.tags.ExtendedHiveTest
 import org.apache.spark.util.Utils
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
 
 /**
  * A simple set of tests that call the methods of a hive ClientInterface, loading different version
@@ -36,7 +39,7 @@ import org.apache.spark.util.Utils
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
 
   // In order to speed up test execution during development or in Jenkins, you can specify the path
   // of an existing Ivy cache:
@@ -216,5 +219,36 @@ class VersionsSuite extends SparkFunSuite with Logging {
         "as 'COMPACT' WITH DEFERRED REBUILD")
       client.reset()
     }
+
+    test(s"$version: CREATE TABLE AS SELECT") {
+      withTable("tbl") {
+        sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+        assert(sqlContext.table("tbl").collect().toSeq == Seq(Row(1)))
+      }
+    }
+
+    test(s"$version: Delete the temporary staging directory and files after each insert") {
+      withTempDir { tmpDir =>
+        withTable("tab", "tbl") {
+          sqlContext.sql(
+            s"""
+               |CREATE TABLE tab(c1 string)
+               |location '${tmpDir.toURI.toString}'
+             """.stripMargin)
+
+          sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+          sqlContext.sql(s"INSERT OVERWRITE TABLE tab SELECT * from tbl ")
+
+          def listFiles(path: File): List[String] = {
+            val dir = path.listFiles()
+            val folders = dir.filter(_.isDirectory).toList
+            val filePaths = dir.map(_.getName).toList
+            folders.flatMap(listFiles) ++: filePaths
+          }
+          val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
+          assert(listFiles(tmpDir).sorted == expectedFiles)
+        }
+      }
+    }
   }
 }
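Beyond the assertion in the suite, the cleanup is easy to spot-check by hand: after an INSERT OVERWRITE with the fix applied, the table directory should contain only data files and no .hive-staging_* leftovers. A REPL-style sketch, with a hypothetical local warehouse path and assuming the default ".hive-staging" prefix:

import java.io.File

val tableDir = new File("/tmp/warehouse/tab")   // hypothetical table location
val entries = Option(tableDir.listFiles()).getOrElse(Array.empty[File])
val leftovers = entries.filter(_.getName.startsWith(".hive-staging"))
assert(leftovers.isEmpty, s"staging directories not cleaned up: ${leftovers.mkString(", ")}")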
