 
 package org.apache.spark.sql.hive.execution
 
+import java.io.IOException
+import java.net.URI
+import java.text.SimpleDateFormat
 import java.util
+import java.util.{Date, Random}
 
-import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+
+import scala.util.control.NonFatal
 
+import scala.collection.JavaConverters._
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.types.DataType
@@ -54,6 +63,63 @@ case class InsertIntoHiveTable(
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
   @transient private lazy val catalog = sc.catalog
 
+  @transient var createdTempDir: Option[Path] = None
+  val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
+
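+  // Builds a unique execution ID of the form "hive_<timestamp>_<random>" so that
+  // concurrent inserts resolve to distinct staging directories.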
+  private def executionId: String = {
+    val rand: Random = new Random
+    val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
+    val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+    executionId
+  }
+
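+  // Resolves the staging directory for inputPath (reusing the staging segment if the
+  // path already contains one), creates it, records it in createdTempDir, and registers
+  // it with deleteOnExit as a fallback in case the explicit cleanup below never runs.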
+  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+    val inputPathUri: URI = inputPath.toUri
+    val inputPathName: String = inputPathUri.getPath
+    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+    val stagingPathName: String =
+      if (inputPathName.indexOf(stagingDir) == -1) {
+        new Path(inputPathName, stagingDir).toString
+      } else {
+        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+      }
+    val dir: Path =
+      fs.makeQualified(
+        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+    try {
+      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
+      }
+      createdTempDir = Some(dir)
+      fs.deleteOnExit(dir)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException(
+          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
+    }
+    dir
+  }
+
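+  // Staging directory for a destination that lives on a filesystem other than the
+  // default one, qualified with the destination URI's scheme and authority.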
+  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  }
+
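+  // Temporary output location under the staging directory; "-ext-10000" follows Hive's
+  // naming convention. viewfs destinations are resolved relative to the destination's
+  // parent so the staging dir stays on the same viewfs mount.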
+  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+    val extURI: URI = path.toUri
+    if (extURI.getScheme == "viewfs") {
+      getExtTmpPathRelTo(path.getParent, hadoopConf)
+    } else {
+      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+    }
+  }
+
+  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
+    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  }
+
   private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
     serializer.initialize(null, tableDesc.getProperties)
@@ -129,7 +195,9 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
+    val jobConf = new JobConf(sc.hiveconf)
+    val tmpLocation = getExternalTmpPath(tableLocation, jobConf)
+
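+    // jobConf is now created before the staging path is resolved, so the same
+    // configuration is available for the staging-directory cleanup further below.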
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed = sc.hiveconf.getBoolean(
       ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
@@ -175,7 +243,6 @@ case class InsertIntoHiveTable(
       }
     }
 
-    val jobConf = new JobConf(sc.hiveconf)
     val jobConfSer = new SerializableJobConf(jobConf)
 
     // When speculation is on and output committer class name contains "Direct", we should warn
@@ -260,6 +327,15 @@ case class InsertIntoHiveTable(
         holdDDLTime)
     }
 
+    // Attempt to delete the staging directory and all files under it. If this fails, the
+    // files are expected to be dropped at normal JVM termination, since deleteOnExit was
+    // used when the directory was created.
+    try {
+      createdTempDir.foreach { path => path.getFileSystem(jobConf).delete(path, true) }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+
     // Invalidate the cache.
     sqlContext.cacheManager.invalidateCache(table)
 