[SPARK-37980][SQL] Extend METADATA column to support row indexes for Parquet files #37228
ala wants to merge 13 commits into apache:master
Conversation
// a subset of rows in the row group is going to be read. Note that there is a name
// collision here: these row indexes (unlike ones this class is generating) are counted
// starting from 0 in each of the row groups.
rowIndexIterator = pages.getRowIndexes.get.asScala.map(idx => idx + startingRowIdx)
Wondering if PageReadStore.getRowIndexes() would return continuous row indexes or not?
If the row indexes are continuous, then we don't need to store a long value per row. We can have something like RangeColumnVector(startIdx, length) to save the cost of computing and storing the row index vector (similar idea to ConstantColumnVector). cc @cloud-fan.
I believe this is not guaranteed. These row indexes are supposed to be produced as a result of page-level min/max filtering. So if there are 3 pages in a column chunk (the intersection of a row group and a column) and the 1st and 3rd match the predicate, but the 2nd does not, we should be getting a non-continuous range of indexes.
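To make the scenario concrete, here is a small self-contained Java sketch (hypothetical names and page sizes; not Spark or parquet-mr code) of how page-level predicate filtering yields non-contiguous within-row-group indexes, and how adding the row group's starting row produces file-level indexes, mirroring the idx + startingRowIdx mapping in the quoted code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: 3 pages of 100 rows each in a row group; pages 0 and 2
// match the predicate, page 1 does not. The surviving within-row-group indexes
// are non-contiguous, so a simple (start, length) range cannot represent them.
public class PageFilterSketch {
    static List<Long> survivingRowIndexes(int pageSize, boolean[] pageMatches,
                                          long startingRowIdx) {
        List<Long> result = new ArrayList<>();
        for (int page = 0; page < pageMatches.length; page++) {
            if (!pageMatches[page]) continue;  // page pruned by min/max stats
            for (int i = 0; i < pageSize; i++) {
                // within-row-group index, offset by the row group's first row in the file
                result.add((long) page * pageSize + i + startingRowIdx);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> idx = survivingRowIndexes(100, new boolean[]{true, false, true}, 1000L);
        System.out.println(idx.get(99));   // 1099: last row of page 0
        System.out.println(idx.get(100));  // 1200: page 1 skipped, jump to page 2
    }
}
```

Note the jump from 1099 to 1200: the 100 rows of the pruned middle page never appear, which is why a per-row representation (rather than a range) is needed in general.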
Would it be possible to add a test for this scenario?
Yaohua628
left a comment
Thanks for working on it!
Not familiar with the Parquet part, but left some questions and comments around _metadata, thanks!
val fileFormatReaderGeneratedMetadataColumns: Seq[Attribute] =
  metadataColumns.map(_.name).flatMap {
    case FileFormat.ROW_INDEX =>
      Some(AttributeReference(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType)())
A quick question: what if the user schema contains a column _tmp_metadata_row_index? Would it be overridden?
Maybe you can create an AttributeReference with a __metadata_col in its metadata field (see the usage of __metadata_col: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala#L188) and pattern match on it afterward, just to be safe?
sadikovi
left a comment
Thanks for working on this PR. I did the first pass on the changes and left a few comments.
My questions are:
- Would you be able to provide performance numbers to see how row index affects reads and writes before and after the change?
- I think we may need to have a way to opt out depending on the first question. Would you be able to take a look into that too?
I may need to review it again later as I might have missed context on file metadata and row index.
cc @sunchao
this.isPrimitive = column.isPrimitive();

if (missingColumns.contains(column)) {
  if (ParquetRowIndexUtil.isRowIndexColumn(column)) {
It's a string comparison (so O(column name length), but in practice cheaper) that happens at most once per column per scan task. I believe it's in the same ballpark as all the other initialization happening at the beginning of the scan.
@sadikovi To answer the questions about performance:
I don't quite see how this change would impact write performance. There are no changes to the write path, only to the read path.
I am not really sure what the opt-out would look like. The user can always just not read the column.
Would you be able to provide performance numbers for reads with and without column index or quote the difference with and without row index enabled that you have observed?
By the way, row group metadata row count may not be accurate in some instances. We have seen files where the row count was incorrect. How will the newly added code behave in this scenario?
Can you update the PR title and description to state that this is for Parquet only? It was not particularly clear from the description.
object RowIndexUtil {
  def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = {
Interesting name, so it is a column index for the RowIndex column, isn't it?
Yes. If you have a different suggestion, we can change it.
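For context, the lookup the method name describes can be sketched in isolation (a hypothetical Java analogue with made-up field names, not the Spark StructType implementation): find the position of the row index column within a schema, or signal its absence.

```java
// Sketch: locate a column's position in a schema by name, returning -1 when
// absent, analogous to finding the row index column in a Spark StructType.
public class SchemaLookupSketch {
    static int findColumnIndex(String[] fieldNames, String target) {
        for (int i = 0; i < fieldNames.length; i++) {
            if (fieldNames[i].equals(target)) return i;
        }
        return -1;  // column not present in this schema
    }

    public static void main(String[] args) {
        String[] schema = {"id", "value", "_tmp_metadata_row_index"};
        System.out.println(findColumnIndex(schema, "_tmp_metadata_row_index")); // 2
        System.out.println(findColumnIndex(schema, "missing"));                 // -1
    }
}
```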
test(s"reading ${FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} - present in a table") {
  withReadDataFrame("parquet", extraCol = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME) { df =>
    // Column values are read from the file, rather than populated with generated row indexes.
    // FIXME
@ala would you be able to create a follow-up ticket to address this so we don't forget? 🙂
@sadikovi That is really bad. I think such files should be considered corrupt data, and it is expected that features might not work correctly with them. Can you maybe point me to such files, so that I can test the exact behavior?
sadikovi
left a comment
Looks good, thanks for making the changes! 👍
Regarding metadata count, that would be considered a bug in Parquet anyway. I may need some time to find those files. We can always address it in a follow-up.
@sadikovi The cost of reading the row_index column is in the same ballpark as the other metadata columns, and these numbers are in the same ballpark as for vanilla reads.
Can we have a committer merge this? The failing test looks unrelated to the PR.
}
// If needed, compute row indexes within a file.
if (rowIndexGenerator != null) {
  rowIndexGenerator.populateRowIndex(columnVectors, num);
Sorry, I'm not very familiar with the Parquet read codebase; does this happen after row group skipping or not?
Was looking through the code and it seems like it happens after row group skipping. Also, the tests in ParquetRowIndexSuite check that the rowIndex column doesn't have any of the values from the skipped row groups.
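A minimal sketch of the idea (hypothetical names, not the actual rowIndexGenerator code): a generator consumes an iterator of file-level row indexes that already excludes skipped row groups, and fills a batch-sized column, so pruned rows simply never contribute values.

```java
import java.util.Iterator;
import java.util.List;

// Sketch: populate a batch's row-index column from an iterator that already
// excludes rows from skipped row groups / pruned pages.
public class RowIndexGeneratorSketch {
    private final Iterator<Long> fileRowIndexes;

    RowIndexGeneratorSketch(Iterator<Long> fileRowIndexes) {
        this.fileRowIndexes = fileRowIndexes;
    }

    // Fill the first `num` slots of the batch column with the next row indexes.
    void populateRowIndex(long[] rowIndexColumn, int num) {
        for (int i = 0; i < num; i++) {
            rowIndexColumn[i] = fileRowIndexes.next();
        }
    }

    public static void main(String[] args) {
        // Suppose the row group holding rows 0-2 was skipped; only rows 3-5 survive.
        RowIndexGeneratorSketch gen =
            new RowIndexGeneratorSketch(List.of(3L, 4L, 5L).iterator());
        long[] batch = new long[3];
        gen.populateRowIndex(batch, 3);
        System.out.println(java.util.Arrays.toString(batch)); // [3, 4, 5]
    }
}
```

Because the generator only ever sees surviving row indexes, the populated batch can never contain values from skipped row groups, which is what the ParquetRowIndexSuite tests mentioned above verify.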
// A name for a temporary column that holds row indexes computed by the file format reader
// until they can be placed in the _metadata struct.
val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
we add this name because row_index is more likely to conflict with data columns?
val fileFormatReaderGeneratedMetadataColumns: Seq[Attribute] =
  metadataColumns.map(_.name).flatMap {
    case FileFormat.ROW_INDEX =>
      if ((readDataColumns ++ partitionColumns).map(_.name)
shall we consider case sensitivity?
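To illustrate the concern, a small sketch (illustrative only; Spark's own name resolution is governed by the spark.sql.caseSensitive configuration) of how a plain name comparison and a case-insensitive one can disagree about a conflict:

```java
// Sketch: comparing column names with configurable case sensitivity,
// illustrating why a plain `.map(_.name).contains(...)` check may miss
// conflicts under case-insensitive resolution.
public class NameResolverSketch {
    static boolean namesConflict(String a, String b, boolean caseSensitive) {
        return caseSensitive ? a.equals(b) : a.equalsIgnoreCase(b);
    }

    public static void main(String[] args) {
        String user = "_TMP_Metadata_Row_Index";
        String internal = "_tmp_metadata_row_index";
        System.out.println(namesConflict(user, internal, true));  // false
        System.out.println(namesConflict(user, internal, false)); // true
    }
}
```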
    case _ => None
  }

val outputSchema = (readDataColumns ++ fileFormatReaderGeneratedMetadataColumns).toStructType
shall we rename it to outputDataSchema? otherwise it's a bit confusing that this is inconsistent with outputAttributes
case FileFormat.ROW_INDEX =>
  fileFormatReaderGeneratedMetadataColumns
    .filter(_.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
    .head.withName(FileFormat.ROW_INDEX)
nit: filter(...).head -> find(...).get
object RowIndexUtil {
  def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = {
    sparkSchema.fields.zipWithIndex.find { case (field: StructField, _: Int) =>
      field.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
isn't it always the last column?
This is the last "data" column in the outputAttributes, but there are also partition and metadata columns.
I am not sure whether the order of metadata columns in the schema is kept even if users add additional metadata columns.
override def getCurrentValue: InternalRow = {
  val row = parent.getCurrentValue
  row.setLong(rowIndexColumnIdx, parent.getCurrentRowIndex)
ah, so parquet-mr already provides APIs to get row index?
Yes, this was recently introduced into parquet-mr.
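The pattern in the quoted code, a wrapper that injects the current row index into each returned row, can be sketched in isolation (hypothetical interfaces and a long[] row buffer for simplicity; not parquet-mr's actual API):

```java
// Sketch: decorate a reader so each returned row carries its file-level row
// index in a designated column slot, mirroring row.setLong(rowIndexColumnIdx, ...).
public class RowIndexInjectorSketch {
    interface IndexedReader {
        long[] getCurrentValue();  // mutable row buffer
        long getCurrentRowIndex(); // file-level index of the current row
    }

    static long[] getCurrentValueWithIndex(IndexedReader parent, int rowIndexColumnIdx) {
        long[] row = parent.getCurrentValue();
        row[rowIndexColumnIdx] = parent.getCurrentRowIndex();
        return row;
    }

    public static void main(String[] args) {
        // Fake reader: one data column (value 42) plus an empty row-index slot.
        IndexedReader fake = new IndexedReader() {
            public long[] getCurrentValue() { return new long[]{42L, 0L}; }
            public long getCurrentRowIndex() { return 7L; }
        };
        long[] row = getCurrentValueWithIndex(fake, 1);
        System.out.println(row[0] + "," + row[1]); // 42,7
    }
}
```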
It seems the Python linter is broken in GA, cc @HyukjinKwon
Thanks, merging to master!
What changes were proposed in this pull request?
This change adds a row_index column to the _metadata struct. This column allows us to uniquely identify rows read from a given file with an index number. The n-th row in a given file will be assigned row index n-1 in every scan of the file, irrespective of file splitting and data skipping in use.

The new column requires file-format-specific support. This change introduces Parquet support; other formats can follow later.
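The splitting-invariance property can be sketched as follows (a hypothetical illustration, not Spark code): two scan tasks reading disjoint splits of the same file each assign row indexes offset by the split's first row, so the n-th row of the file always gets index n-1 no matter how the file was split.

```java
// Sketch: row indexes for a split are just the split's starting row offset
// plus a local counter, so every split of the same file agrees on indexes.
public class SplitRowIndexSketch {
    static long[] indexesForSplit(long splitStartRow, int numRows) {
        long[] idx = new long[numRows];
        for (int i = 0; i < numRows; i++) {
            idx[i] = splitStartRow + i;  // file-level index, split-independent
        }
        return idx;
    }

    public static void main(String[] args) {
        long[] split1 = indexesForSplit(0L, 3);  // rows 0-2 of the file
        long[] split2 = indexesForSplit(3L, 2);  // rows 3-4 of the file
        System.out.println(java.util.Arrays.toString(split1)); // [0, 1, 2]
        System.out.println(java.util.Arrays.toString(split2)); // [3, 4]
    }
}
```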
Why are the changes needed?
Row indexes can be used in a variety of ways. A (fileName, rowIndex) tuple uniquely identifies a row in a table. This information can be used to mark rows, e.g. to create an indexer.
Does this PR introduce any user-facing change?
Yes. With this change, customers will be able to access the _metadata.row_index metadata column when reading Parquet data. The schema of the _metadata column remains unchanged for file formats without row index support.

How was this patch tested?
- FileMetadataStructSuite.scala to make sure the feature works correctly in different scenarios (supported/unsupported file format, batch/record reads, on/off heap memory...).
- ParquetRowIndexSuite.scala to make sure the row indexes are generated correctly for Parquet files in conjunction with any combination of data skipping features.
- FileMetadataStructRowIndexSuite to account for the new column in the _metadata struct.