
[SPARK-37980][SQL] Extend METADATA column to support row indexes for Parquet files#37228

Closed
ala wants to merge 13 commits into apache:master from ala:row-idx-v4

Conversation

Contributor

@ala ala commented Jul 19, 2022

What changes were proposed in this pull request?

This change adds a row_index column to the _metadata struct. This column allows us to uniquely identify rows read from a given file by an index number. The n-th row in a given file will be assigned row index n-1 in every scan of the file, irrespective of the file splitting and data skipping in use.

The new column requires file format specific support. This change introduces Parquet support, and other formats can follow later.
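The contract described above can be sketched in plain Java (an illustration of the semantics only, not Spark's actual reader code; `indexesForSplit` is a hypothetical helper):

```java
// Sketch only: illustrates the row-index contract, not Spark's implementation.
// For a split starting at row `startingRowIdx` of the file, the n-th row of the
// split gets index startingRowIdx + n, so splitting never changes assigned indexes.
import java.util.ArrayList;
import java.util.List;

public class RowIndexContract {
    static List<Long> indexesForSplit(long startingRowIdx, int numRows) {
        List<Long> out = new ArrayList<>();
        for (int n = 0; n < numRows; n++) out.add(startingRowIdx + n);
        return out;
    }

    public static void main(String[] args) {
        // Reading a 10-row file as one scan or as two splits yields identical indexes.
        List<Long> fullScan = indexesForSplit(0L, 10);
        List<Long> splitScan = new ArrayList<>(indexesForSplit(0L, 4));
        splitScan.addAll(indexesForSplit(4L, 6));
        if (!fullScan.equals(splitScan)) throw new AssertionError("indexes differ");
        System.out.println("ok");
    }
}
```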

Why are the changes needed?

Row indexes can be used in a variety of ways. A (fileName, rowIndex) tuple uniquely identifies a row in a table. This information can be used to mark rows, e.g. to build an indexer.

Does this PR introduce any user-facing change?

Yes. With this change, users will be able to access the _metadata.row_index metadata column when reading Parquet data. The schema of the _metadata column remains unchanged for file formats without row index support.

How was this patch tested?

  • Added FileMetadataStructRowIndexSuite.scala to make sure the feature works correctly in different scenarios (supported/unsupported file format, batch/record reads, on/off heap memory...).
  • Added ParquetRowIndexSuite.scala to make sure the row indexes are generated correctly for Parquet files in conjunction with any combination of data skipping features.
  • Extended FileMetadataStructSuite to account for the new column in the _metadata struct.

Contributor Author

ala commented Jul 20, 2022

cc @cloud-fan @Yaohua628

// a subset of rows in the row group is going to be read. Note that there is a name
// collision here: these row indexes (unlike ones this class is generating) are counted
// starting from 0 in each of the row groups.
rowIndexIterator = pages.getRowIndexes.get.asScala.map(idx => idx + startingRowIdx)
Contributor


Wondering if PageReadStore.getRowIndexes() would return continuous row indexes or not?

If the row indexes are continuous, then we don't need to store a long value per row. We could have something like RangeColumnVector(startIdx, length) to save the cost of computing and storing the row index vector (similar idea to ConstantColumnVector). cc @cloud-fan.
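A rough sketch of that idea (the class name and API here are hypothetical, mirroring the suggestion rather than any existing Spark class):

```java
// Hypothetical sketch of the suggested RangeColumnVector: instead of materializing
// one long per row, store only the start index and length and compute each value
// on read, similar to how ConstantColumnVector handles a repeated value.
public class RangeColumnVector {
    private final long startIdx;
    private final int length;

    public RangeColumnVector(long startIdx, int length) {
        this.startIdx = startIdx;
        this.length = length;
    }

    public long getLong(int rowId) {
        if (rowId < 0 || rowId >= length) throw new IndexOutOfBoundsException();
        return startIdx + rowId;  // O(1), no per-row storage
    }

    public static void main(String[] args) {
        RangeColumnVector v = new RangeColumnVector(1000L, 3);
        if (v.getLong(0) != 1000L || v.getLong(2) != 1002L) throw new AssertionError();
        System.out.println("ok");
    }
}
```

As noted in the reply below this comment, this optimization only works if the returned indexes are in fact one continuous range.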

Contributor Author


I believe this is not guaranteed. These row indexes are supposed to be produced as a result of page-level min/max filtering. So if there are 3 pages in a column chunk (the intersection of a row group and a column) and the 1st and 3rd match the predicate but the 2nd does not, we should be getting a non-continuous range of indexes.
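A minimal illustration of that scenario (hypothetical numbers: three 100-row pages with the middle one pruned):

```java
// Sketch: 3 pages of 100 rows each in a column chunk; page-level min/max filtering
// prunes the middle page, so the surviving row indexes have a gap (100..199 missing)
// and cannot be represented as a single continuous range.
import java.util.ArrayList;
import java.util.List;

public class NonContinuousIndexes {
    static List<Long> survivingRowIndexes(int[] survivingPages, long pageRowCount) {
        List<Long> out = new ArrayList<>();
        for (int p : survivingPages) {
            long first = p * pageRowCount;
            for (long i = first; i < first + pageRowCount; i++) out.add(i);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> idx = survivingRowIndexes(new int[] {0, 2}, 100L);
        // 200 surviving rows: 0..99 and 200..299 -- not one continuous range.
        if (idx.size() != 200 || idx.get(0) != 0L || idx.get(199) != 299L)
            throw new AssertionError();
        System.out.println("ok");
    }
}
```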


Would it be possible to add a test for this scenario?

Contributor

@Yaohua628 Yaohua628 left a comment


Thanks for working on it!
Not familiar with the Parquet part, but left some questions and comments around _metadata, thanks!

val fileFormatReaderGeneratedMetadataColumns: Seq[Attribute] =
metadataColumns.map(_.name).flatMap {
case FileFormat.ROW_INDEX =>
Some(AttributeReference(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType)())
Contributor


a qq: what if the user schema contains a column _tmp_metadata_row_index? would it be overridden?

maybe you can create an AttributeReference with a __metadata_col in its metadata field, see the usage of __metadata_col: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala#L188 and pattern match on it afterward (just be safe)?


@sadikovi sadikovi left a comment


Thanks for working on this PR. I did the first pass on the changes and left a few comments.
My questions are:

  • Would you be able to provide performance numbers to see how row index affects reads and writes before and after the change?
  • I think we may need to have a way to opt out depending on the first question. Would you be able to take a look into that too?

I may need to review it again later as I might have missed context on file metadata and row index.

cc @sunchao

this.isPrimitive = column.isPrimitive();

if (missingColumns.contains(column)) {
if (ParquetRowIndexUtil.isRowIndexColumn(column)) {

How expensive is this call?

Contributor Author


It's a string comparison (so O(column name length), but in practice cheaper) that happens at most once per column per scan task. I believe it's in the same ballpark as all the other initialization happening at the beginning of the scan.

Contributor Author

ala commented Jul 29, 2022

@sadikovi To answer the questions about performance:

Would you be able to provide performance numbers to see how row index affects reads and writes before and after the change?

I don't quite see how this change would impact the write performance. There are no changes to the write path, only to the read path.

I think we may need to have a way to opt out depending on the first question. Would you be able to take a look into that too?

I am not really sure what the opt-out would look like. The user can always just not read _metadata.row_index, and this bypasses pretty much all the code related to row indexes. If I introduce a config to disable row indexes altogether, the result will be very similar.


@sadikovi sadikovi left a comment


Would you be able to provide performance numbers for reads with and without the row index, or quote the difference you have observed with it enabled and disabled?

By the way, row group metadata row count may not be accurate in some instances. We have seen files where the row count was incorrect. How will the newly added code behave in this scenario?

Can you update the PR title and description to state that this is for Parquet only? It was not particularly clear from the description.



object RowIndexUtil {
def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = {

Interesting name, so it is a column index for the RowIndex column, isn't it?

Contributor Author


Yes. If you have a different suggestion, we can change it.

test(s"reading ${FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} - present in a table") {
withReadDataFrame("parquet", extraCol = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME) { df =>
// Column values are read from the file, rather than populated with generated row indexes.
// FIXME

Can we fix it in this PR?


@ala would you be able to create a follow-up ticket to address this so we don't forget? 🙂

Contributor Author


Done!

@ala ala changed the title [SPARK-37980][SQL] Extend METADATA column to support row indexes [SPARK-37980][SQL] Extend METADATA column to support row indexes for Parquet Aug 11, 2022
@ala ala changed the title [SPARK-37980][SQL] Extend METADATA column to support row indexes for Parquet [SPARK-37980][SQL] Extend METADATA column to support row indexes for Parquet files Aug 11, 2022
Contributor Author

ala commented Aug 11, 2022

By the way, row group metadata row count may not be accurate in some instances. We have seen files where the row count was incorrect. How will the newly added code behave in this scenario?

@sadikovi That is really bad. I think such files should be considered corrupt data, and it is expected that features might not work correctly with them. Can you maybe point me to such files, so that I can test the exact behavior?


@sadikovi sadikovi left a comment


Looks good, thanks for making the changes! 👍

Regarding metadata count, that would be considered a bug in Parquet anyway. I may need some time to find those files. We can always address it in a follow-up.

Contributor Author

ala commented Aug 12, 2022

@sadikovi The cost of reading the row_index column is in the same ballpark as the other metadata columns:

[info] Vectorized Parquet:                       Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] no metadata columns                                 332            370          15         15.1          66.3       1.0X
[info] _metadata.file_path                                 436            491          33         11.5          87.1       0.8X
[info] _metadata.file_name                                 440            479          20         11.4          88.0       0.8X
[info] _metadata.file_size                                 377            420          24         13.3          75.4       0.9X
[info] _metadata.file_modification_time                    391            420          19         12.8          78.1       0.8X
[info] _metadata.row_index                                 434            489          27         11.5          86.7       0.8X
[info] _metadata                                           676            766          34          7.4         135.2       0.5X

[info] Parquet-mr:                               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] no metadata columns                                1250           1447          78          4.0         250.0       1.0X
[info] _metadata.file_path                                1688           1898         116          3.0         337.6       0.7X
[info] _metadata.file_name                                1678           1867          87          3.0         335.6       0.7X
[info] _metadata.file_size                                1518           1711          79          3.3         303.6       0.8X
[info] _metadata.file_modification_time                   1596           1701          60          3.1         319.3       0.8X
[info] _metadata.row_index                                1526           1725          79          3.3         305.3       0.8X
[info] _metadata                                          2268           2578         134          2.2         453.5       0.6X

And these numbers are in the same ballpark as for vanilla master branch:

[info] Vectorized Parquet:                       Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] no metadata columns                                 346            411          31         14.5          69.1       1.0X
[info] _metadata.file_path                                 452            524          49         11.1          90.5       0.8X
[info] _metadata.file_name                                 446            489          24         11.2          89.2       0.8X
[info] _metadata.file_size                                 389            436          38         12.9          77.8       0.9X
[info] _metadata.file_modification_time                    387            421          19         12.9          77.4       0.9X
[info] _metadata                                           592            672          30          8.4         118.4       0.6X

[info] Parquet-mr:                               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] no metadata columns                                1209           1351          73          4.1         241.8       1.0X
[info] _metadata.file_path                                1595           1807         112          3.1         318.9       0.8X
[info] _metadata.file_name                                1592           1777         100          3.1         318.3       0.8X
[info] _metadata.file_size                                1493           1692         102          3.3         298.7       0.8X
[info] _metadata.file_modification_time                   1507           1688          87          3.3         301.5       0.8X
[info] _metadata                                          1998           2238         107          2.5         399.6       0.6X

@IonutBoicuAms
Contributor

Can we have a committer merge this? The failing test looks unrelated to the PR.

}
// If needed, compute row indexes within a file.
if (rowIndexGenerator != null) {
rowIndexGenerator.populateRowIndex(columnVectors, num);
Contributor


Sorry I'm not very familiar with parquet read codebase, does this happen after row group skipping or not?

Contributor


Was looking through the code and it seems like it happens after row group skipping. Also, the tests in ParquetRowIndexSuite check that the rowIndex column doesn't have any of the values from the skipped row groups.
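As a rough sketch of what such a generator conceptually does (hypothetical class; the real rowIndexGenerator works on Spark ColumnVectors):

```java
// Sketch of what a populateRowIndex step conceptually does for a vectorized batch:
// fill the row-index column with consecutive values starting from the index of the
// batch's first row, which is already offset past any skipped row groups.
public class RowIndexGeneratorSketch {
    private long nextRowIndex;

    public RowIndexGeneratorSketch(long firstRowIndex) {
        this.nextRowIndex = firstRowIndex;
    }

    /** Fills dest[0..num) and advances the generator for the next batch. */
    public void populateRowIndex(long[] dest, int num) {
        for (int i = 0; i < num; i++) dest[i] = nextRowIndex + i;
        nextRowIndex += num;
    }

    public static void main(String[] args) {
        // First surviving row group starts at row 500 (rows 0..499 were skipped).
        RowIndexGeneratorSketch gen = new RowIndexGeneratorSketch(500L);
        long[] col = new long[4];
        gen.populateRowIndex(col, 4);
        if (col[0] != 500L || col[3] != 503L) throw new AssertionError();
        gen.populateRowIndex(col, 2);  // next batch continues at 504
        if (col[0] != 504L) throw new AssertionError();
        System.out.println("ok");
    }
}
```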


// A name for a temporary column that holds row indexes computed by the file format reader
// until they can be placed in the _metadata struct.
val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
Contributor


we add this name because row_index is more likely to conflict with data columns?

Contributor


Yes.

val fileFormatReaderGeneratedMetadataColumns: Seq[Attribute] =
metadataColumns.map(_.name).flatMap {
case FileFormat.ROW_INDEX =>
if ((readDataColumns ++ partitionColumns).map(_.name)
Contributor


shall we consider case sensitivity?

Contributor


Done

case _ => None
}

val outputSchema = (readDataColumns ++ fileFormatReaderGeneratedMetadataColumns).toStructType
Contributor


shall we rename it to outputDataSchema? otherwise it's a bit confusing that this is inconsistent with outputAttributes

Contributor


Done

case FileFormat.ROW_INDEX =>
fileFormatReaderGeneratedMetadataColumns
.filter(_.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
.head.withName(FileFormat.ROW_INDEX)
Contributor

@cloud-fan cloud-fan Aug 15, 2022


nit: filter(...).head -> find(...).get

Contributor


Done

object RowIndexUtil {
def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = {
sparkSchema.fields.zipWithIndex.find { case (field: StructField, _: Int) =>
field.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
Contributor


isn't it always the last column?

Contributor

@IonutBoicuAms IonutBoicuAms Aug 16, 2022


This is the last "data" column in the outputAttributes, but there are also the partition and metadata columns after it.
I'm not sure whether the order of the metadata columns in the schema is kept if users select additional metadata columns.
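A minimal sketch of the lookup under discussion (plain Java over an array of field names rather than Spark's StructType; the constant's value mirrors ROW_INDEX_TEMPORARY_COLUMN_NAME from the PR):

```java
// Sketch: locate the temporary row-index column by name instead of assuming it is
// the last column, since partition/metadata columns may follow it in the schema.
public class FindRowIndexColumn {
    static final String TMP_NAME = "_tmp_metadata_row_index";

    static int findRowIndexColumn(String[] fieldNames) {
        for (int i = 0; i < fieldNames.length; i++) {
            if (TMP_NAME.equals(fieldNames[i])) return i;
        }
        return -1;  // schema has no row-index column
    }

    public static void main(String[] args) {
        // The row-index column need not be last when partition columns follow it.
        if (findRowIndexColumn(new String[] {"id", TMP_NAME, "part"}) != 1)
            throw new AssertionError();
        if (findRowIndexColumn(new String[] {"id", "part"}) != -1)
            throw new AssertionError();
        System.out.println("ok");
    }
}
```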


override def getCurrentValue: InternalRow = {
val row = parent.getCurrentValue
row.setLong(rowIndexColumnIdx, parent.getCurrentRowIndex)
Contributor


ah, so parquet-mr already provides APIs to get row index?

Contributor


Yes, this was recently introduced into parquet-mr.

@cloud-fan
Contributor

It seems the python linter is broken in GA, cc @HyukjinKwon

ImportError: cannot import name '_unicodefun' from 'click' (/usr/local/lib/python3.9/dist-packages/click/__init__.py)
Please run 'dev/reformat-python' script.

@cloud-fan
Contributor

thanks, merging to master!


6 participants