Skip to content

Commit c3f0140

Browse files
committed
Revert codes
1 parent da879a6 commit c3f0140

File tree

4 files changed

+30
-82
lines changed

4 files changed

+30
-82
lines changed

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 9 additions & 13 deletions
Original file line number · Diff line number · Diff line change
@@ -25,7 +25,7 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
2525

2626
import org.apache.spark.SparkConf
2727
import org.apache.spark.annotation.DeveloperApi
28-
import org.apache.spark.util.{ShortCompressionCodecNameMapper, Utils}
28+
import org.apache.spark.util.Utils
2929

3030
/**
3131
* :: DeveloperApi ::
@@ -53,14 +53,10 @@ private[spark] object CompressionCodec {
5353
|| codec.isInstanceOf[LZ4CompressionCodec])
5454
}
5555

56-
/** Maps the short versions of compression codec names to fully-qualified class names. */
57-
private val shortCompressionCodecNameMapper = new ShortCompressionCodecNameMapper {
58-
override def lz4: Option[String] = Some(classOf[LZ4CompressionCodec].getName)
59-
override def lzf: Option[String] = Some(classOf[LZFCompressionCodec].getName)
60-
override def snappy: Option[String] = Some(classOf[SnappyCompressionCodec].getName)
61-
}
62-
63-
private val shortCompressionCodecMap = shortCompressionCodecNameMapper.getAsMap
56+
private val shortCompressionCodecNames = Map(
57+
"lz4" -> classOf[LZ4CompressionCodec].getName,
58+
"lzf" -> classOf[LZFCompressionCodec].getName,
59+
"snappy" -> classOf[SnappyCompressionCodec].getName)
6460

6561
def getCodecName(conf: SparkConf): String = {
6662
conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
@@ -71,7 +67,7 @@ private[spark] object CompressionCodec {
7167
}
7268

7369
def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
74-
val codecClass = shortCompressionCodecNameMapper.get(codecName).getOrElse(codecName)
70+
val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
7571
val codec = try {
7672
val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
7773
Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
@@ -88,18 +84,18 @@ private[spark] object CompressionCodec {
8884
* If it is already a short name, just return it.
8985
*/
9086
def getShortName(codecName: String): String = {
91-
if (shortCompressionCodecMap.contains(codecName)) {
87+
if (shortCompressionCodecNames.contains(codecName)) {
9288
codecName
9389
} else {
94-
shortCompressionCodecMap
90+
shortCompressionCodecNames
9591
.collectFirst { case (k, v) if v == codecName => k }
9692
.getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
9793
}
9894
}
9995

10096
val FALLBACK_COMPRESSION_CODEC = "snappy"
10197
val DEFAULT_COMPRESSION_CODEC = "lz4"
102-
val ALL_COMPRESSION_CODECS = shortCompressionCodecMap.values.toSeq
98+
val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
10399
}
104100

105101
/**

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 0 additions & 45 deletions
Original file line number · Diff line number · Diff line change
@@ -60,51 +60,6 @@ private[spark] object CallSite {
6060
val empty = CallSite("", "")
6161
}
6262

63-
/** A utility class to map short compression codec names to qualified ones. */
64-
private[spark] class ShortCompressionCodecNameMapper {
65-
66-
def get(codecName: String): Option[String] = codecName.toLowerCase match {
67-
case "none" => none
68-
case "uncompressed" => uncompressed
69-
case "bzip2" => bzip2
70-
case "deflate" => deflate
71-
case "gzip" => gzip
72-
case "lzo" => lzo
73-
case "lz4" => lz4
74-
case "lzf" => lzf
75-
case "snappy" => snappy
76-
case _ => None
77-
}
78-
79-
def getAsMap: Map[String, String] = {
80-
Seq(
81-
("none", none),
82-
("uncompressed", uncompressed),
83-
("bzip2", bzip2),
84-
("deflate", deflate),
85-
("gzip", gzip),
86-
("lzo", lzo),
87-
("lz4", lz4),
88-
("lzf", lzf),
89-
("snappy", snappy)
90-
).flatMap { case (shortCodecName, codecName) =>
91-
if (codecName.isDefined) Some(shortCodecName, codecName.get) else None
92-
}.toMap
93-
}
94-
95-
// To support short codec names, derived classes need to override the methods below that return
96-
// corresponding qualified codec names.
97-
def none: Option[String] = None
98-
def uncompressed: Option[String] = None
99-
def bzip2: Option[String] = None
100-
def deflate: Option[String] = None
101-
def gzip: Option[String] = None
102-
def lzo: Option[String] = None
103-
def lz4: Option[String] = None
104-
def lzf: Option[String] = None
105-
def snappy: Option[String] = None
106-
}
107-
10863
/**
10964
* Various utility methods used by Spark.
11065
*/

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala

Lines changed: 9 additions & 13 deletions
Original file line number · Diff line number · Diff line change
@@ -21,34 +21,30 @@ import org.apache.hadoop.conf.Configuration
2121
import org.apache.hadoop.io.SequenceFile.CompressionType
2222
import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
2323

24-
import org.apache.spark.util.{ShortCompressionCodecNameMapper, Utils}
24+
import org.apache.spark.util.Utils
2525

2626
private[datasources] object CompressionCodecs {
27-
28-
/** Maps the short versions of compression codec names to fully-qualified class names. */
29-
private val hadoopShortCodecNameMapper = new ShortCompressionCodecNameMapper {
30-
override def bzip2: Option[String] = Some(classOf[BZip2Codec].getCanonicalName)
31-
override def deflate: Option[String] = Some(classOf[DeflateCodec].getCanonicalName)
32-
override def gzip: Option[String] = Some(classOf[GzipCodec].getCanonicalName)
33-
override def lz4: Option[String] = Some(classOf[Lz4Codec].getCanonicalName)
34-
override def snappy: Option[String] = Some(classOf[SnappyCodec].getCanonicalName)
35-
}
27+
private val shortCompressionCodecNames = Map(
28+
"bzip2" -> classOf[BZip2Codec].getName,
29+
"deflate" -> classOf[DeflateCodec].getName,
30+
"gzip" -> classOf[GzipCodec].getName,
31+
"lz4" -> classOf[Lz4Codec].getName,
32+
"snappy" -> classOf[SnappyCodec].getName)
3633

3734
/**
3835
* Return the full version of the given codec class.
3936
* If it is already a class name, just return it.
4037
*/
4138
def getCodecClassName(name: String): String = {
42-
val codecName = hadoopShortCodecNameMapper.get(name).getOrElse(name)
39+
val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
4340
try {
4441
// Validate the codec name
4542
Utils.classForName(codecName)
4643
codecName
4744
} catch {
4845
case e: ClassNotFoundException =>
4946
throw new IllegalArgumentException(s"Codec [$codecName] " +
50-
s"is not available. Known codecs are " +
51-
s"${hadoopShortCodecNameMapper.getAsMap.keys.mkString(", ")}.")
47+
s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
5248
}
5349
}
5450

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala

Lines changed: 12 additions & 11 deletions
Original file line number · Diff line number · Diff line change
@@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources.{PartitionSpec, _}
5050
import org.apache.spark.sql.internal.SQLConf
5151
import org.apache.spark.sql.sources._
5252
import org.apache.spark.sql.types.{DataType, StructType}
53-
import org.apache.spark.util.{SerializableConfiguration, ShortCompressionCodecNameMapper, Utils}
53+
import org.apache.spark.util.{SerializableConfiguration, Utils}
5454

5555
private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister {
5656

@@ -284,8 +284,10 @@ private[sql] class ParquetRelation(
284284
conf.set(
285285
ParquetOutputFormat.COMPRESSION,
286286
ParquetRelation
287-
.parquetShortCodecNameMapper.get(sqlContext.conf.parquetCompressionCodec)
288-
.getOrElse(CompressionCodecName.UNCOMPRESSED.name()))
287+
.shortParquetCompressionCodecNames
288+
.getOrElse(
289+
sqlContext.conf.parquetCompressionCodec.toUpperCase,
290+
CompressionCodecName.UNCOMPRESSED).name())
289291

290292
new BucketedOutputWriterFactory {
291293
override def newInstance(
@@ -901,12 +903,11 @@ private[sql] object ParquetRelation extends Logging {
901903
}
902904
}
903905

904-
/** Maps the short versions of compression codec names to qualified compression names. */
905-
val parquetShortCodecNameMapper = new ShortCompressionCodecNameMapper {
906-
override def none: Option[String] = Some(CompressionCodecName.UNCOMPRESSED.name())
907-
override def uncompressed: Option[String] = Some(CompressionCodecName.UNCOMPRESSED.name())
908-
override def gzip: Option[String] = Some(CompressionCodecName.GZIP.name())
909-
override def lzo: Option[String] = Some(CompressionCodecName.LZO.name())
910-
override def snappy: Option[String] = Some(CompressionCodecName.SNAPPY.name())
911-
}
906+
// The parquet compression short names
907+
val shortParquetCompressionCodecNames = Map(
908+
"none" -> CompressionCodecName.UNCOMPRESSED,
909+
"uncompressed" -> CompressionCodecName.UNCOMPRESSED,
910+
"snappy" -> CompressionCodecName.SNAPPY,
911+
"gzip" -> CompressionCodecName.GZIP,
912+
"lzo" -> CompressionCodecName.LZO)
912913
}

0 commit comments

Comments
 (0)