Commit 95c1367
Changes to ParquetRelation and its metadata
The Hadoop configuration is now passed to ParquetRelation (fixes SPARK-2112), and the path is no longer stored in the metadata (fixes SPARK-2195).
1 parent: 7eceb67
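For orientation, a minimal sketch of how the change surfaces from user code; the file path is hypothetical and `sc` stands in for an existing SparkContext:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// `sc` is an assumed, already-constructed SparkContext.
val sc: SparkContext = ???
val sqlContext = new SQLContext(sc)

// After this commit, parquetFile builds
//   parquet.ParquetRelation(path, Some(sc.hadoopConfiguration))
// so settings from the context's Hadoop configuration (for example a
// non-default FileSystem) are honored when reading Parquet metadata.
val events = sqlContext.parquetFile("hdfs:///tmp/events.parquet")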

5 files changed: +18 −16 lines


sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   def parquetFile(path: String): SchemaRDD =
-    new SchemaRDD(this, parquet.ParquetRelation(path))
+    new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration)))
 
   /**
    * Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala

Lines changed: 3 additions & 1 deletion
@@ -99,7 +99,9 @@ class JavaSQLContext(val sqlContext: SQLContext) {
    * Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
    */
   def parquetFile(path: String): JavaSchemaRDD =
-    new JavaSchemaRDD(sqlContext, ParquetRelation(path))
+    new JavaSchemaRDD(
+      sqlContext,
+      ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration)))
 
   /**
    * Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 6 additions & 5 deletions
@@ -43,19 +43,20 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
  *
  * @param path The path to the Parquet file.
  */
-private[sql] case class ParquetRelation(val path: String)
-  extends LeafNode with MultiInstanceRelation {
+private[sql] case class ParquetRelation(
+    val path: String,
+    @transient val conf: Option[Configuration] = None) extends LeafNode with MultiInstanceRelation {
   self: Product =>
 
   /** Schema derived from ParquetFile */
   def parquetSchema: MessageType =
     ParquetTypesConverter
-      .readMetaData(new Path(path))
+      .readMetaData(new Path(path), conf)
       .getFileMetaData
       .getSchema
 
   /** Attributes */
-  override val output = ParquetTypesConverter.readSchemaFromFile(new Path(path))
+  override val output = ParquetTypesConverter.readSchemaFromFile(new Path(path), conf)
 
   override def newInstance = ParquetRelation(path).asInstanceOf[this.type]
 
@@ -130,7 +131,7 @@ private[sql] object ParquetRelation {
     }
     ParquetRelation.enableLogForwarding()
     ParquetTypesConverter.writeMetaData(attributes, path, conf)
-    new ParquetRelation(path.toString) {
+    new ParquetRelation(path.toString, Some(conf)) {
       override val output = attributes
     }
   }
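A short sketch of the revised constructor, with hypothetical paths. Because the new configuration parameter defaults to None, existing single-argument call sites (such as newInstance above) still compile:

import org.apache.hadoop.conf.Configuration

// Thread an explicit Hadoop configuration through to metadata reads...
val conf = new Configuration()
val withConf = ParquetRelation("hdfs:///tmp/data.parquet", Some(conf))

// ...or omit it and let the readers fall back to a default one.
val withDefault = ParquetRelation("hdfs:///tmp/data.parquet")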

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 7 additions & 8 deletions
@@ -322,7 +322,6 @@ private[parquet] object ParquetTypesConverter extends Logging {
       }
     }
     val extraMetadata = new java.util.HashMap[String, String]()
-    extraMetadata.put("path", path.toString)
     extraMetadata.put(
       RowReadSupport.SPARK_METADATA_KEY,
       ParquetTypesConverter.convertToString(attributes))
@@ -347,16 +346,15 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * in the parent directory. If so, this is used. Else we read the actual footer at the given
    * location.
    * @param origPath The path at which we expect one (or more) Parquet files.
+   * @param configuration The Hadoop configuration to use.
    * @return The `ParquetMetadata` containing among other things the schema.
    */
-  def readMetaData(origPath: Path): ParquetMetadata = {
+  def readMetaData(origPath: Path, configuration: Option[Configuration]): ParquetMetadata = {
     if (origPath == null) {
       throw new IllegalArgumentException("Unable to read Parquet metadata: path is null")
     }
     val job = new Job()
-    // TODO: since this is called from ParquetRelation (LogicalPlan) we don't have access
-    // to SparkContext's hadoopConfig; in principle the default FileSystem may be different(?!)
-    val conf = ContextUtil.getConfiguration(job)
+    val conf = configuration.getOrElse(ContextUtil.getConfiguration(job))
     val fs: FileSystem = origPath.getFileSystem(conf)
     if (fs == null) {
       throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
@@ -390,18 +388,19 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * may lead to an upcast of types (e.g., {byte, short} to int).
    *
    * @param origPath The path at which we expect one (or more) Parquet files.
+   * @param conf The Hadoop configuration to use.
    * @return A list of attributes that make up the schema.
    */
-  def readSchemaFromFile(origPath: Path): Seq[Attribute] = {
+  def readSchemaFromFile(origPath: Path, conf: Option[Configuration]): Seq[Attribute] = {
     val keyValueMetadata: java.util.Map[String, String] =
-      readMetaData(origPath)
+      readMetaData(origPath, conf)
         .getFileMetaData
         .getKeyValueMetaData
     if (keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY) != null) {
       convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
     } else {
       val attributes = convertToAttributes(
-        readMetaData(origPath).getFileMetaData.getSchema)
+        readMetaData(origPath, conf).getFileMetaData.getSchema)
       log.warn(s"Falling back to schema conversion from Parquet types; result: $attributes")
       attributes
     }
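The key pattern in readMetaData is the Option fallback on the configuration argument. A simplified, self-contained sketch, using a plain `new Configuration()` as a stand-in for the ContextUtil-derived default:

import org.apache.hadoop.conf.Configuration

// An explicit configuration wins; None preserves the old behavior
// of deriving a default configuration on the spot.
def resolveConf(configuration: Option[Configuration]): Configuration =
  configuration.getOrElse(new Configuration())

resolveConf(Some(new Configuration()))  // caller-supplied settings
resolveConf(None)                       // default fallback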

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 1 addition & 1 deletion
@@ -207,7 +207,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
       path,
       TestSQLContext.sparkContext.hadoopConfiguration)
     assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
-    val metaData = ParquetTypesConverter.readMetaData(path)
+    val metaData = ParquetTypesConverter.readMetaData(path, Some(ContextUtil.getConfiguration(job)))
     assert(metaData != null)
     ParquetTestData
       .testData
