Skip to content

Commit 90931d9

Browse files
authored
airframe-sql/parquet: Scala 3 support (#2208)
1 parent 306581c commit 90931d9

8 files changed

Lines changed: 106 additions & 20 deletions

File tree

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package wvlet.airframe.parquet
15+
16+
import org.apache.hadoop.conf.Configuration
17+
import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter}
18+
import wvlet.airframe.surface.Surface
19+
20+
import scala.reflect.runtime.{universe => ru}
21+
22+
/**
  * Scala 2 compatibility layer for the Parquet reader/writer factory methods.
  *
  * Each method resolves the [[Surface]] of the target type `A` at the call site
  * (which needs a runtime `TypeTag` on Scala 2) and then delegates to the
  * version-independent implementations in `Parquet`
  * (`newObjectWriter` / `newObjectReader` / `queryObject`).
  */
trait ParquetCompat {

  /**
    * Create a ParquetWriter that writes objects of type A to the given path.
    *
    * @param path       destination file path
    * @param hadoopConf Hadoop filesystem specific configuration, e.g., fs.s3a.access.key
    * @param config     customization hook applied to the writer builder before building
    */
  def newWriter[A: ru.TypeTag](
      path: String,
      // Hadoop filesystem specific configuration, e.g., fs.s3a.access.key
      hadoopConf: Configuration = new Configuration(),
      config: ParquetWriterAdapter.Builder[A] => ParquetWriterAdapter.Builder[A] =
        identity[ParquetWriterAdapter.Builder[A]](_)
  ): ParquetWriter[A] = {
    // Resolve the object schema from the static type A, then delegate
    val surface = Surface.of[A]
    Parquet.newObjectWriter[A](surface, path, hadoopConf, config)
  }

  /**
    * Create a ParquetReader that reads objects of type A from the given path.
    *
    * @param path       source file path
    * @param hadoopConf Hadoop filesystem specific configuration, e.g., fs.s3a.access.key
    * @param config     customization hook applied to the reader builder before building
    */
  def newReader[A: ru.TypeTag](
      path: String,
      // Hadoop filesystem specific configuration, e.g., fs.s3a.access.key
      hadoopConf: Configuration = new Configuration(),
      config: ParquetReader.Builder[A] => ParquetReader.Builder[A] = identity[ParquetReader.Builder[A]](_)
  ): ParquetReader[A] = {
    val surface = Surface.of[A]
    Parquet.newObjectReader[A](surface, path, hadoopConf, config)
  }

  /**
    * Run an SQL query over a Parquet file and read the matching records as objects of type A.
    *
    * @param path       source file path
    * @param sql        SQL statement to evaluate against the file
    * @param hadoopConf Hadoop filesystem specific configuration, e.g., fs.s3a.access.key
    * @param config     customization hook applied to the reader builder before building
    */
  def query[A: ru.TypeTag](
      path: String,
      sql: String,
      hadoopConf: Configuration = new Configuration(),
      config: ParquetReader.Builder[A] => ParquetReader.Builder[A] = identity[ParquetReader.Builder[A]](_)
  ): ParquetReader[A] = {
    val surface = Surface.of[A]
    Parquet.queryObject[A](surface, path, sql, hadoopConf, config)
  }
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package wvlet.airframe.parquet
2+
3+
import org.apache.hadoop.conf.Configuration
4+
import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter}
5+
import wvlet.airframe.surface.Surface
6+
7+
/**
  * Scala 3 compatibility layer for the Parquet reader/writer factory methods.
  *
  * On Scala 3 the [[Surface]] of the target type `A` is resolved with `inline`
  * methods (no runtime TypeTag is required); each method then delegates to the
  * version-independent implementations in `Parquet`
  * (`newObjectWriter` / `newObjectReader` / `queryObject`).
  */
trait ParquetCompat {

  /**
    * Create a ParquetWriter that writes objects of type A to the given path.
    *
    * @param path       destination file path
    * @param hadoopConf Hadoop filesystem specific configuration, e.g., fs.s3a.access.key
    * @param config     customization hook applied to the writer builder before building
    */
  inline def newWriter[A](
      path: String,
      // Hadoop filesystem specific configuration, e.g., fs.s3a.access.key
      hadoopConf: Configuration = new Configuration(),
      config: ParquetWriterAdapter.Builder[A] => ParquetWriterAdapter.Builder[A] =
        identity[ParquetWriterAdapter.Builder[A]](_)
  ): ParquetWriter[A] = {
    // Surface.of[A] is expanded inline at the call site on Scala 3
    val surface = Surface.of[A]
    Parquet.newObjectWriter[A](surface, path, hadoopConf, config)
  }

  /**
    * Create a ParquetReader that reads objects of type A from the given path.
    *
    * @param path       source file path
    * @param hadoopConf Hadoop filesystem specific configuration, e.g., fs.s3a.access.key
    * @param config     customization hook applied to the reader builder before building
    */
  inline def newReader[A](
      path: String,
      // Hadoop filesystem specific configuration, e.g., fs.s3a.access.key
      hadoopConf: Configuration = new Configuration(),
      config: ParquetReader.Builder[A] => ParquetReader.Builder[A] = identity[ParquetReader.Builder[A]](_)
  ): ParquetReader[A] = {
    val surface = Surface.of[A]
    Parquet.newObjectReader[A](surface, path, hadoopConf, config)
  }

  /**
    * Run an SQL query over a Parquet file and read the matching records as objects of type A.
    *
    * @param path       source file path
    * @param sql        SQL statement to evaluate against the file
    * @param hadoopConf Hadoop filesystem specific configuration, e.g., fs.s3a.access.key
    * @param config     customization hook applied to the reader builder before building
    */
  inline def query[A](
      path: String,
      sql: String,
      hadoopConf: Configuration = new Configuration(),
      config: ParquetReader.Builder[A] => ParquetReader.Builder[A] = identity[ParquetReader.Builder[A]](_)
  ): ParquetReader[A] = {
    val surface = Surface.of[A]
    Parquet.queryObject[A](surface, path, sql, hadoopConf, config)
  }
}

airframe-parquet/src/main/scala/wvlet/airframe/parquet/Parquet.scala

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@ import wvlet.airframe.control.Control.withResource
1010
import wvlet.airframe.surface.Surface
1111
import wvlet.log.LogSupport
1212

13-
import scala.reflect.runtime.{universe => ru}
14-
15-
object Parquet extends LogSupport {
13+
object Parquet extends ParquetCompat with LogSupport {
1614

1715
/**
1816
* Create a Parquet writer that accepts records represented in Map, Array, JSON, MsgPack, etc.
@@ -34,41 +32,42 @@ object Parquet extends LogSupport {
3432
builder.build()
3533
}
3634

37-
def newWriter[A: ru.TypeTag](
35+
def newObjectWriter[A](
36+
objectSurface: Surface,
3837
path: String,
3938
// Hadoop filesystem specific configuration, e.g., fs.s3a.access.key
4039
hadoopConf: Configuration = new Configuration(),
4140
config: ParquetWriterAdapter.Builder[A] => ParquetWriterAdapter.Builder[A] =
4241
identity[ParquetWriterAdapter.Builder[A]](_)
4342
): ParquetWriter[A] = {
44-
val s = Surface.of[A]
45-
val b = ParquetWriterAdapter.builder[A](s, path, hadoopConf)
43+
val b = ParquetWriterAdapter.builder[A](objectSurface, path, hadoopConf)
4644
val builder = config(b)
4745
builder.build()
4846
}
4947

50-
def newReader[A: ru.TypeTag](
48+
def newObjectReader[A](
49+
objectSurface: Surface,
5150
path: String,
5251
// Hadoop filesystem specific configuration, e.g., fs.s3a.access.key
5352
hadoopConf: Configuration = new Configuration(),
5453
config: ParquetReader.Builder[A] => ParquetReader.Builder[A] = identity[ParquetReader.Builder[A]](_)
5554
): ParquetReader[A] = {
56-
val s = Surface.of[A]
57-
val b: ParquetReader.Builder[A] = ParquetReaderAdapter.builder[A](s, path, hadoopConf)
55+
val b: ParquetReader.Builder[A] = ParquetReaderAdapter.builder[A](objectSurface, path, hadoopConf)
5856
config(b).build()
5957
}
6058

61-
def query[A: ru.TypeTag](
59+
def queryObject[A](
60+
objectSurface: Surface,
6261
path: String,
6362
sql: String,
6463
hadoopConf: Configuration = new Configuration(),
6564
config: ParquetReader.Builder[A] => ParquetReader.Builder[A] = identity[ParquetReader.Builder[A]](_)
6665
): ParquetReader[A] = {
67-
val s = Surface.of[A]
6866
// Read Parquet schema for resolving column types
69-
val schema = readSchema(path)
70-
val plan = ParquetQueryPlanner.parse(sql, schema)
71-
val b: ParquetReader.Builder[A] = ParquetReaderAdapter.builder[A](s, path, conf = hadoopConf, plan = Some(plan))
67+
val schema = readSchema(path)
68+
val plan = ParquetQueryPlanner.parse(sql, schema)
69+
val b: ParquetReader.Builder[A] =
70+
ParquetReaderAdapter.builder[A](objectSurface, path, conf = hadoopConf, plan = Some(plan))
7271

7372
val newConf = plan.predicate match {
7473
case Some(pred) =>

airframe-parquet/src/test/scala/wvlet/airframe/parquet/ParquetQueryTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ object ParquetQueryTest extends AirSpec {
5252

5353
case class RecordProjection(id: Int, b: Boolean)
5454

55-
test("SQL over Parquet") { file: Resource[File] =>
55+
test("SQL over Parquet") { (file: Resource[File]) =>
5656
val path = file.get.getPath
5757
test("read all columns") {
5858
val reader = Parquet.query[Record](path, "select * from _")

airframe-parquet/src/test/scala/wvlet/airframe/parquet/ParquetTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ object ParquetTest extends AirSpec {
4242
test(
4343
"write Parquet",
4444
design = newDesign.bind[Resource[File]].toInstance(Resource.newTempFile("target/tmp", ".parquet"))
45-
) { parquetFile: Resource[File] =>
45+
) { (parquetFile: Resource[File]) =>
4646
val file = parquetFile.get
4747
debug(s"Writing to ${file}")
4848
withResource(

airframe-sql/src/main/scala/wvlet/airframe/sql/model/Expression.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ sealed trait Expression extends TreeNode[Expression] with Product {
4444

4545
// Apply the rule to itself
4646
rule
47-
.applyOrElse(newExpr, { x: Expression => x }).asInstanceOf[this.type]
47+
.applyOrElse(newExpr, { (x: Expression) => x }).asInstanceOf[this.type]
4848
}
4949

5050
def collectSubExpressions: List[Expression] = {

airframe-sql/src/main/scala/wvlet/airframe/sql/parser/SQLInterpreter.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -668,10 +668,10 @@ class SQLInterpreter extends SqlBaseBaseVisitor[Any] with LogSupport {
668668

669669
override def visitFunctionCall(ctx: FunctionCallContext): FunctionCall = {
670670
val name = ctx.qualifiedName().getText
671-
val filter: Option[Expression] = Option(ctx.filter()).map { f: FilterContext =>
671+
val filter: Option[Expression] = Option(ctx.filter()).map { (f: FilterContext) =>
672672
expression(f.booleanExpression())
673673
}
674-
val over: Option[Window] = Option(ctx.over()).map { o: OverContext =>
674+
val over: Option[Window] = Option(ctx.over()).map { (o: OverContext) =>
675675
visitOver(o)
676676
}
677677

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ lazy val projectDotty =
276276
rxJVM,
277277
// rx-html uses Scala Macros
278278
rxHtmlJVM,
279-
// sql,
279+
sql,
280280
ulidJVM
281281
)
282282

0 commit comments

Comments
 (0)