
Add support for extended (chunked) arrays for Parquet format#40485

Merged
Avogar merged 1 commit into ClickHouse:master from arthurpassos:fix-parquet-chunked-array-deserialization
Sep 1, 2022

Conversation

@arthurpassos
Contributor

@arthurpassos arthurpassos commented Aug 22, 2022

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

ClickHouse was using `parquet::FileReader::ReadAll` to parse Parquet files. This code path leads to `Nested data conversions not implemented for chunked array outputs` when the input ends up building a chunked array internally. According to arrow upstream folks, using `FileReader::GetRecordBatchReader` takes a different code path that could work.

The SELECT statement succeeds when using the latter, but I couldn't verify whether the data is correct. I believe it is. To verify, I need a way to compare the original Parquet file with the one processed by ClickHouse. The test file I have been using is big, and ClickHouse fails to export it to Parquet (a different problem, out of scope for this PR).

I have tried some combinations of transformations (JSON & Parquet) using Python, Spark & ClickHouse to find a way to validate the data, all of them failed because of a variety of reasons like:

  1. Python's pyarrow throws the very same exception ClickHouse was throwing when it tries to read the original file; it can't load it into memory.
  2. Python's fastparquet fails to read the original file with a weird exception.
  3. ClickHouse can't export the SELECT statement result to Parquet because of an arrow internal memory limitation; it throws an exception.
  4. Splitting the file into chunks and exporting failed because of formatting and encoding issues. In JSON, for instance, some escape characters were added by ClickHouse that were not added by Spark.

I'll continue investigating ways to validate the impl and possibly implement a test.

Closes #39944

Information about CI checks: https://clickhouse.com/docs/en/development/continuous-integration/

@robot-clickhouse robot-clickhouse added the `pr-improvement` (Pull request with some product improvements) label Aug 22, 2022
@alexey-milovidov alexey-milovidov added the `can be tested` (Allows running workflows for external contributors) label Aug 22, 2022
@Avogar Avogar self-assigned this Aug 22, 2022
@arthurpassos
Contributor Author

Earlier this morning I managed to compare the top 50% of the original file with the one processed by ClickHouse using Spark & pyspark. They match! I didn't compare the other half because the only pyspark API available to grab the last N elements is `tail`, and it was taking far more memory than `limit`.

I can't use the test file I have to implement a test because it contains confidential data. I managed to generate a file with the schema presented in the issue, but it doesn't raise the exception. The test file that was provided contains many more columns than the select statement references; maybe that has something to do with it. Will investigate further.

@arthurpassos
Contributor Author

I have validated the implementation by doing the following:

  1. Export the file processed by ClickHouse into two Parquet files with the command `clickhouse-client --query "select id, _acc_fields_map from s3(<file_path>, 'Parquet', 'id Int64, _acc_fields_map Map(String, String)') order by id limit <lower_limit>, <upper_limit> FORMAT Parquet" > parquet validation/processed<part_number>.parquet`, with `lower_limit`, `upper_limit` and `part_number` being 0/80692, 80692/161384 and 1/2 for part 1 and part 2 respectively.
  2. Using PySpark, concatenate both parts processed by ClickHouse into a single DataFrame. Then assert the equality of the original DataFrame (read from the original file) with the processed one. Python script:

```python
from pyspark.sql import functions as F

original_df = spark.read.parquet("original-strippedd.parquet").orderBy("id")

processed_df1 = spark.read.parquet("processed1.parquet")
processed_df2 = spark.read.parquet("processed2.parquet")

# Concatenate both exported parts and normalize the map column's type
# before comparing row-by-row against the original.
processed_concatenated = processed_df1.union(processed_df2).withColumn(
    "fields_map", F.col("fields_map").cast("map<string,string>"))

assert original_df.collect() == processed_concatenated.collect()
```

To be sure the assertion was doing its job, I dropped the first row of the original DataFrame with `original_df_filtered = original_df.where(original_df.id > <first_id>)` and compared against that instead; the assertion then failed, as expected.

I have spent the last week trying to write a test, but failed to do so, mainly because I could not generate a file that raises the exception. I tried several combinations using VERY large strings with LOW and HIGH cardinality; none of them triggered the issue. AFAIK, the data gets internally chunked when the chunk memory limit is reached within a row group. As of now, it's set to 2^32 - 1.

Since I validated that it works in that case, I'll set this PR as ready for review. I am open to discussion.

@arthurpassos arthurpassos marked this pull request as ready for review August 29, 2022 14:21
@arthurpassos arthurpassos marked this pull request as draft August 30, 2022 12:37
@arthurpassos
Contributor Author

Marked it as draft again. While the initial case is working properly, the below isn't:

```python
import pyarrow as pa
import pyarrow.parquet as pq

arr = pa.array([[("a" * 2**30, 1)]], type=pa.map_(pa.string(), pa.int32()))
arr = pa.chunked_array([arr, arr])
tab = pa.table({"arr": arr})

pq.write_table(tab, "test.parquet")

pq.read_table("test.parquet")
```

@arthurpassos
Contributor Author

The Arrow lib contains two variants of String: `String` and `LargeString`. After some hackish changes to the arrow lib, I managed to force the use of `LargeString`, and the data no longer gets chunked. Using `LargeString` by default certainly has its implications and does use more resources. The suggestion from arrow upstream folks is to add a setting to the library API that allows conversion from non-large types to large types. Since it's a 3rd party library and a rather complicated change, it could take a long time to get this right & merged. More info on ARROW-17459.

The changes in this PR seem to fix the original case by avoiding that code path. They don't solve the latter case, though. This approach was a suggestion from arrow upstream folks in an ARROW-17459 comment.

Based on that, I am re-opening this PR as ready for review.

@arthurpassos arthurpassos marked this pull request as ready for review September 1, 2022 17:29
@arthurpassos
Contributor Author

@Avogar kind ping :)

@Avogar Avogar merged commit f53aa86 into ClickHouse:master Sep 1, 2022
Enmk added a commit to Altinity/ClickHouse that referenced this pull request Sep 19, 2022
Backport ClickHouse#40485 to 22.3: fix parquet chunked arrays
Enmk pushed a commit to Altinity/ClickHouse that referenced this pull request Sep 21, 2022
…nked-array-deserialization

Add support for extended (chunked) arrays for Parquet format
Enmk added a commit to Altinity/ClickHouse that referenced this pull request Sep 21, 2022
Backport ClickHouse#40485 to 22.3: fix parquet chunked arrays
@alexey-milovidov
Member

```cpp
arrow::Status read_status = rbr->ReadAll(&table);
```

WTF?
Revert?

@alexey-milovidov
Member

Actually it's alright.

Enmk pushed a commit to Altinity/ClickHouse that referenced this pull request Feb 8, 2023
…nked-array-deserialization

Add support for extended (chunked) arrays for Parquet format
Enmk added a commit to Altinity/ClickHouse that referenced this pull request Feb 9, 2023
…array_40485

22.8 Backport of ClickHouse#40485 parquet chunked array support

Labels

`can be tested` (Allows running workflows for external contributors), `pr-improvement` (Pull request with some product improvements)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add support for extended (chunked) arrays for Parquet format in ClickHouse please

4 participants