@@ -10,9 +10,7 @@ import wvlet.airframe.control.Control.withResource
1010import wvlet .airframe .surface .Surface
1111import wvlet .log .LogSupport
1212
13- import scala .reflect .runtime .{universe => ru }
14-
15- object Parquet extends LogSupport {
13+ object Parquet extends ParquetCompat with LogSupport {
1614
1715 /**
1816 * Create a Parquet writer that accepts records represented in Map, Array, JSON, MsgPack, etc.
@@ -34,41 +32,42 @@ object Parquet extends LogSupport {
3432 builder.build()
3533 }
3634
37- def newWriter [A : ru.TypeTag ](
35+ def newObjectWriter [A ](
36+ objectSurface : Surface ,
3837 path : String ,
3938 // Hadoop filesystem specific configuration, e.g., fs.s3a.access.key
4039 hadoopConf : Configuration = new Configuration (),
4140 config : ParquetWriterAdapter .Builder [A ] => ParquetWriterAdapter .Builder [A ] =
4241 identity[ParquetWriterAdapter .Builder [A ]](_)
4342 ): ParquetWriter [A ] = {
44- val s = Surface .of[A ]
45- val b = ParquetWriterAdapter .builder[A ](s, path, hadoopConf)
43+ val b = ParquetWriterAdapter .builder[A ](objectSurface, path, hadoopConf)
4644 val builder = config(b)
4745 builder.build()
4846 }
4947
50- def newReader [A : ru.TypeTag ](
48+ def newObjectReader [A ](
49+ objectSurface : Surface ,
5150 path : String ,
5251 // Hadoop filesystem specific configuration, e.g., fs.s3a.access.key
5352 hadoopConf : Configuration = new Configuration (),
5453 config : ParquetReader .Builder [A ] => ParquetReader .Builder [A ] = identity[ParquetReader .Builder [A ]](_)
5554 ): ParquetReader [A ] = {
56- val s = Surface .of[A ]
57- val b : ParquetReader .Builder [A ] = ParquetReaderAdapter .builder[A ](s, path, hadoopConf)
55+ val b : ParquetReader .Builder [A ] = ParquetReaderAdapter .builder[A ](objectSurface, path, hadoopConf)
5856 config(b).build()
5957 }
6058
61- def query [A : ru.TypeTag ](
59+ def queryObject [A ](
60+ objectSurface : Surface ,
6261 path : String ,
6362 sql : String ,
6463 hadoopConf : Configuration = new Configuration (),
6564 config : ParquetReader .Builder [A ] => ParquetReader .Builder [A ] = identity[ParquetReader .Builder [A ]](_)
6665 ): ParquetReader [A ] = {
67- val s = Surface .of[A ]
6866 // Read Parquet schema for resolving column types
69- val schema = readSchema(path)
70- val plan = ParquetQueryPlanner .parse(sql, schema)
71- val b : ParquetReader .Builder [A ] = ParquetReaderAdapter .builder[A ](s, path, conf = hadoopConf, plan = Some (plan))
67+ val schema = readSchema(path)
68+ val plan = ParquetQueryPlanner .parse(sql, schema)
69+ val b : ParquetReader .Builder [A ] =
70+ ParquetReaderAdapter .builder[A ](objectSurface, path, conf = hadoopConf, plan = Some (plan))
7271
7372 val newConf = plan.predicate match {
7473 case Some (pred) =>
0 commit comments