Commit 5591a1c ("init commit")
1 parent: c70c38e
File tree: 4 files changed, +127 -5 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala

Lines changed: 5 additions & 1 deletion
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
@@ -59,7 +60,10 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
         val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
         val prunedFsRelation =
           fsRelation.copy(location = prunedFileIndex)(sparkSession)
-        val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
+        val withStats = logicalRelation.catalogTable.map(_.copy(
+          stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes)))))
+        val prunedLogicalRelation = logicalRelation.copy(
+          relation = prunedFsRelation, catalogTable = withStats)
 
         // Keep partition-pruning predicates so that they are visible in physical planning
         val filterExpression = filters.reduceLeft(And)
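
This hunk makes the file-source partition-pruning rule copy the size of the pruned FileIndex into the relation's catalog statistics, so later size estimation (in particular the broadcast-join threshold check) sees only the scanned partitions instead of the whole table. A minimal inspection sketch, not part of the commit: the table name partTbl and partition column part1 are assumptions, and the three-argument LogicalRelation extractor matches the Spark version this patch targets.

import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// Assumes `spark` is a SparkSession and `partTbl` is a partitioned data source table
// (for example Parquet) managed in the catalog, with a partition column `part1`.
val optimized = spark.table("partTbl").where("part1 = 'a'").queryExecution.optimizedPlan
optimized.foreach {
  case LogicalRelation(fsRelation: HadoopFsRelation, _, catalogTable) =>
    // With this change, catalogTable.stats should track the size of the pruned FileIndex
    // rather than the size recorded for the full table.
    println(s"pruned file index size: ${fsRelation.location.sizeInBytes}")
    println(s"catalog stats:          ${catalogTable.flatMap(_.stats)}")
  case _ => // ignore other operators
}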

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala

Lines changed: 16 additions & 1 deletion
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hive
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.SparkPlanner
+import org.apache.spark.sql.execution.{SparkOptimizer, SparkPlanner}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState}
@@ -87,6 +88,20 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
         customCheckRules
   }
 
+  /**
+   * Logical query plan optimizer that takes into account Hive.
+   */
+  override lazy val optimizer: Optimizer =
+    new SparkOptimizer(catalog, conf, experimentalMethods) {
+      override def postHocOptimizationBatches: Seq[Batch] = Seq(
+        Batch("Determine stats of partitionedTable", Once,
+          new DeterminePartitionedTableStats(session))
+      )
+
+      override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
+        super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
+    }
+
   /**
    * Planner that takes into account Hive-specific strategies.
    */
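
The Hive session now builds its optimizer from a SparkOptimizer subclass that registers the new rule in a post-hoc batch, a Once batch that SparkOptimizer runs after its built-in batches (including the file-source partition-pruning batch) and before the "User Provided Optimizers" batch, while still wiring in customOperatorOptimizationRules the way the base builder does. As a hedged aside, for experimenting with a rule like this without rebuilding the session state, ExperimentalMethods can append it to the user-provided batch; this is not the same batch as the post-hoc one registered here, so rule ordering differs slightly.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.DeterminePartitionedTableStats

// Sketch only: appends the rule to SparkOptimizer's "User Provided Optimizers" batch.
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
spark.experimental.extraOptimizations ++= Seq(DeterminePartitionedTableStats(spark))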

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 50 additions & 3 deletions
@@ -21,23 +21,22 @@ import java.io.IOException
 import java.util.Locale
 
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.StatsSetupConst
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
+import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 
-
 /**
  * Determine the database, serde/format and schema of the Hive serde table, according to the storage
  * properties.
@@ -139,6 +138,54 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   }
 }
 
+case class DeterminePartitionedTableStats(
+    session: SparkSession) extends Rule[LogicalPlan] with PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    case filter @ Filter(condition, relation: CatalogRelation)
+        if DDLUtils.isHiveTable(relation.tableMeta) && relation.isPartitioned =>
+      val predicates = splitConjunctivePredicates(condition)
+      val normalizedFilters = predicates.map { e =>
+        e transform {
+          case a: AttributeReference =>
+            a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+        }
+      }
+      val partitionSet = AttributeSet(relation.partitionCols)
+      val pruningPredicates = normalizedFilters.filter { predicate =>
+        !predicate.references.isEmpty &&
+          predicate.references.subsetOf(partitionSet)
+      }
+      if (pruningPredicates.nonEmpty && session.sessionState.conf.fallBackToHdfsForStatsEnabled &&
+          session.sessionState.conf.metastorePartitionPruning) {
+        val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter(
+          relation.tableMeta.database,
+          relation.tableMeta.identifier.table,
+          pruningPredicates,
+          session.sessionState.conf.sessionLocalTimeZone)
+        val hiveTable = HiveClientImpl.toHiveTable(relation.tableMeta)
+        val partitions = prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveTable))
+        val sizeInBytes = try {
+          val hadoopConf = session.sessionState.newHadoopConf()
+          partitions.map { partition =>
+            val fs: FileSystem = partition.getDataLocation.getFileSystem(hadoopConf)
+            fs.getContentSummary(partition.getDataLocation).getLength
+          }.sum
+        } catch {
+          case e: IOException =>
+            logWarning("Failed to get table size from hdfs.", e)
+            session.sessionState.conf.defaultSizeInBytes
+        }
+        val withStats = relation.tableMeta.copy(
+          stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+        val prunedCatalogRelation = relation.copy(tableMeta = withStats)
+        val filterExpression = predicates.reduceLeft(And)
+        Filter(filterExpression, prunedCatalogRelation)
+      } else {
+        filter
+      }
+  }
+}
+
 /**
  * Replaces generic operations with specific variants that are designed to work with Hive.
  *
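
The new rule is the Hive-metastore counterpart of the stats fix above: when a Filter over a partitioned CatalogRelation contains predicates that reference only partition columns, and both fallBackToHdfsForStatsEnabled and metastorePartitionPruning are on, it asks the external catalog for the matching partitions, sums their on-disk size, and records that figure as the relation's CatalogStatistics. The sizing step is plain Hadoop FileSystem code; a self-contained sketch of the same calculation (the paths, hadoopConf, and fallback parameters are hypothetical) looks like this:

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sum the bytes stored under each pruned partition directory; on I/O failure fall back to a
// configured default, mirroring the rule's use of spark.sql.defaultSizeInBytes.
def estimatePrunedSize(paths: Seq[Path], hadoopConf: Configuration, fallback: Long): Long = {
  try {
    paths.map { path =>
      val fs = path.getFileSystem(hadoopConf)
      fs.getContentSummary(path).getLength // total length of all files under the directory
    }.sum
  } catch {
    case _: IOException => fallback
  }
}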

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala

Lines changed: 56 additions & 0 deletions
@@ -759,4 +759,60 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
 
   }
+
+  test("auto converts to broadcast join by size estimate of scanned partitions " +
+    "for partitioned table") {
+    withTempView("tempTbl", "largeTbl", "partTbl") {
+      spark.range(0, 1000, 1, 2).selectExpr("id as col1", "id as col2")
+        .createOrReplaceTempView("tempTbl")
+      spark.range(0, 100000, 1, 2).selectExpr("id as col1", "id as col2").
+        createOrReplaceTempView("largeTbl")
+      sql("CREATE TABLE partTbl (col1 INT, col2 STRING) " +
+        "PARTITIONED BY (part1 STRING, part2 INT) STORED AS textfile")
+      for (part1 <- Seq("a", "b", "c", "d"); part2 <- Seq(1, 2)) {
+        sql(
+          s"""
+             |INSERT OVERWRITE TABLE partTbl PARTITION (part1='$part1',part2='$part2')
+             |select col1, col2 from tempTbl
+           """.stripMargin)
+      }
+      val query = "select * from largeTbl join partTbl on (largeTbl.col1 = partTbl.col1 " +
+        "and partTbl.part1 = 'a' and partTbl.part2 = 1)"
+      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "8001") {
+
+        withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> "true") {
+          val broadcastJoins =
+            sql(query).queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }
+          assert(broadcastJoins.nonEmpty)
+        }
+
+        withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> "false") {
+          val broadcastJoins =
+            sql(query).queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }
+          assert(broadcastJoins.isEmpty)
+        }
+      }
+
+      sql("CREATE TABLE partTbl_parquet (col1 INT, col2 STRING) " +
+        "PARTITIONED BY (part1 STRING, part2 INT) STORED AS parquet")
+      for (part1 <- Seq("a", "b", "c", "d"); part2 <- Seq(1, 2)) {
+        sql(
+          s"""
+             |INSERT OVERWRITE TABLE partTbl_parquet PARTITION (part1='$part1',part2='$part2')
+             |select col1, col2 from tempTbl
+           """.stripMargin)
+      }
+
+      val query2 =
+        "select * from largeTbl join partTbl_parquet on (largeTbl.col1 = partTbl_parquet.col1 " +
+        "and partTbl_parquet.part1 = 'a' and partTbl_parquet.part2 = 1)"
+      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "12000") {
+        val broadcastJoins =
+          sql(query2).queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }
+        assert(broadcastJoins.nonEmpty)
+      }
+    }
+  }
 }
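
The test builds a small partitioned text table and a parquet variant, each with eight partitions, joins them against a much larger temp view, and asserts that a BroadcastHashJoinExec is chosen only when metastore partition pruning is enabled, with the broadcast threshold sized so that a single scanned partition fits under it while the full table does not. The same check can be reproduced interactively, roughly as follows (a sketch, assuming the tables created by the test already exist in the current Hive-enabled session):

import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.internal.SQLConf

spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, "true")
spark.conf.set(SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key, "true")
spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "8001")

// With pruning on, only partition (part1='a', part2=1) counts towards partTbl's size estimate,
// so it drops under the threshold and that side of the join is broadcast.
val plan = spark.sql(
  "select * from largeTbl join partTbl on largeTbl.col1 = partTbl.col1 " +
    "and partTbl.part1 = 'a' and partTbl.part2 = 1").queryExecution.sparkPlan
assert(plan.collect { case j: BroadcastHashJoinExec => j }.nonEmpty)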
