@@ -731,72 +731,93 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
731731
test("[SPARK-21786] The 'spark.sql.parquet.compression.codec' " +
  "configuration doesn't take effect on tables with partition field(s)") {
  val tableWithPartition = "table_with_partition"
  val tableNoPartition = "table_no_partition"

  // Overwrites `tableName` with the rows of the `table_source` temp view while the
  // SQL conf `paramName` is set to `codec`. When `isPartitioned` is true the rows are
  // written into the single static partition p=10000.
  def insertOverwriteTable(
      tableName: String,
      paramName: String,
      codec: String,
      isPartitioned: Boolean): Unit = {
    withSQLConf(paramName -> codec) {
      sql(
        s"""
           |INSERT OVERWRITE TABLE $tableName
           |${if (isPartitioned) "partition (p=10000)" else ""}
           |SELECT * from table_source
         """.stripMargin)
    }
  }

  // Recursively collects the regular files below `file`. Directory entries whose name
  // starts with ".hive-staging" are skipped. Returns Nil when `file` does not exist.
  def getDirFiles(file: File): List[File] = {
    if (!file.exists()) {
      Nil
    } else if (file.isFile) {
      List(file)
    } else {
      // File.listFiles() returns null when the directory cannot be listed; treat that
      // as an empty directory instead of failing with a NullPointerException.
      Option(file.listFiles()).map(_.toList).getOrElse(Nil)
        .filterNot(_.getName.startsWith(".hive-staging"))
        .flatMap(getDirFiles)
    }
  }

  // Rewrites `tableName` with the given codec setting (note: this WRITES to the table)
  // and then returns the total size in bytes of the "part-*" files found under
  // `tmpDir/tableName`.
  def getTableSize(
      tmpDir: File,
      tableName: String,
      paramName: String,
      codec: String,
      isPartitioned: Boolean = false): Long = {
    insertOverwriteTable(tableName, paramName, codec, isPartitioned)
    val tableDir = new File(tmpDir, tableName)
    getDirFiles(tableDir)
      .filter(_.getName.startsWith("part-"))
      .map(_.length())
      .sum
  }

  // Creates the `table_source` temp view (10000 int rows) plus one unpartitioned and one
  // partitioned table stored as `format` under a fresh temp directory, then runs `f`
  // with that directory. The view, the tables and the directory are scoped to this call
  // by withTempView / withTable / withTempDir.
  def checkCompressionCodec(format: String)(f: File => Unit): Unit = {
    withTempDir { tmpDir =>
      withTempView("table_source") {
        (0 until 10000).toDF("a").createOrReplaceTempView("table_source")

        withTable(tableWithPartition, tableNoPartition) {
          val baseLocation = tmpDir.toURI.toString.stripSuffix("/")
          sql(
            s"""
               |CREATE TABLE $tableNoPartition(a int)
               |STORED AS $format
               |LOCATION '$baseLocation/$tableNoPartition'
             """.stripMargin)
          sql(
            s"""
               |CREATE TABLE $tableWithPartition(a int)
               |PARTITIONED BY (p int)
               |STORED AS $format
               |LOCATION '$baseLocation/$tableWithPartition'
             """.stripMargin)

          f(tmpDir)
        }
      }
    }
  }

  val parquetCompression = "spark.sql.parquet.compression.codec"
  checkCompressionCodec("PARQUET") { tmpDir =>
    // The metadata of partitioned and unpartitioned tables differs slightly, so the
    // partitioned output is a little larger. The gap is expected to stay below
    // 1024 bytes, while switching the codec should change the size by more than that.
    val maxDiff = 1024
    assert(getTableSize(tmpDir, tableWithPartition, parquetCompression, "uncompressed", true)
      - getTableSize(tmpDir, tableNoPartition, parquetCompression, "uncompressed") < maxDiff)
    assert(getTableSize(tmpDir, tableWithPartition, parquetCompression, "gzip", true)
      - getTableSize(tmpDir, tableNoPartition, parquetCompression, "gzip") < maxDiff)
    assert(getTableSize(tmpDir, tableWithPartition, parquetCompression, "uncompressed", true)
      - getTableSize(tmpDir, tableWithPartition, parquetCompression, "gzip", true) > maxDiff)
  }

  val orcCompression = "spark.sql.orc.compression.codec"
  checkCompressionCodec("ORC") { tmpDir =>
    // Same size tolerance as for the Parquet check.
    val maxDiff = 1024
    assert(getTableSize(tmpDir, tableWithPartition, orcCompression, "none", true)
      - getTableSize(tmpDir, tableNoPartition, orcCompression, "none") < maxDiff)
    // NOTE(review): this compares a partitioned table ("uncompressed") with an
    // unpartitioned one ("none") for exact equality, while the neighbouring assertions
    // allow a size gap between the two layouts. Presumably the intent is to check that
    // both codec names produce the same output -- confirm the layouts really yield
    // byte-identical sizes for ORC, or compare both names on the same table.
    assert(getTableSize(tmpDir, tableWithPartition, orcCompression, "uncompressed", true)
      == getTableSize(tmpDir, tableNoPartition, orcCompression, "none"))
    assert(getTableSize(tmpDir, tableWithPartition, orcCompression, "zlib", true)
      - getTableSize(tmpDir, tableNoPartition, orcCompression, "zlib") < maxDiff)
    assert(getTableSize(tmpDir, tableWithPartition, orcCompression, "none", true)
      - getTableSize(tmpDir, tableWithPartition, orcCompression, "zlib", true) > maxDiff)
  }
}
802823}
0 commit comments