[SPARK-36879][SQL] Support Parquet v2 data page encoding (DELTA_BINARY_PACKED) for the vectorized path #34471
parthchandra wants to merge 9 commits into apache:master
Conversation
Jenkins, retest this please
sunchao left a comment:
Thanks @parthchandra for working on this! I left some comments.
.gitignore (Outdated)
+1 with @sunchao's comment. Please remove this from this PR.
This doesn't seem to be used anywhere.
But not for long (there's a horrible pun in here somewhere).
I need this for the vectorized implementation of DeltaByteArrayReader (which I did not include, to make review easier).
In that case, can we put this together with the follow-up PR?
I just knew you would say that :). Done.
Maybe add some comments for this? What are `c`, `rowId`, and `val` for?
It seems better to have an abstract class that extends ValuesReader and implements VectorizedValuesReader, with this default behavior defined once, rather than repeating the same thing in all the different value readers.
This can be done separately, though.
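A minimal sketch of what such a shared base class could look like. This is illustrative only: the real class would extend Parquet's `ValuesReader` and implement Spark's `VectorizedValuesReader`; here it is self-contained, and the class and method names are assumptions, not Spark's actual API.

```java
// Sketch of a shared base class for vectorized value readers. The idea from
// the review: put the "unsupported by default" behavior in one place so each
// concrete reader only overrides the methods it actually implements.
abstract class VectorizedReaderBase {
  // Scalar entry points default to throwing; a concrete reader such as a
  // delta-binary-packed integer reader would override readInteger/readLong.
  public byte readByte() {
    throw new UnsupportedOperationException("readByte is not supported");
  }

  public int readInteger() {
    throw new UnsupportedOperationException("readInteger is not supported");
  }

  public long readLong() {
    throw new UnsupportedOperationException("readLong is not supported");
  }

  public void skip() {
    throw new UnsupportedOperationException("skip is not supported");
  }
}
```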
Why indeed. (IntelliJ generated code for unimplemented methods.)
I think we'll need to implement these too.
Oh dear. I implemented only the methods the original PR had implemented. On closer look, we also need support for the byte, short, date, timestamp, year-month interval, and day-time interval data types, which are stored as int32 or int64.
Perf note: rebased dates and timestamps are a backward-compatibility fix and incur the penalty of checking whether each value needs to be rebased.
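For illustration, the smaller logical types ride on the int32 physical type, so a vectorized read loop can decode ints and narrow them. A self-contained sketch, where `IntSource` is a hypothetical stand-in for the real decoder (it is not a Spark or Parquet interface):

```java
// Sketch: Parquet stores byte/short (and date) values using the int32
// physical type, so vectorized readByte/readShort loops decode ints and
// narrow each one. IntSource is a hypothetical stand-in for the delta
// decoder; the real reader writes into a WritableColumnVector instead of
// returning arrays.
interface IntSource {
  int nextInt();
}

class NarrowingReader {
  private final IntSource in;

  NarrowingReader(IntSource in) {
    this.in = in;
  }

  // Decode `total` int32 values and narrow each one to a byte.
  byte[] readBytes(int total) {
    byte[] out = new byte[total];
    for (int i = 0; i < total; i++) {
      out[i] = (byte) in.nextInt();
    }
    return out;
  }

  // Decode `total` int32 values and narrow each one to a short.
  short[] readShorts(int total) {
    short[] out = new short[total];
    for (int i = 0; i < total; i++) {
      out[i] = (short) in.nextInt();
    }
    return out;
  }
}
```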
I think this can be done more efficiently; for instance, we don't need to unpack the bits anymore, and don't need to compute the original value from the delta, etc.
I think we do. The original unit tests interleave reads and skips. To continue reading after a skip, we need to have decoded the previous value.
.../java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java (Outdated; thread resolved)
cc @sadikovi @viirya @dongjoon-hyun too
Thank you for the review, @sunchao! Let me address the comments.
Could you use two-space indentation like the other parts of this file, @parthchandra?
The import order is a little strange. Could you group the java imports (lines 30 and 19) together as the first group?
Nit: let's remove the redundant empty line.
Let's make these two lines a one-liner:
- readValues(total, null, -1, (w, r, v) -> {
- });
+ readValues(total, null, -1, (w, r, v) -> {});
Could you put this at the beginning, before `int remaining = total;`?
Looks like it may take some time to address some of the review comments. Marking this PR as draft in the meantime.
sunchao left a comment:
Thanks a lot for updating this, @parthchandra! Overall this looks pretty good. I think we just need to address the issue with the benchmark and attach the result to the PR. You can find out how to get benchmark results using the GitHub workflow here.
Nit: maybe revise this message a bit, since "total value count is + valuesRead" looks a bit confusing.
This won't work yet because of the BooleanType added recently.
Error:
[error] Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'sum(parquetv2table.id)' due to data type mismatch: function sum requires numeric or interval types, not boolean; line 1 pos 7;
[error] 'Aggregate [unresolvedalias(sum(id#40), None)]
[error] +- SubqueryAlias parquetv2table
[error] +- View (`parquetV2Table`, [id#40])
[error] +- Relation [id#40] parquet
Uh, rebase issue. Fixed.
@parthchandra could you address the unit test failures?
The unit tests are failing in parts that I am not familiar with. Previously, re-running the tests had worked, but this time the tests are failing every time. Can I get some help figuring out where the problem is?
If you click the SparkQA test build link you should see the failed tests.
Thank you @sunchao. I had gone through the log and failed to see the test(s) that had failed. One of the unit tests was checking for the error message that accompanied an exception, and as part of the review I had changed that message! Updated the test. The tests should pass now.
Committed to master branch, thanks @parthchandra!
saveAsCsvTable(testDf, dir.getCanonicalPath + "/csv")
saveAsJsonTable(testDf, dir.getCanonicalPath + "/json")
saveAsParquetTable(testDf, dir.getCanonicalPath + "/parquet")
saveAsParquetV2Table(testDf, dir.getCanonicalPath + "/parquetV2")
@parthchandra Maybe we should update the benchmark result of DataSourceReadBenchmark.
I found that there are still unsupported encodings in Data Page V2, such as RLE for Boolean. It seems it is not yet time to update the benchmark; please ignore my previous comment.
@LuciferYang I was getting ready to set up a PR for the RLE/Boolean encoding and noticed that you have done so. Thank you!
Adding back the benchmark in a new PR.
private ByteBufferInputStream in;

// temporary buffers used by readByte, readShort, readInteger, and readLong
byte byteVal;
Should these 4 fields be private?
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsOutputType) {
  val df = Seq.tabulate(N)(rowFunc).toDF("dict", "plain")
    .select($"dict".cast(catalystType), $"plain".cast(catalystType))
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsOutputType) {

  Seq.tabulate(N)(_ => Row(nonRebased)))
}
}
}

What changes were proposed in this pull request?
Implements a vectorized version of the Parquet reader for the DELTA_BINARY_PACKED encoding.
This PR includes a previous PR for this issue, which passed read requests through to the Parquet implementation and was not vectorized. The current PR builds on top of that PR (hence both are included).
Why are the changes needed?
Currently, Spark throws an exception when reading data with these encodings if the vectorized reader is enabled.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Additional unit tests for the encoding, for both long and integer types (mirroring the unit tests in the Parquet implementation).
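For context, data pages with this encoding are produced by the Parquet v2 writer. A hedged usage sketch, assuming the `parquet.writer.version` option is propagated to the Parquet writer via the Hadoop configuration (the PR's tests set the equivalent configuration directly); the path and class name are illustrative:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParquetV2Example {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[1]")
        .appName("ParquetV2Example")
        .getOrCreate();

    // Write with the Parquet v2 writer; integer/long columns then use
    // DELTA_BINARY_PACKED data pages.
    spark.range(0, 1000)
        .write()
        .option("parquet.writer.version", "v2")
        .mode("overwrite")
        .parquet("/tmp/parquet_v2_example");

    // With this PR, the vectorized reader (enabled by default) can read
    // these pages instead of throwing an exception.
    Dataset<Row> df = spark.read().parquet("/tmp/parquet_v2_example");
    df.show(5);
    spark.stop();
  }
}
```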