
Commit 522e852

Add Spark connector option for passing Pinot query options
1 parent d9c4315 commit 522e852

15 files changed: +140 -26 lines

pinot-connectors/pinot-spark-2-connector/README.md

Lines changed: 2 additions & 0 deletions
@@ -28,12 +28,14 @@ Detailed read model documentation is here; [Spark-Pinot Connector Read Model](do
 ## Features
 - Query realtime, offline or hybrid tables
 - Distributed, parallel scan
+- Streaming reads using gRPC (optional)
 - SQL support instead of PQL
 - Column and filter push down to optimize performance
 - Overlap between realtime and offline segments is queried exactly once for hybrid tables
 - Schema discovery
   - Dynamic inference
   - Static analysis of case class
+- Supports query options
 
 ## Quick Start

pinot-connectors/pinot-spark-2-connector/documentation/read_model.md

Lines changed: 1 addition & 0 deletions
@@ -138,3 +138,4 @@ val df = spark.read
 | segmentsPerSplit | Represents the maximum segment count that will be scanned by pinot server in one connection | No | 3 |
 | pinotServerTimeoutMs | The maximum timeout(ms) to get data from pinot server | No | 10 mins |
 | useGrpcServer | Boolean value to enable reads via gRPC. This option is more memory efficient both on Pinot server and Spark executor side because it utilizes streaming. Requires gRPC to be enabled on Pinot server. | No | false |
+| queryOptions | Comma separated list of Pinot query options (e.g. "enableNullHandling=true,skipUpsert=true") | No | "" |
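For orientation, a minimal usage sketch of the new reader option is shown below. The table name, table type, and option values are illustrative assumptions, not part of this commit; only `queryOptions` is the option documented above.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming a Pinot table named "airlineStats" reachable with
// default controller/broker settings; "queryOptions" carries the comma
// separated Pinot query options documented in the table above.
val spark = SparkSession.builder().appName("pinot-query-options-example").getOrCreate()

val df = spark.read
  .format("pinot")
  .option("table", "airlineStats")   // hypothetical table name
  .option("tableType", "offline")
  .option("queryOptions", "enableNullHandling=true,skipUpsert=true")
  .load()

df.show()
```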

pinot-connectors/pinot-spark-2-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala

Lines changed: 5 additions & 4 deletions
@@ -73,15 +73,16 @@ class PinotDataSourceReader(options: DataSourceOptions, userSchema: Option[Struc
     }
 
     val whereCondition = FilterPushDown.compileFiltersToSqlWhereClause(this.acceptedFilters)
-    val generatedSQLs = ScanQueryGenerator.generate(
+    val scanQuery = ScanQueryGenerator.generate(
       readParameters.tableName,
       readParameters.tableType,
       timeBoundaryInfo,
       schema.fieldNames,
-      whereCondition
+      whereCondition,
+      readParameters.queryOptions
     )
 
-    val routingTable = PinotClusterClient.getRoutingTable(readParameters.broker, generatedSQLs)
+    val routingTable = PinotClusterClient.getRoutingTable(readParameters.broker, scanQuery)
 
     val instanceInfo : Map[String, InstanceInfo] = Map()
     val instanceInfoReader = (instance:String) => { // cached reader to reduce network round trips
@@ -92,7 +93,7 @@ class PinotDataSourceReader(options: DataSourceOptions, userSchema: Option[Struc
     }
 
     PinotSplitter
-      .generatePinotSplits(generatedSQLs, routingTable, instanceInfoReader, readParameters)
+      .generatePinotSplits(scanQuery, routingTable, instanceInfoReader, readParameters)
      .zipWithIndex
      .map {
        case (pinotSplit, partitionId) =>
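The changed ScanQueryGenerator itself is not shown in this excerpt. The sketch below only illustrates one plausible way a comma separated option string could be rendered into Pinot `SET key=value;` statements ahead of a generated SELECT; the helper name and the exact rendering are assumptions, not the connector's actual code.

```scala
// Hypothetical helper: split the documented comma separated option string and
// prepend each entry as a Pinot SET statement before the generated SELECT.
def prependQueryOptions(sql: String, queryOptions: String): String = {
  if (queryOptions == null || queryOptions.trim.isEmpty) {
    sql
  } else {
    val setStatements = queryOptions
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(opt => s"SET $opt;")
      .mkString(" ")
    s"$setStatements $sql"
  }
}

// prependQueryOptions("SELECT * FROM airlineStats", "enableNullHandling=true,skipUpsert=true")
// => "SET enableNullHandling=true; SET skipUpsert=true; SELECT * FROM airlineStats"
```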

pinot-connectors/pinot-spark-2-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/TypeConverter.scala

Lines changed: 10 additions & 4 deletions
@@ -66,17 +66,23 @@ private[datasource] object TypeConverter {
       dataTable: DataTable,
       sparkSchema: StructType): Seq[InternalRow] = {
     val dataTableColumnNames = dataTable.getDataSchema.getColumnNames
+    val nullRowIdsByColumn = (0 until dataTable.getDataSchema.size()).map{ col =>
+      dataTable.getNullRowIds(col)
+    }
     (0 until dataTable.getNumberOfRows).map { rowIndex =>
       // spark schema is used to ensure columns order
       val columns = sparkSchema.fields.map { field =>
         val colIndex = dataTableColumnNames.indexOf(field.name)
         if (colIndex < 0) {
           throw PinotException(s"'${field.name}' not found in Pinot server response")
         } else {
-          // pinot column data type can be used directly,
-          // because all of them is supported in spark schema
-          val columnDataType = dataTable.getDataSchema.getColumnDataType(colIndex)
-          readPinotColumnData(dataTable, columnDataType, rowIndex, colIndex)
+          if (nullRowIdsByColumn(colIndex) != null
+            && nullRowIdsByColumn(colIndex).contains(rowIndex)) {
+            null
+          } else {
+            val columnDataType = dataTable.getDataSchema.getColumnDataType(colIndex)
+            readPinotColumnData(dataTable, columnDataType, rowIndex, colIndex)
+          }
         }
       }
       InternalRow.fromSeq(columns)
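For readers unfamiliar with the null-handling mechanism, here is a small standalone sketch of the per-column RoaringBitmap lookup pattern the converter now relies on; the row ids are made up for illustration.

```scala
import org.roaringbitmap.RoaringBitmap

// Per-column bitmap of null row ids, mirroring what DataTable.getNullRowIds
// returns when null handling is enabled; a null bitmap means "no nulls".
val nullRowIds = new RoaringBitmap()
nullRowIds.add(0) // row 0 of this column is null
nullRowIds.add(7) // row 7 of this column is null

def isNullAt(bitmap: RoaringBitmap, rowIndex: Int): Boolean =
  bitmap != null && bitmap.contains(rowIndex)

assert(isNullAt(nullRowIds, 0))   // marked null
assert(!isNullAt(nullRowIds, 3))  // regular value
assert(!isNullAt(null, 3))        // column without a bitmap has no nulls
```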

pinot-connectors/pinot-spark-2-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/TypeConverterTest.scala

Lines changed: 31 additions & 0 deletions
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.connector.spark.datasource
 
+import org.apache.pinot.common.datatable.DataTableFactory
 import org.apache.pinot.common.utils.DataSchema
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType
 import org.apache.pinot.connector.spark.common.PinotException
@@ -27,6 +28,7 @@ import org.apache.pinot.spi.utils.ByteArray
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+import org.roaringbitmap.RoaringBitmap
 
 import scala.io.Source
 
@@ -160,6 +162,35 @@ class TypeConverterTest extends BaseTest {
     exception.getMessage shouldEqual s"'longCol' not found in Pinot server response"
   }
 
+  test("Converter should identify and correctly return null columns") {
+    val columnNames = Array("strCol", "intCol")
+    val columnTypes = Array(ColumnDataType.STRING, ColumnDataType.INT)
+    val dataSchema = new DataSchema(columnNames, columnTypes)
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4)
+
+    val dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema)
+    dataTableBuilder.startRow()
+    dataTableBuilder.setColumn(0, "null")
+    dataTableBuilder.setColumn(1, 5)
+    dataTableBuilder.finishRow()
+
+    val nullRowIds = new RoaringBitmap()
+    nullRowIds.add(0)
+    dataTableBuilder.setNullRowIds(nullRowIds)
+    dataTableBuilder.setNullRowIds(null)
+    val dataTable = dataTableBuilder.build()
+
+    val schema = StructType(
+      Seq(
+        StructField("strCol", StringType, true),
+        StructField("intCol", IntegerType, true)
+      )
+    )
+
+    val result = TypeConverter.pinotDataTableToInternalRows(dataTable, schema).head
+    result.get(0, StringType) shouldEqual null
+  }
+
   test("Pinot schema should be converted to spark schema") {
     val pinotSchemaAsString = Source.fromResource("schema/pinot-schema.json").mkString
     val resultSchema = TypeConverter.pinotSchemaToSparkSchema(Schema.fromString(pinotSchemaAsString))

pinot-connectors/pinot-spark-3-connector/README.md

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@ Detailed read model documentation is here; [Spark-Pinot Connector Read Model](do
 - Schema discovery
   - Dynamic inference
   - Static analysis of case class
+- Supports query options
 
 ## Quick Start

pinot-connectors/pinot-spark-3-connector/documentation/read_model.md

Lines changed: 1 addition & 0 deletions
@@ -138,3 +138,4 @@ val df = spark.read
 | segmentsPerSplit | Represents the maximum segment count that will be scanned by pinot server in one connection | No | 3 |
 | pinotServerTimeoutMs | The maximum timeout(ms) to get data from pinot server | No | 10 mins |
 | useGrpcServer | Boolean value to enable reads via gRPC. This option is more memory efficient both on Pinot server and Spark executor side because it utilizes streaming. Requires gRPC to be enabled on Pinot server. | No | false |
+| queryOptions | Comma separated list of Pinot query options (e.g. "enableNullHandling=true,skipUpsert=true") | No | "" |
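Usage is the same as for the Spark 2 connector above. As a variation, here is a hedged sketch that combines `queryOptions` with the documented `useGrpcServer` flag; the table name and type are illustrative assumptions.

```scala
// Sketch only: the table name is hypothetical; useGrpcServer and queryOptions
// are the reader options documented in the table above.
val df = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "hybrid")
  .option("useGrpcServer", "true")
  .option("queryOptions", "enableNullHandling=true,skipUpsert=true")
  .load()
```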

pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanBuilder.scala

Lines changed: 2 additions & 1 deletion
@@ -54,7 +54,8 @@ class PinotScanBuilder(readParameters: PinotDataSourceReadOptions)
       readParameters.tableType,
       timeBoundaryInfo,
       currentSchema.fieldNames,
-      whereCondition
+      whereCondition,
+      readParameters.queryOptions
     )
 
     new PinotScan(scanQuery, currentSchema, readParameters)

pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverter.scala

Lines changed: 10 additions & 4 deletions
@@ -66,17 +66,23 @@ private[pinot] object TypeConverter {
       dataTable: DataTable,
       sparkSchema: StructType): Seq[InternalRow] = {
     val dataTableColumnNames = dataTable.getDataSchema.getColumnNames
+    val nullRowIdsByColumn = (0 until dataTable.getDataSchema.size()).map{ col =>
+      dataTable.getNullRowIds(col)
+    }
     (0 until dataTable.getNumberOfRows).map { rowIndex =>
       // spark schema is used to ensure columns order
       val columns = sparkSchema.fields.map { field =>
         val colIndex = dataTableColumnNames.indexOf(field.name)
         if (colIndex < 0) {
           throw PinotException(s"'${field.name}' not found in Pinot server response")
         } else {
-          // pinot column data type can be used directly,
-          // because all of them is supported in spark schema
-          val columnDataType = dataTable.getDataSchema.getColumnDataType(colIndex)
-          readPinotColumnData(dataTable, columnDataType, rowIndex, colIndex)
+          if (nullRowIdsByColumn(colIndex) != null
+            && nullRowIdsByColumn(colIndex).contains(rowIndex)) {
+            null
+          } else {
+            val columnDataType = dataTable.getDataSchema.getColumnDataType(colIndex)
+            readPinotColumnData(dataTable, columnDataType, rowIndex, colIndex)
+          }
         }
       }
       InternalRow.fromSeq(columns)

pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverterTest.scala

Lines changed: 33 additions & 0 deletions
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.connector.spark.v3.datasource
 
+import org.apache.pinot.common.datatable.DataTableFactory
 import org.apache.pinot.common.utils.DataSchema
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType
 import org.apache.pinot.connector.spark.common.PinotException
@@ -27,6 +28,7 @@ import org.apache.pinot.spi.utils.ByteArray
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+import org.roaringbitmap.RoaringBitmap
 
 import scala.io.Source
 
@@ -160,6 +162,37 @@ class TypeConverterTest extends BaseTest {
     exception.getMessage shouldEqual s"'longCol' not found in Pinot server response"
   }
 
+  test("Converter should identify and correctly return null rows") {
+    val columnNames = Array("strCol", "intCol")
+    val columnTypes = Array(ColumnDataType.STRING, ColumnDataType.INT)
+    val dataSchema = new DataSchema(columnNames, columnTypes)
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4)
+
+    val dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema)
+    dataTableBuilder.startRow()
+    dataTableBuilder.setColumn(0, "null")
+    dataTableBuilder.setColumn(1, 5)
+    dataTableBuilder.finishRow()
+
+    val nullRowIds = new RoaringBitmap()
+    nullRowIds.add(0)
+    dataTableBuilder.setNullRowIds(nullRowIds)
+    dataTableBuilder.setNullRowIds(null)
+
+
+    val dataTable = dataTableBuilder.build()
+
+    val schema = StructType(
+      Seq(
+        StructField("strCol", StringType, true),
+        StructField("intCol", IntegerType, true)
+      )
+    )
+
+    val result = TypeConverter.pinotDataTableToInternalRows(dataTable, schema).head
+    result.get(0, StringType) shouldEqual null
+  }
+
   test("Pinot schema should be converted to spark schema") {
     val pinotSchemaAsString = Source.fromResource("schema/pinot-schema.json").mkString
     val resultSchema = TypeConverter.pinotSchemaToSparkSchema(Schema.fromString(pinotSchemaAsString))
