@@ -29,6 +29,8 @@ import parquet.schema.{MessageType, MessageTypeParser}
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.SparkSqlSerializer
+import com.google.common.io.BaseEncoding
 
 /**
  * A `parquet.io.api.RecordMaterializer` for Rows.
@@ -38,8 +40,8 @@ import org.apache.spark.sql.catalyst.types._
 private[parquet] class RowRecordMaterializer(root: CatalystConverter)
   extends RecordMaterializer[Row] {
 
-  def this(parquetSchema: MessageType) =
-    this(CatalystConverter.createRootConverter(parquetSchema))
+  def this(parquetSchema: MessageType, attributes: Seq[Attribute]) =
+    this(CatalystConverter.createRootConverter(parquetSchema, attributes))
 
   override def getCurrentRecord: Row = root.getCurrentRecord
 
@@ -57,50 +59,92 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
       fileSchema: MessageType,
       readContext: ReadContext): RecordMaterializer[Row] = {
     log.debug(s"preparing for read with file schema $fileSchema")
-    new RowRecordMaterializer(readContext.getRequestedSchema)
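+    // Prefer the Catalyst schema that Spark stored in the file's metadata at write
+    // time; fall back to converting the Parquet schema when it is absent.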
+    val parquetSchema = readContext.getRequestedSchema
+    val schema: Seq[Attribute] =
+      if (readContext.getReadSupportMetadata != null &&
+          readContext.getReadSupportMetadata.get(RowReadSupport.SPARK_METADATA_KEY) != null) {
+        ParquetTypesConverter.convertFromString(
+          readContext.getReadSupportMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
+      } else {
+        // fall back to converting from the Parquet file schema
+        ParquetTypesConverter.convertToAttributes(parquetSchema)
+      }
+    new RowRecordMaterializer(parquetSchema, schema)
   }
 
   override def init(
       configuration: Configuration,
       keyValueMetaData: java.util.Map[String, String],
       fileSchema: MessageType): ReadContext = {
-    val requested_schema_string =
-      configuration.get(RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA, fileSchema.toString)
-    val requested_schema =
-      MessageTypeParser.parseMessageType(requested_schema_string)
-    log.debug(s"read support initialized for requested schema $requested_schema")
-    ParquetRelation.enableLogForwarding()
-    new ReadContext(requested_schema, keyValueMetaData)
+
+    // TODO: check how Avro distinguishes between the requested read schema and the
+    // actual read schema, and decide what needs to be written to the metadata here.
+
+    var parquetSchema: MessageType = fileSchema
+    var metadata: java.util.Map[String, String] = null
+    val requestedAttributes = RowReadSupport.getRequestedSchema(configuration)
+
+    if (requestedAttributes != null) {
+      parquetSchema = ParquetTypesConverter.convertFromAttributes(requestedAttributes)
+    }
+
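+    // Forward the attribute string written by RowWriteSupport.setSchema as
+    // read-support metadata, so that getRecordMaterializer can recover the exact
+    // Catalyst schema.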
+    val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
+
+    if (origAttributesStr != null) {
+      metadata = new java.util.HashMap[String, String]()
+      metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr)
+    }
+
+    new ReadSupport.ReadContext(parquetSchema, metadata)
   }
 }
 
 private[parquet] object RowReadSupport {
-  val PARQUET_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+  val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
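+  // Returns the attributes of the projection requested via the configuration, or
+  // null if no projection was requested.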
+  private def getRequestedSchema(configuration: Configuration): Seq[Attribute] = {
+    val schemaString = configuration.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+    if (schemaString == null) null else ParquetTypesConverter.convertFromString(schemaString)
+  }
 }
 
 /**
- * A `parquet.hadoop.api.WriteSupport` for Row ojects.
+ * A `parquet.hadoop.api.WriteSupport` for Row objects.
  */
 private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
 
-
-  def setSchema(schema: Seq[Attribute], configuration: Configuration) {
-    configuration.set(
-      RowWriteSupport.PARQUET_ROW_SCHEMA,
-      StructType.fromAttributes(schema).toString)
-    configuration.set(
-      ParquetOutputFormat.WRITER_VERSION,
-      ParquetProperties.WriterVersion.PARQUET_1_0.toString)
-  }
 
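+  // schema and attributes are initialized lazily in init(); writer is set once
+  // Parquet supplies the RecordConsumer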
+  private[parquet] var schema: MessageType = null
   private[parquet] var writer: RecordConsumer = null
   private[parquet] var attributes: Seq[Attribute] = null
 
   override def init(configuration: Configuration): WriteSupport.WriteContext = {
-    attributes = DataType(configuration.get(RowWriteSupport.PARQUET_ROW_SCHEMA)) match {
-      case s: StructType => s.toAttributes
-      case other => sys.error(s"Can convert $attributes to row")
-    }
+    // look up the schema in the job configuration unless this instance already has
+    // one; getSchema already returns Seq[Attribute], so no StructType match is needed
+    if (attributes == null) {
+      attributes = RowWriteSupport.getSchema(configuration)
+    }
+    if (schema == null) {
+      schema = ParquetTypesConverter.convertFromAttributes(attributes)
+    }
     log.debug(s"write support initialized for requested schema $attributes")
     ParquetRelation.enableLogForwarding()
     new WriteSupport.WriteContext(
@@ -275,6 +319,22 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
 }
 
 private[parquet] object RowWriteSupport {
-  val PARQUET_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.schema"
+  val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
+
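+  // Recovers the write schema that setSchema stored in the job configuration;
+  // writing without a schema is an error.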
+  def getSchema(configuration: Configuration): Seq[Attribute] = {
+    val schemaString = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
+    if (schemaString == null) {
+      throw new RuntimeException("Missing schema!")
+    }
+    ParquetTypesConverter.convertFromString(schemaString)
+  }
+
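+  // Serializes the attributes to a string property on the job configuration, so
+  // that write tasks can recover them through getSchema.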
+  def setSchema(schema: Seq[Attribute], configuration: Configuration) {
+    val encoded = ParquetTypesConverter.convertToString(schema)
+    configuration.set(SPARK_ROW_SCHEMA, encoded)
+    configuration.set(
+      ParquetOutputFormat.WRITER_VERSION,
+      ParquetProperties.WriterVersion.PARQUET_1_0.toString)
+  }
 }
 