Commit 95aa931

[SPARK-19833][SQL] Remove SQLConf.HIVE_VERIFY_PARTITION_PATH; always return an empty result when the location does not exist
1 parent: 207067e

File tree

3 files changed: +54 −68 lines

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 0 additions & 8 deletions

@@ -264,12 +264,6 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
-    .doc("When true, check all the partition paths under the table's root directory " +
-      "when reading data stored in HDFS.")
-    .booleanConf
-    .createWithDefault(false)
-
   val HIVE_METASTORE_PARTITION_PRUNING =
     buildConf("spark.sql.hive.metastorePartitionPruning")
       .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
@@ -768,8 +762,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
-  def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
-
   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
   def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)
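
With the entry and its accessor gone, the flag can no longer be read through SQLConf. As a hedged sketch of what this removes in practice, the old opt-in looked roughly like this from user code (a live SparkSession named spark is assumed; the key is obsolete after this commit):

// Hypothetical pre-commit usage: opt in to partition-path verification.
spark.conf.set("spark.sql.hive.verifyPartitionPath", "true")

// After this commit the check is unconditional, so the setting above has
// no effect and missing partition directories are always skipped.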

sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

Lines changed: 26 additions & 30 deletions

@@ -159,36 +159,32 @@ class HadoopTableReader(
   def verifyPartitionPath(
       partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
       Map[HivePartition, Class[_ <: Deserializer]] = {
-    if (!sparkSession.sessionState.conf.verifyPartitionPath) {
-      partitionToDeserializer
-    } else {
-      var existPathSet = collection.mutable.Set[String]()
-      var pathPatternSet = collection.mutable.Set[String]()
-      partitionToDeserializer.filter {
-        case (partition, partDeserializer) =>
-          def updateExistPathSetByPathPattern(pathPatternStr: String) {
-            val pathPattern = new Path(pathPatternStr)
-            val fs = pathPattern.getFileSystem(hadoopConf)
-            val matches = fs.globStatus(pathPattern)
-            matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
-          }
-          // convert /demo/data/year/month/day to /demo/data/*/*/*/
-          def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
-            var path = tempPath
-            for (i <- (1 to parNum)) path = path.getParent
-            val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
-            path.toString + tails
-          }
-
-          val partPath = partition.getDataLocation
-          val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
-          var pathPatternStr = getPathPatternByPath(partNum, partPath)
-          if (!pathPatternSet.contains(pathPatternStr)) {
-            pathPatternSet += pathPatternStr
-            updateExistPathSetByPathPattern(pathPatternStr)
-          }
-          existPathSet.contains(partPath.toString)
-      }
+    var existPathSet = collection.mutable.Set[String]()
+    var pathPatternSet = collection.mutable.Set[String]()
+    partitionToDeserializer.filter {
+      case (partition, partDeserializer) =>
+        def updateExistPathSetByPathPattern(pathPatternStr: String) {
+          val pathPattern = new Path(pathPatternStr)
+          val fs = pathPattern.getFileSystem(hadoopConf)
+          val matches = fs.globStatus(pathPattern)
+          matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
+        }
+        // convert /demo/data/year/month/day to /demo/data/*/*/*/
+        def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
+          var path = tempPath
+          for (i <- (1 to parNum)) path = path.getParent
+          val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
+          path.toString + tails
+        }
+
+        val partPath = partition.getDataLocation
+        val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
+        var pathPatternStr = getPathPatternByPath(partNum, partPath)
+        if (!pathPatternSet.contains(pathPatternStr)) {
+          pathPatternSet += pathPatternStr
+          updateExistPathSetByPathPattern(pathPatternStr)
+        }
+        existPathSet.contains(partPath.toString)
     }
   }
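
The surviving body keeps the glob-based technique: rather than issuing one existence check per partition, it converts each partition path to a wildcard pattern, runs one globStatus per distinct pattern, and filters every partition against the cached set of paths that actually exist. A self-contained sketch of the same idea outside HadoopTableReader (the object and method names and the parameters are assumptions for illustration, not Spark API):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object PartitionPathCheck {
  // Keep only partitions whose directories actually exist, issuing one
  // globStatus call per distinct pattern instead of one check per partition.
  def existingPartitionPaths(
      partitionPaths: Seq[Path],
      partitionColumns: Int,
      hadoopConf: Configuration): Seq[Path] = {
    val existing = scala.collection.mutable.Set[String]()
    val seenPatterns = scala.collection.mutable.Set[String]()
    partitionPaths.filter { partPath =>
      // Walk up one directory per partition column, then glob back down,
      // e.g. /demo/data/year/month/day becomes /demo/data/*/*/*/.
      var root = partPath
      (1 to partitionColumns).foreach(_ => root = root.getParent)
      val pattern =
        root.toString + (1 to partitionColumns).map(_ => "*").mkString("/", "/", "/")
      if (seenPatterns.add(pattern)) {
        val fs = new Path(pattern).getFileSystem(hadoopConf)
        // globStatus returns null when nothing matches, so guard it.
        Option(fs.globStatus(new Path(pattern))).toSeq.flatten
          .foreach(status => existing += status.getPath.toString)
      }
      existing.contains(partPath.toString)
    }
  }
}

Because distinct patterns are deduplicated, a table with many partitions under one root costs a single filesystem glob per layout rather than one round trip per partition, which is the trade-off the method above makes as well.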

sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala

Lines changed: 28 additions & 30 deletions

@@ -32,40 +32,38 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
   import spark.implicits._
 
   test("SPARK-5068: query data when path doesn't exist") {
-    withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "true")) {
-      val testData = sparkContext.parallelize(
-        (1 to 10).map(i => TestData(i, i.toString))).toDF()
-      testData.createOrReplaceTempView("testData")
+    val testData = sparkContext.parallelize(
+      (1 to 10).map(i => TestData(i, i.toString))).toDF()
+    testData.createOrReplaceTempView("testData")
 
-      val tmpDir = Files.createTempDir()
-      // create the table for test
-      sql(s"CREATE TABLE table_with_partition(key int,value string) " +
-        s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
-      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " +
-        "SELECT key,value FROM testData")
-      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') " +
-        "SELECT key,value FROM testData")
-      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') " +
-        "SELECT key,value FROM testData")
-      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') " +
-        "SELECT key,value FROM testData")
+    val tmpDir = Files.createTempDir()
+    // create the table for test
+    sql(s"CREATE TABLE table_with_partition(key int,value string) " +
+      s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " +
+      "SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') " +
+      "SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') " +
+      "SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') " +
+      "SELECT key,value FROM testData")
 
-      // test for the exist path
-      checkAnswer(sql("select key,value from table_with_partition"),
-        testData.toDF.collect ++ testData.toDF.collect
-          ++ testData.toDF.collect ++ testData.toDF.collect)
+    // test for the exist path
+    checkAnswer(sql("select key,value from table_with_partition"),
+      testData.toDF.collect ++ testData.toDF.collect
+        ++ testData.toDF.collect ++ testData.toDF.collect)
 
-      // delete the path of one partition
-      tmpDir.listFiles
-        .find { f => f.isDirectory && f.getName().startsWith("ds=") }
-        .foreach { f => Utils.deleteRecursively(f) }
+    // delete the path of one partition
+    tmpDir.listFiles
+      .find { f => f.isDirectory && f.getName().startsWith("ds=") }
+      .foreach { f => Utils.deleteRecursively(f) }
 
-      // test for after delete the path
-      checkAnswer(sql("select key,value from table_with_partition"),
-        testData.toDF.collect ++ testData.toDF.collect ++ testData.toDF.collect)
+    // test for after delete the path
+    checkAnswer(sql("select key,value from table_with_partition"),
+      testData.toDF.collect ++ testData.toDF.collect ++ testData.toDF.collect)
 
-      sql("DROP TABLE IF EXISTS table_with_partition")
-      sql("DROP TABLE IF EXISTS createAndInsertTest")
-    }
+    sql("DROP TABLE IF EXISTS table_with_partition")
+    sql("DROP TABLE IF EXISTS createAndInsertTest")
   }
 }
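
The test now exercises the behavior without any flag: after one partition directory is deleted out from under the metastore, the query returns rows from the remaining partitions only. A minimal sketch of the same user-visible behavior in spark-shell terms (table name and rows are illustrative; per the commit message, a missing location now always yields an empty result for that partition instead of requiring the removed setting):

sql("CREATE TABLE t (key INT, value STRING) PARTITIONED BY (ds STRING)")
sql("INSERT OVERWRITE TABLE t PARTITION (ds='1') SELECT 1, 'a'")
sql("INSERT OVERWRITE TABLE t PARTITION (ds='2') SELECT 2, 'b'")
// Delete the ds=2 directory behind the metastore's back, then query:
// the missing partition simply contributes no rows.
sql("SELECT key, value FROM t").collect()  // Array(Row(1, "a"))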
