Commit 5cbe999

[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configuration doesn't take effect on tables with partition field(s)
Fix the same issue for ORC compression.
Parent: 42aca3d
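
For context, the symptom is visible from any Hive-enabled Spark session: set the session compression codec, insert into a partitioned Hive table, and (before this patch) the setting is ignored for the written files. A hedged reproduction sketch, with made-up table and view names, assuming a spark-shell started with Hive support:

  // Hypothetical reproduction; table/view names are illustrative.
  spark.range(0, 10000).toDF("a").createOrReplaceTempView("table_source")
  spark.sql("CREATE TABLE repro_t (a bigint) PARTITIONED BY (p int) STORED AS PARQUET")

  spark.conf.set("spark.sql.parquet.compression.codec", "gzip")
  spark.sql("INSERT OVERWRITE TABLE repro_t PARTITION (p=1) SELECT a FROM table_source")
  // Before this fix, the part- files under repro_t/p=1 are unaffected by the gzip
  // setting, because it never reached the Hive output format's Hadoop configuration.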

File tree: 2 files changed, +82 −55 lines

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
(7 additions, 1 deletion)
@@ -102,9 +102,15 @@ case class InsertIntoHiveTable(
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)

     tableDesc.getOutputFileFormatClassName match {
-      case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" =>
+      case formatName if formatName.endsWith("ParquetOutputFormat") =>
         val parquetCompression = sparkSession.sessionState.conf.parquetCompressionCodec
         hadoopConf.set("parquet.compression", parquetCompression)
+      case formatName if formatName.endsWith("OrcOutputFormat") =>
+        val orcCompression = sparkSession.sessionState.conf.orcCompressionCodec.toUpperCase match {
+          case "UNCOMPRESSED" => "NONE"
+          case _@x => x
+        }
+        hadoopConf.set("orc.compress", orcCompression)
       case _ =>
     }

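The production change, in short: match on the output-format class-name suffix instead of one exact Parquet class, and translate Spark's "uncompressed" codec name into the "NONE" value that ORC expects. A standalone sketch of that selection logic (not Spark code; CodecSelectionSketch and codecEntry are hypothetical names, and sessionCodec stands in for sparkSession.sessionState.conf.parquetCompressionCodec / orcCompressionCodec):

  object CodecSelectionSketch {
    // Returns the Hadoop conf entry the patch would set, if any.
    def codecEntry(outputFormatClassName: String, sessionCodec: String): Option[(String, String)] =
      outputFormatClassName match {
        case f if f.endsWith("ParquetOutputFormat") =>
          Some("parquet.compression" -> sessionCodec)
        case f if f.endsWith("OrcOutputFormat") =>
          // ORC spells "no compression" as NONE rather than UNCOMPRESSED.
          val orcCodec = sessionCodec.toUpperCase match {
            case "UNCOMPRESSED" => "NONE"
            case x => x
          }
          Some("orc.compress" -> orcCodec)
        case _ => None
      }

    def main(args: Array[String]): Unit = {
      // The exact class the old code matched still satisfies the suffix check.
      println(codecEntry("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", "gzip"))
      // => Some((parquet.compression,gzip))
      println(codecEntry("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat", "uncompressed"))
      // => Some((orc.compress,NONE))
    }
  }

Matching the suffix rather than the fully qualified MapredParquetOutputFormat name lets one branch cover any output-format class ending in ParquetOutputFormat, and makes room for the new ORC branch.
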
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
(75 additions, 54 deletions)
@@ -731,72 +731,93 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter

   test("[SPARK-21786] The 'spark.sql.parquet.compression.codec' " +
     "configuration doesn't take effect on tables with partition field(s)") {
-    withTempDir { tmpDir =>
-      withTempView("table_source") {
-        (0 until 10000).toDF("a").createOrReplaceTempView("table_source")
+    val tableWithPartition = "table_with_partition"
+    val tableNoPartition = "table_no_partition"

-        val tableWithPartition = "table_with_partition"
-        val tableNoPartition = "table_no_partition"
-        withTable(tableWithPartition, tableNoPartition) {
-          sql(
-            s"""
+    def insertOverwriteTable(tableName: String, paramName: String, codec: String,
+        isPartitioned: Boolean): Unit = {
+      withSQLConf(paramName -> codec) {
+        sql(
+          s"""
+             |INSERT OVERWRITE TABLE $tableName
+             |${if (isPartitioned) "partition (p=10000)" else ""}
+             |SELECT * from table_source
+           """.stripMargin)
+      }
+    }
+
+    def getDirFiles(file: File): List[File] = {
+      if (!file.exists()) Nil
+      else if (file.isFile) List(file)
+      else {
+        file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
+          .groupBy(_.isFile).flatMap {
+            case (isFile, files) if isFile => files.toList
+            case (_, dirs) => dirs.flatMap(getDirFiles)
+          }.toList
+      }
+    }
+
+    def getTableSize(tmpDir: File, tableName: String, paramName: String, codec: String,
+        isPartitioned: Boolean = false): Long = {
+      insertOverwriteTable(tableName, paramName, codec, isPartitioned)
+      val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
+      val dir = new File(path)
+      val files = getDirFiles(dir).filter(_.getName.startsWith("part-"))
+      files.map(_.length()).sum
+    }
+
+    def checkCompressionCodec(format: String)(f: File => Unit): Unit = {
+      withTempDir { tmpDir =>
+        withTempView("table_source") {
+          (0 until 10000).toDF("a").createOrReplaceTempView("table_source")
+
+          withTable(tableWithPartition, tableNoPartition) {
+            sql(
+              s"""
                |CREATE TABLE $tableNoPartition(a int)
-               |STORED AS PARQUET
+               |STORED AS $format
               |LOCATION '${tmpDir.toURI.toString.stripSuffix("/")}/$tableNoPartition'
             """.stripMargin)
-          sql(
-            s"""
+            sql(
+              s"""
               |CREATE TABLE $tableWithPartition(a int)
               |PARTITIONED BY (p int)
-              |STORED AS PARQUET
+              |STORED AS $format
              |LOCATION '${tmpDir.toURI.toString.stripSuffix("/")}/$tableWithPartition'
            """.stripMargin)

-          def insertOverwriteTable(tableName: String, codec: String,
-              isPartitioned: Boolean): Unit = {
-            withSQLConf("spark.sql.parquet.compression.codec" -> codec) {
-              sql(
-                s"""
-                   |INSERT OVERWRITE TABLE $tableName
-                   |${if (isPartitioned) "partition (p=10000)" else ""}
-                   |SELECT * from table_source
-                 """.stripMargin)
-            }
-          }
-
-          def getDirFiles(file: File): List[File] = {
-            if (!file.exists()) Nil
-            else if (file.isFile) List(file)
-            else {
-              file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
-                .groupBy(_.isFile).flatMap {
-                  case (isFile, files) if isFile => files.toList
-                  case (_, dirs) => dirs.flatMap(getDirFiles)
-                }.toList
-            }
-          }
-
-          def getTableSize(tableName: String, codec: String,
-              isPartitioned: Boolean = false): Long = {
-            insertOverwriteTable(tableName, codec, isPartitioned)
-            val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
-            val dir = new File(path)
-            val files = getDirFiles(dir).filter(_.getName.startsWith("part-"))
-            files.map(_.length()).sum
+            f(tmpDir)
           }
-
-          // In fact, partitioned and unpartitioned table meta information is slightly different,
-          // and partitioned tables are slightly larger, but the differences are not very large.
-          // Think less than 1024Byte
-          val maxDiff = 1024
-          assert(getTableSize(tableWithPartition, "uncompressed", true)
-            - getTableSize(tableNoPartition, "uncompressed") < maxDiff)
-          assert(getTableSize(tableWithPartition, "gzip", true)
-            - getTableSize(tableNoPartition, "gzip") < maxDiff)
-          assert(getTableSize(tableWithPartition, "uncompressed", true)
-            - getTableSize(tableWithPartition, "gzip", true) > maxDiff)
         }
       }
     }
+
+    val parquetCompression = "spark.sql.parquet.compression.codec"
+    checkCompressionCodec("PARQUET") { tmpDir =>
+      // Partitioned and unpartitioned table metadata differ slightly (partitioned
+      // tables are a bit larger), but the difference should stay under 1024 bytes.
+      val maxDiff = 1024
+      assert(getTableSize(tmpDir, tableWithPartition, parquetCompression, "uncompressed", true)
+        - getTableSize(tmpDir, tableNoPartition, parquetCompression, "uncompressed") < maxDiff)
+      assert(getTableSize(tmpDir, tableWithPartition, parquetCompression, "gzip", true)
+        - getTableSize(tmpDir, tableNoPartition, parquetCompression, "gzip") < maxDiff)
+      assert(getTableSize(tmpDir, tableWithPartition, parquetCompression, "uncompressed", true)
+        - getTableSize(tmpDir, tableWithPartition, parquetCompression, "gzip", true) > maxDiff)
+    }
+
+    val orcCompression = "spark.sql.orc.compression.codec"
+    checkCompressionCodec("ORC") { tmpDir =>
+      val maxDiff = 1024
+      assert(getTableSize(tmpDir, tableWithPartition, orcCompression, "none", true)
+        - getTableSize(tmpDir, tableNoPartition, orcCompression, "none") < maxDiff)
+      assert(getTableSize(tmpDir, tableWithPartition, orcCompression, "uncompressed", true)
+        == getTableSize(tmpDir, tableNoPartition, orcCompression, "none"))
+      assert(getTableSize(tmpDir, tableWithPartition, orcCompression, "zlib", true)
+        - getTableSize(tmpDir, tableNoPartition, orcCompression, "zlib") < maxDiff)
+      assert(getTableSize(tmpDir, tableWithPartition, orcCompression, "none", true)
+        - getTableSize(tmpDir, tableWithPartition, orcCompression, "zlib", true) > maxDiff)
+    }
   }
 }
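
The suite's directory walk is plain java.io code, so it can be sanity-checked outside Spark. A minimal standalone run of the same getDirFiles logic (the demo object, file names, and directory layout here are made up for illustration):

  import java.io.File
  import java.nio.file.Files

  object GetDirFilesDemo {
    // Same traversal as the helper above: collect files recursively,
    // skipping Hive staging directories.
    def getDirFiles(file: File): List[File] = {
      if (!file.exists()) Nil
      else if (file.isFile) List(file)
      else {
        file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
          .groupBy(_.isFile).flatMap {
            case (isFile, files) if isFile => files.toList
            case (_, dirs) => dirs.flatMap(getDirFiles)
          }.toList
      }
    }

    def main(args: Array[String]): Unit = {
      val root = Files.createTempDirectory("getdirfiles-demo").toFile
      new File(root, "part-00000").createNewFile()
      new File(root, ".hive-staging_tmp").mkdir()      // skipped by the filter
      val partition = new File(root, "p=10000")
      partition.mkdir()
      new File(partition, "part-00001").createNewFile()
      // Expect both part- files and nothing from the staging directory.
      println(getDirFiles(root).map(_.getName).sorted) // List(part-00000, part-00001)
    }
  }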
