Commit 14c3fd8 (1 parent: 3e1456c)

Attempting to fix Spark-Parquet schema conversion

File tree: 4 files changed, +97 -19 lines

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

Lines changed: 2 additions & 2 deletions
@@ -106,8 +106,8 @@ private[sql] object CatalystConverter {
     }
   }
 
-  protected[parquet] def createRootConverter(parquetSchema: MessageType): CatalystConverter = {
-    val attributes = ParquetTypesConverter.convertToAttributes(parquetSchema)
+  protected[parquet] def createRootConverter(parquetSchema: MessageType, attributes: Seq[Attribute]): CatalystConverter = {
+    //val attributes = ParquetTypesConverter.convertToAttributes(parquetSchema)
     // For non-nested types we use the optimized Row converter
     if (attributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))) {
       new CatalystPrimitiveRowConverter(attributes)

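The change inverts the flow of schema information: instead of re-deriving Catalyst attributes from the Parquet MessageType, the root converter now receives them from the caller, which typically already knows the logical schema. A minimal sketch of a call site under the new signature, assuming a MessageType named `parquetSchema` is in scope and using a purely hypothetical attribute list:

    // Hypothetical Catalyst attributes; in the real read path these come from
    // the schema the driver stored in the job configuration, not from the file.
    val attrs: Seq[Attribute] = Seq(
      AttributeReference("id", IntegerType, nullable = false)(),
      AttributeReference("name", StringType, nullable = true)())

    // Flat (all-primitive) schemas still get the optimized Row converter;
    // nested schemas fall through to the generic converter.
    val converter: CatalystConverter =
      CatalystConverter.createRootConverter(parquetSchema, attrs)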
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 8 additions & 3 deletions
@@ -66,9 +66,13 @@ case class ParquetTableScan(
     }
 
     // Store Parquet schema in `Configuration`
+    // TODO: should this here be just the projected fields?
     conf.set(
-      RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA,
-      ParquetTypesConverter.convertFromAttributes(output).toString)
+      RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      ParquetTypesConverter.convertToString(output))
+    //conf.set(
+    //  RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA,
+    //  ParquetTypesConverter.convertFromAttributes(output).toString)
 
     // Store record filtering predicate in `Configuration`
     // Note 1: the input format ignores all predicates that cannot be expressed
@@ -179,7 +183,8 @@ case class InsertIntoParquetTable(
 
     // TODO: move that to function in object
     val conf = ContextUtil.getConfiguration(job)
-    conf.set(RowWriteSupport.PARQUET_ROW_SCHEMA, StructType.fromAttributes(relation.output).toString)
+    //conf.set(RowWriteSupport.PARQUET_ROW_SCHEMA, StructType.fromAttributes(relation.output).toString)
+    RowWriteSupport.setSchema(relation.output, conf)
 
     val fspath = new Path(relation.path)
     val fs = fspath.getFileSystem(conf)

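Both operators now push the schema through the Hadoop Configuration as a serialized attribute list rather than a Parquet schema string: the scan stores the requested attributes under SPARK_ROW_REQUESTED_SCHEMA, and the insert path delegates to RowWriteSupport.setSchema. A rough sketch of the round trip from the job-setup side, assuming `output: Seq[Attribute]` and a Configuration named `conf` are in scope:

    // Driver side: encode the Catalyst attributes into the job configuration.
    conf.set(
      RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
      ParquetTypesConverter.convertToString(output))  // read path
    RowWriteSupport.setSchema(output, conf)           // write path

    // Task side: the write support decodes the same string back into attributes.
    val attributes: Seq[Attribute] = RowWriteSupport.getSchema(conf)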
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

Lines changed: 74 additions & 14 deletions
@@ -29,6 +29,8 @@ import parquet.schema.{MessageType, MessageTypeParser}
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.SparkSqlSerializer
+import com.google.common.io.BaseEncoding
 
 /**
  * A `parquet.io.api.RecordMaterializer` for Rows.
@@ -38,8 +40,8 @@ import org.apache.spark.sql.catalyst.types._
 private[parquet] class RowRecordMaterializer(root: CatalystConverter)
   extends RecordMaterializer[Row] {
 
-  def this(parquetSchema: MessageType) =
-    this(CatalystConverter.createRootConverter(parquetSchema))
+  def this(parquetSchema: MessageType, attributes: Seq[Attribute]) =
+    this(CatalystConverter.createRootConverter(parquetSchema, attributes))
 
   override def getCurrentRecord: Row = root.getCurrentRecord
 
@@ -57,50 +59,92 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
       fileSchema: MessageType,
       readContext: ReadContext): RecordMaterializer[Row] = {
     log.debug(s"preparing for read with file schema $fileSchema")
-    new RowRecordMaterializer(readContext.getRequestedSchema)
+    //new RowRecordMaterializer(readContext.getRequestedSchema)
+    val parquetSchema = readContext.getRequestedSchema
+    var schema: Seq[Attribute] =
+      if (readContext.getReadSupportMetadata != null &&
+          readContext.getReadSupportMetadata.get(RowReadSupport.SPARK_METADATA_KEY) != null) {
+        ParquetTypesConverter.convertFromString(
+          readContext.getReadSupportMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
+      } else {
+        // fall back to converting from Parquet schema
+        ParquetTypesConverter.convertToAttributes(parquetSchema)
+      }
+    new RowRecordMaterializer(parquetSchema, schema)
   }
 
   override def init(
       configuration: Configuration,
       keyValueMetaData: java.util.Map[String, String],
       fileSchema: MessageType): ReadContext = {
-    val requested_schema_string =
+    /*val requested_schema_string =
       configuration.get(RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA, fileSchema.toString)
     val requested_schema =
       MessageTypeParser.parseMessageType(requested_schema_string)
     log.debug(s"read support initialized for requested schema $requested_schema")
     ParquetRelation.enableLogForwarding()
-    new ReadContext(requested_schema, keyValueMetaData)
+    new ReadContext(requested_schema, keyValueMetaData) */
+
+    // GO ON HERE.. figure out why Avro distinguishes between requested read and read schema
+    // try to figure out what when needs to be written to metadata
+
+    var parquetSchema: MessageType = fileSchema
+    var metadata: java.util.Map[String, String] = null
+    val requestedAttributes = RowReadSupport.getRequestedSchema(configuration)
+
+    if (requestedAttributes != null) {
+      parquetSchema = ParquetTypesConverter.convertFromAttributes(requestedAttributes)
+    }
+
+    val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
+
+    if (origAttributesStr != null) {
+      metadata = new java.util.HashMap[String, String]()
+      metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr)
+    }
+
+    return new ReadSupport.ReadContext(parquetSchema, metadata)
   }
 }
 
 private[parquet] object RowReadSupport {
-  val PARQUET_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+  val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+  private def getRequestedSchema(configuration: Configuration): Seq[Attribute] = {
+    val schemaString = configuration.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+    if (schemaString == null) null else ParquetTypesConverter.convertFromString(schemaString)
+  }
 }
 
 /**
  * A `parquet.hadoop.api.WriteSupport` for Row ojects.
  */
 private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
 
-
-  def setSchema(schema: Seq[Attribute], configuration: Configuration) {
+  /*def setSchema(schema: Seq[Attribute], configuration: Configuration) {
     configuration.set(
       RowWriteSupport.PARQUET_ROW_SCHEMA,
       StructType.fromAttributes(schema).toString)
     configuration.set(
       ParquetOutputFormat.WRITER_VERSION,
       ParquetProperties.WriterVersion.PARQUET_1_0.toString)
-  }
+  } */
 
+  private[parquet] var schema: MessageType = null
   private[parquet] var writer: RecordConsumer = null
  private[parquet] var attributes: Seq[Attribute] = null
 
   override def init(configuration: Configuration): WriteSupport.WriteContext = {
-    attributes = DataType(configuration.get(RowWriteSupport.PARQUET_ROW_SCHEMA)) match {
-      case s: StructType => s.toAttributes
-      case other => sys.error(s"Can convert $attributes to row")
-    }
+    //attributes = DataType(configuration.get(RowWriteSupport.PARQUET_ROW_SCHEMA))
+    attributes = if (attributes == null) {
+      RowWriteSupport.getSchema(configuration) match {
+        case s: StructType => s.toAttributes
+        case other => sys.error(s"Can convert $attributes to row")
+      }
+    } else attributes
+    schema = if (schema == null) ParquetTypesConverter.convertFromAttributes(attributes) else schema
+    // ParquetTypesConverter.convertToAttributes(schema)
     log.debug(s"write support initialized for requested schema $attributes")
     ParquetRelation.enableLogForwarding()
     new WriteSupport.WriteContext(
@@ -275,6 +319,22 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
 }
 
 private[parquet] object RowWriteSupport {
-  val PARQUET_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.schema"
+  val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
+
+  def getSchema(configuration: Configuration): Seq[Attribute] = {
+    val schemaString = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
+    if (schemaString == null) {
+      throw new RuntimeException("Missing schema!")
+    }
+    ParquetTypesConverter.convertFromString(schemaString)
+  }
+
+  def setSchema(schema: Seq[Attribute], configuration: Configuration) {
+    val encoded = ParquetTypesConverter.convertToString(schema)
+    configuration.set(SPARK_ROW_SCHEMA, encoded)
+    configuration.set(
+      ParquetOutputFormat.WRITER_VERSION,
+      ParquetProperties.WriterVersion.PARQUET_1_0.toString)
+  }
 }
 

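The read side now resolves the Catalyst schema in two stages: init copies the original attribute string into the read-support metadata, and prepareForRead prefers that metadata, falling back to a conversion from the Parquet schema only when it is absent. A condensed, equivalent sketch of that fallback (not the committed code, which spells it out with null checks):

    // Prefer the Catalyst attributes carried in the read-support metadata;
    // otherwise derive them from the requested Parquet schema.
    val parquetSchema = readContext.getRequestedSchema
    val attributes: Seq[Attribute] =
      Option(readContext.getReadSupportMetadata)
        .flatMap(m => Option(m.get(RowReadSupport.SPARK_METADATA_KEY)))
        .map(ParquetTypesConverter.convertFromString)
        .getOrElse(ParquetTypesConverter.convertToAttributes(parquetSchema))
    new RowRecordMaterializer(parquetSchema, attributes)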
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 13 additions & 0 deletions
@@ -33,6 +33,8 @@ import parquet.schema.Type.Repetition
 
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
 import org.apache.spark.sql.catalyst.types._
+import com.google.common.io.BaseEncoding
+import org.apache.spark.sql.execution.SparkSqlSerializer
 
 // Implicits
 import scala.collection.JavaConversions._
@@ -289,6 +291,16 @@ private[parquet] object ParquetTypesConverter {
     new MessageType("root", fields)
   }
 
+  def convertFromString(string: String): Seq[Attribute] = {
+    val decoded: Array[Byte] = BaseEncoding.base64().decode(string)
+    SparkSqlSerializer.deserialize(decoded)
+  }
+
+  def convertToString(schema: Seq[Attribute]): String = {
+    val serialized: Array[Byte] = SparkSqlSerializer.serialize(schema)
+    BaseEncoding.base64().encode(serialized)
+  }
+
   def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration) {
     if (origPath == null) {
       throw new IllegalArgumentException("Unable to write Parquet metadata: path is null")
@@ -313,6 +325,7 @@ private[parquet] object ParquetTypesConverter {
     }
     val extraMetadata = new java.util.HashMap[String, String]()
     extraMetadata.put("path", path.toString)
+    extraMetadata.put(RowReadSupport.SPARK_METADATA_KEY, ParquetTypesConverter.convertToString(attributes))
     // TODO: add extra data, e.g., table name, date, etc.?
 
     val parquetSchema: MessageType =

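convertToString and convertFromString round-trip the attribute list through the Spark SQL serializer and Guava's base64 codec, so the logical schema can travel as an ordinary string in a Hadoop Configuration and in the Parquet footer's key/value metadata. A self-contained sketch of the same encode/decode idea, substituting Java serialization and a plain Seq[String] for SparkSqlSerializer and Seq[Attribute] so that it runs standalone:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import com.google.common.io.BaseEncoding

    // Encode: serialize to bytes, then base64 them into a Configuration-safe string.
    def encode(schema: Seq[String]): String = {
      val buffer = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(buffer)
      out.writeObject(schema)
      out.close()
      BaseEncoding.base64().encode(buffer.toByteArray)
    }

    // Decode: undo the base64 step, then deserialize back into the schema.
    def decode(encoded: String): Seq[String] = {
      val bytes = BaseEncoding.base64().decode(encoded)
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
      try in.readObject().asInstanceOf[Seq[String]] finally in.close()
    }

    assert(decode(encode(Seq("id: int", "name: string"))) == Seq("id: int", "name: string"))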