Commit 4da7a22

SPARK-12948. [SQL]. Consider reducing size of broadcasts in OrcRelation
1 parent 2b5d11f

3 files changed: 67 additions, 32 deletions


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 23 additions & 0 deletions
@@ -967,6 +967,29 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
   }
 
+  /**
+   * Get an RDD for a Hadoop-readable dataset from the Hadoop JobConf.
+   *
+   * @param broadcastedConf A general Hadoop Configuration, or a subclass of it.
+   * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf
+   *                                that HadoopRDD creates.
+   * @param inputFormatClass Class of the InputFormat
+   * @param keyClass Class of the keys
+   * @param valueClass Class of the values
+   * @param minPartitions Minimum number of Hadoop Splits to generate.
+   */
+  def hadoopRDD[K, V](
+      broadcastedConf: Broadcast[SerializableConfiguration],
+      initLocalJobConfFuncOpt: Option[JobConf => Unit],
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
+    assertNotStopped()
+    new HadoopRDD(this, broadcastedConf, initLocalJobConfFuncOpt,
+      inputFormatClass, keyClass, valueClass, minPartitions)
+  }
+
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat
    *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
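
The overload above lets a data source ship a single Broadcast[SerializableConfiguration] plus a small JobConf-initialization closure with its tasks instead of serializing a full JobConf into every task binary. A minimal caller sketch, assuming code that lives under an org.apache.spark package (SerializableConfiguration is private[spark]); the text input format and paths here are illustrative, not taken from this commit:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.SerializableConfiguration

object BroadcastConfScanSketch {
  def scan(sc: SparkContext, paths: Seq[String]): RDD[(LongWritable, Text)] = {
    // Broadcast the (potentially large) Hadoop configuration once,
    // instead of re-serializing it into every stage's task binary.
    val broadcastedConf =
      sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

    // Only this small closure travels with the tasks; HadoopRDD applies it to the
    // JobConf it builds locally from the broadcast value.
    val initLocalJobConf = (job: JobConf) =>
      FileInputFormat.setInputPaths(job, paths.map(new Path(_)): _*)

    sc.hadoopRDD(
      broadcastedConf,
      Some(initLocalJobConf),
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text])
  }
}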

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 1 deletion
@@ -1006,7 +1006,7 @@ class DAGScheduler(
         case stage: ResultStage =>
           JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
       }
-
+      logDebug(s"Size of broadcasted task binary: ${taskBinaryBytes.length}")
      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
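
The only change above is the logDebug line: it reports the size of the serialized (RDD, closure) pair before it is broadcast, so an oversized Hadoop Configuration captured by an RDD shows up directly in the driver log. A hedged sketch of enabling just that measurement from application code, assuming the log4j 1.x backend Spark shipped with at the time:

import org.apache.log4j.{Level, Logger}

// Raise only the DAGScheduler logger to DEBUG so the task-binary size is logged
// without turning on DEBUG output everywhere else.
Logger.getLogger("org.apache.spark.scheduler.DAGScheduler").setLevel(Level.DEBUG)

// Each stage submission then emits a line of the form:
//   Size of broadcasted task binary: <bytes>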

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala

Lines changed: 43 additions & 31 deletions
@@ -27,9 +27,8 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSer
 import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
 import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
 import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 
 import org.apache.spark.Logging
 import org.apache.spark.broadcast.Broadcast
@@ -42,6 +41,7 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreType
 import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.Utils
 
 private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister {
 
@@ -207,7 +207,9 @@ private[sql] class OrcRelation(
       inputPaths: Array[FileStatus],
       broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
     val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
-    OrcTableScan(output, this, filters, inputPaths).execute()
+    Utils.withDummyCallSite(sqlContext.sparkContext) {
+      OrcTableScan(output, this, filters, inputPaths, broadcastedConf).execute()
+    }
   }
 
   override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
@@ -237,21 +239,13 @@ private[orc] case class OrcTableScan(
     attributes: Seq[Attribute],
     @transient relation: OrcRelation,
     filters: Array[Filter],
-    @transient inputPaths: Array[FileStatus])
+    @transient inputPaths: Array[FileStatus],
+    broadcastedConf: Broadcast[SerializableConfiguration])
   extends Logging
   with HiveInspectors {
 
   @transient private val sqlContext = relation.sqlContext
 
-  private def addColumnIds(
-      output: Seq[Attribute],
-      relation: OrcRelation,
-      conf: Configuration): Unit = {
-    val ids = output.map(a => relation.dataSchema.fieldIndex(a.name): Integer)
-    val (sortedIds, sortedNames) = ids.zip(attributes.map(_.name)).sorted.unzip
-    HiveShim.appendReadColumns(conf, sortedIds, sortedNames)
-  }
-
   // Transform all given raw `Writable`s into `InternalRow`s.
   private def fillObject(
       path: String,
@@ -293,47 +287,65 @@ private[orc] case class OrcTableScan(
   }
 
   def execute(): RDD[InternalRow] = {
-    val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
-    val conf = job.getConfiguration
-
-    // Tries to push down filters if ORC filter push-down is enabled
-    if (sqlContext.conf.orcFilterPushDown) {
-      OrcFilters.createFilter(filters).foreach { f =>
-        conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
-        conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
-      }
-    }
-
-    // Sets requested columns
-    addColumnIds(attributes, relation, conf)
 
     if (inputPaths.isEmpty) {
       // the input path probably be pruned, return an empty RDD.
       return sqlContext.sparkContext.emptyRDD[InternalRow]
     }
-    FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*)
+
+    val ids = attributes.map(a => relation.dataSchema.fieldIndex(a.name): Integer)
+    val (sortedIds, sortedNames) = ids.zip(attributes.map(_.name)).sorted.unzip
+
+    // Get the paths as fileStatus is not serializable
+    val setInputPaths =
+      OrcTableScan.setupConfigs(inputPaths.map(_.getPath.toString),
+        sortedIds, sortedNames, sqlContext.conf.orcFilterPushDown, filters) _
 
     val inputFormatClass =
       classOf[OrcInputFormat]
         .asInstanceOf[Class[_ <: MapRedInputFormat[NullWritable, Writable]]]
 
     val rdd = sqlContext.sparkContext.hadoopRDD(
-      conf.asInstanceOf[JobConf],
+      broadcastedConf,
+      Some(setInputPaths),
       inputFormatClass,
       classOf[NullWritable],
       classOf[Writable]
     ).asInstanceOf[HadoopRDD[NullWritable, Writable]]
 
-    val wrappedConf = new SerializableConfiguration(conf)
-
     rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
       val writableIterator = iterator.map(_._2)
-      fillObject(split.getPath.toString, wrappedConf.value, writableIterator, attributes)
+      fillObject(split.getPath.toString, broadcastedConf.value.value,
+        writableIterator, attributes)
     }
   }
 }
 
 private[orc] object OrcTableScan {
   // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
   private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
+
+  private[orc] def setupConfigs(
+      inputFiles: Array[String],
+      ids: Seq[Integer],
+      names: Seq[String],
+      filterPushDown: Boolean,
+      orcFilters: Array[Filter])(job: JobConf): Unit = {
+
+    HiveShim.appendReadColumns(job, ids, names)
+
+    if (filterPushDown) {
+      OrcFilters.createFilter(orcFilters).foreach { f =>
+        job.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
+        job.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
+      }
+    }
+
+    if (inputFiles.nonEmpty) {
+      // Set up the input paths
+      val inputPaths = inputFiles.map(i => new Path(i))
+      FileInputFormat.setInputPaths(job, inputPaths: _*)
+    }
+  }
 }
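
The net effect of the OrcRelation changes: the scan no longer builds and serializes a full JobConf per table scan; it reuses the relation's broadcast configuration and hands hadoopRDD a curried setupConfigs closure whose captured values (path strings, column ids and names, a filter flag, and the pushed-down filters) are all small. A stripped-down sketch of that curried-initializer pattern, with hypothetical names and a stand-in property key, showing why only a tiny closure ends up in the task binary:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

object JobConfInitSketch {
  // Curried initializer: the first parameter list carries only small, serializable
  // values; the JobConf is supplied later, on the executor, when HadoopRDD builds
  // its local copy from the broadcast configuration.
  def setupConfigs(inputFiles: Array[String], readColumns: Seq[String])(job: JobConf): Unit = {
    // Stand-in for HiveShim.appendReadColumns; the property key is hypothetical.
    job.set("example.orc.read.columns", readColumns.mkString(","))
    if (inputFiles.nonEmpty) {
      FileInputFormat.setInputPaths(job, inputFiles.map(new Path(_)): _*)
    }
  }

  // Partially applying the data arguments yields the JobConf => Unit closure that is
  // shipped with the tasks; the heavyweight Configuration never rides in the closure.
  val initLocalJobConf: JobConf => Unit =
    setupConfigs(Array("/tmp/example/part-00000.orc"), Seq("a", "b")) _
}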
