@@ -27,9 +27,8 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSer
 import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
 import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
 import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 
 import org.apache.spark.Logging
 import org.apache.spark.broadcast.Broadcast
@@ -42,6 +41,7 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreType
 import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.Utils
 
 private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister {
 
@@ -207,7 +207,9 @@ private[sql] class OrcRelation(
       inputPaths: Array[FileStatus],
       broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
     val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
-    OrcTableScan(output, this, filters, inputPaths).execute()
+    Utils.withDummyCallSite(sqlContext.sparkContext) {
+      OrcTableScan(output, this, filters, inputPaths, broadcastedConf).execute()
+    }
   }
 
   override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
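Editorial note: the `broadcastedConf` that `buildInternalScan` already receives is now threaded through to `OrcTableScan` (see the constructor change in the next hunk), so the Hadoop configuration reaches executors once as a broadcast variable instead of being rebuilt from a driver-side `JobConf`. Below is a minimal sketch of that wrap/broadcast/unwrap pattern, assuming a `SparkContext` named `sc`; the helper names are illustrative, and since `SerializableConfiguration` is `private[spark]`, code like this only compiles inside Spark's own packages.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.SerializableConfiguration

object BroadcastConfSketch {
  // Hadoop's Configuration is not java.io.Serializable, so it is wrapped in
  // SerializableConfiguration and broadcast once from the driver.
  def broadcastHadoopConf(sc: SparkContext): Broadcast[SerializableConfiguration] =
    sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

  // On the executor side, the first `.value` unwraps the Broadcast and the
  // second unwraps SerializableConfiguration, which is what the
  // `broadcastedConf.value.value` call in the execute() hunk further down does.
  def hadoopConfOf(broadcastedConf: Broadcast[SerializableConfiguration]): Configuration =
    broadcastedConf.value.value
}
```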
@@ -237,21 +239,13 @@ private[orc] case class OrcTableScan(
     attributes: Seq[Attribute],
     @transient relation: OrcRelation,
     filters: Array[Filter],
-    @transient inputPaths: Array[FileStatus])
+    @transient inputPaths: Array[FileStatus],
+    broadcastedConf: Broadcast[SerializableConfiguration])
   extends Logging
   with HiveInspectors {
 
   @transient private val sqlContext = relation.sqlContext
 
-  private def addColumnIds(
-      output: Seq[Attribute],
-      relation: OrcRelation,
-      conf: Configuration): Unit = {
-    val ids = output.map(a => relation.dataSchema.fieldIndex(a.name): Integer)
-    val (sortedIds, sortedNames) = ids.zip(attributes.map(_.name)).sorted.unzip
-    HiveShim.appendReadColumns(conf, sortedIds, sortedNames)
-  }
-
   // Transform all given raw `Writable`s into `InternalRow`s.
   private def fillObject(
       path: String,
@@ -293,47 +287,65 @@ private[orc] case class OrcTableScan(
   }
 
   def execute(): RDD[InternalRow] = {
-    val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
-    val conf = job.getConfiguration
-
-    // Tries to push down filters if ORC filter push-down is enabled
-    if (sqlContext.conf.orcFilterPushDown) {
-      OrcFilters.createFilter(filters).foreach { f =>
-        conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
-        conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
-      }
-    }
-
-    // Sets requested columns
-    addColumnIds(attributes, relation, conf)
 
     if (inputPaths.isEmpty) {
       // The input paths have probably been pruned; return an empty RDD.
       return sqlContext.sparkContext.emptyRDD[InternalRow]
     }
-    FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*)
+
+    val ids = attributes.map(a => relation.dataSchema.fieldIndex(a.name): Integer)
+    val (sortedIds, sortedNames) = ids.zip(attributes.map(_.name)).sorted.unzip
+
+    // Pass the paths as strings, since FileStatus is not serializable.
+    val setInputPaths =
+      OrcTableScan.setupConfigs(inputPaths.map(_.getPath.toString),
+        sortedIds, sortedNames, sqlContext.conf.orcFilterPushDown, filters) _
+
 
     val inputFormatClass =
       classOf[OrcInputFormat]
         .asInstanceOf[Class[_ <: MapRedInputFormat[NullWritable, Writable]]]
 
     val rdd = sqlContext.sparkContext.hadoopRDD(
-      conf.asInstanceOf[JobConf],
+      broadcastedConf,
+      Some(setInputPaths),
       inputFormatClass,
       classOf[NullWritable],
       classOf[Writable]
     ).asInstanceOf[HadoopRDD[NullWritable, Writable]]
 
-    val wrappedConf = new SerializableConfiguration(conf)
-
     rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
       val writableIterator = iterator.map(_._2)
-      fillObject(split.getPath.toString, wrappedConf.value, writableIterator, attributes)
+      fillObject(split.getPath.toString, broadcastedConf.value.value,
+        writableIterator, attributes)
     }
   }
 }
 
 private[orc] object OrcTableScan {
   // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
   private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
+
+  private[orc] def setupConfigs(
+      inputFiles: Array[String],
+      ids: Seq[Integer],
+      names: Seq[String],
+      filterPushDown: Boolean,
+      orcFilters: Array[Filter])(job: JobConf): Unit = {
+
+    HiveShim.appendReadColumns(job, ids, names)
+
+    if (filterPushDown) {
+      OrcFilters.createFilter(orcFilters).foreach { f =>
+        job.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
+        job.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
+      }
+    }
+
+    if (inputFiles.nonEmpty) {
+      // Set up the input paths.
+      val inputPaths = inputFiles.map(i => new Path(i))
+      FileInputFormat.setInputPaths(job, inputPaths: _*)
+    }
+  }
 }
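Design note on the new `setupConfigs` helper: all driver-side inputs (the input paths as strings, the sorted column ids and names, the push-down flag, and the filters) live in its first parameter list, while the `JobConf` sits alone in the second, so the partial application `OrcTableScan.setupConfigs(...) _` in `execute()` yields a plain `JobConf => Unit` closure. That closure is what the patched `hadoopRDD` call receives as `Some(setInputPaths)`, presumably so it can be applied when each task builds its local `JobConf`. Below is a self-contained sketch of the same currying pattern, using illustrative names and an illustrative config key rather than the real Hive/ORC settings.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

object CurriedJobConfInitSketch {
  // Same shape as setupConfigs: driver-side inputs in the first parameter
  // list, the JobConf to mutate in the second.
  def initConf(inputFiles: Array[String], pushDown: Boolean)(job: JobConf): Unit = {
    job.setBoolean("example.filter.pushdown", pushDown) // illustrative key only
    if (inputFiles.nonEmpty) {
      FileInputFormat.setInputPaths(job, inputFiles.map(new Path(_)): _*)
    }
  }

  def main(args: Array[String]): Unit = {
    // Partially applying the first parameter list (the trailing `_`) yields a
    // JobConf => Unit closure that can be captured, shipped with the task
    // closure, and applied to a locally created JobConf.
    val setInputPaths: JobConf => Unit =
      initConf(Array("/tmp/data/part-00000.orc"), pushDown = true) _
    val job = new JobConf()
    setInputPaths(job)
    assert(job.getBoolean("example.filter.pushdown", false))
  }
}
```

The intended effect appears to be that only this small closure plus the broadcast configuration need to reach executors, rather than a fully populated `JobConf` serialized with every task.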