
feat: add read array support #1456

Merged
comphead merged 21 commits into apache:main from comphead:dev
Mar 18, 2025

Conversation

@comphead (Contributor) commented Feb 27, 2025

Which issue does this PR close?

Part of #1454 .

Rationale for this change

What changes are included in this PR?

How are these changes tested?

@codecov-commenter commented Mar 1, 2025

Codecov Report

Attention: Patch coverage is 77.77778% with 2 lines in your changes missing coverage. Please review.

Project coverage is 58.33%. Comparing base (f09f8af) to head (39054f3).
Report is 88 commits behind head on main.

Files with missing lines Patch % Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala 71.42% 0 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1456      +/-   ##
============================================
+ Coverage     56.12%   58.33%   +2.20%     
- Complexity      976      977       +1     
============================================
  Files           119      122       +3     
  Lines         11743    12212     +469     
  Branches       2251     2286      +35     
============================================
+ Hits           6591     7124     +533     
+ Misses         4012     3949      -63     
+ Partials       1140     1139       -1     


@comphead comphead requested a review from andygrove March 3, 2025 16:14
@andygrove (Member) commented

Some tests are failing due to #1289

I think the root cause is that we are trying to shuffle with arrays and Comet shuffle does not support arrays yet. We need to fall back to Spark for these shuffles.

@andygrove (Member) commented

In CometExecRule we check to see if we support the partitioning types for the shuffle but do not check that we support the types of other columns.

@comphead Do you want to update these checks as part of this PR and see if it resolves the issue?

  case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
    private def applyCometShuffle(plan: SparkPlan): SparkPlan = {
      plan.transformUp {
        case s: ShuffleExchangeExec
            if isCometPlan(s.child) && isCometNativeShuffleMode(conf) &&
              QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._1 =>
                 ...
              
              
        case s: ShuffleExchangeExec
            if (!s.child.supportsColumnar || isCometPlan(s.child)) && isCometJVMShuffleMode(
              conf) &&
              QueryPlanSerde.supportPartitioningTypes(s.child.output, s.outputPartitioning)._1 &&
              !isShuffleOperator(s.child) =>
                ...
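The guard above only validates the partitioning expressions. The change discussed here adds an equivalent type check over every output column. A minimal sketch of that decision, using hypothetical simplified types in place of Spark's DataType hierarchy and QueryPlanSerde:

```scala
sealed trait DataType
case object IntType extends DataType
case class ArrayType(elementType: DataType) extends DataType

// Stand-in for QueryPlanSerde's type support check: arrays unsupported.
def supportedShuffleType(dt: DataType): Boolean = dt match {
  case IntType      => true
  case ArrayType(_) => false
}

// The existing rule checks only the partitioning columns; the proposed
// fix also checks every output column, so a plan that shuffles rows
// containing an array column falls back to Spark's shuffle.
def canUseCometShuffle(
    partitioningTypes: Seq[DataType],
    outputTypes: Seq[DataType]): Boolean =
  partitioningTypes.forall(supportedShuffleType) &&
    outputTypes.forall(supportedShuffleType)
```

With only the first conjunct (the pre-existing behavior), a plan partitioned on an Int column but carrying an array column would incorrectly be routed to Comet shuffle.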

@comphead (Contributor, Author) commented Mar 4, 2025

> In CometExecRule we check to see if we support the partitioning types for the shuffle but do not check that we support the types of other columns. Do you want to update these checks as part of this PR and see if it resolves the issue?

Thanks @andygrove, I'll check that. You're saving me hours of debugging.

case s: StructType if allowComplex =>
s.fields.map(_.dataType).forall(supportedDataType(_, allowComplex))
case a: ArrayType if allowComplex =>
supportedDataType(a.elementType)
A reviewer (Contributor) commented:

Do we need to pass allowComplex recursively here like supportedDataType(a.elementType, allowComplex)?

comphead (Author) replied:

makes sense
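The agreed fix is to propagate allowComplex when recursing into an array's element type, so nested complex types are evaluated under the same policy as their parent. A simplified sketch with toy stand-ins for Spark's types:

```scala
sealed trait DataType
case object IntType extends DataType
case object UnsupportedType extends DataType
case class ArrayType(elementType: DataType) extends DataType
case class StructType(fieldTypes: Seq[DataType]) extends DataType

def supportedDataType(dt: DataType, allowComplex: Boolean = false): Boolean =
  dt match {
    case IntType => true
    case s: StructType if allowComplex =>
      s.fieldTypes.forall(supportedDataType(_, allowComplex))
    case a: ArrayType if allowComplex =>
      // Pass the flag through: without it, a nested array (or a struct
      // inside an array) would hit the default allowComplex = false and
      // be rejected even when the caller allows complex types.
      supportedDataType(a.elementType, allowComplex)
    case _ => false
  }
```

Calling supportedDataType(a.elementType) without the flag, as in the original snippet, silently rejects array-of-array and array-of-struct.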

@comphead (Contributor, Author) commented Mar 7, 2025

I think the last merge brought in a new test that now fails on a schema mismatch; checking this.

- array_compact *** FAILED *** (204 milliseconds)
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 309.0 failed 1 times, most recent failure: Lost task 0.0 in stage 309.0 (TID 797) (Mac-1741305812954.local executor driver): org.apache.comet.CometNativeException: Invalid argument error: column types must match schema types, expected List(Field { name: "element", data_type: Int8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) but found List(Field { name: "item", data_type: Int8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) at column index 0
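The error above is a mismatch between two Arrow list types that differ only in the name of the inner child field ("item" vs "element"); the element data type (Int8) is identical on both sides. Since the inner field name carries no semantic meaning for a list, one common remedy is to normalize it to a canonical name before comparing schemas. A toy sketch (not the actual Comet or Arrow API):

```scala
// Minimal model of an Arrow list type: a single named child field.
case class Field(name: String, dataType: String, nullable: Boolean)
case class ListType(elementField: Field)

// Rename the child field to a canonical value before comparing, so
// "item" vs "element" no longer causes a spurious schema mismatch.
def normalize(t: ListType): ListType =
  t.copy(elementField = t.elementField.copy(name = "element"))

def schemasMatch(a: ListType, b: ListType): Boolean =
  normalize(a) == normalize(b)
```

Under this normalization, the two schemas in the failure above would compare equal, while a genuine element-type difference would still be caught.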

@andygrove (Member) commented

I'd quite like to merge #1479 before this one so that we can be sure that no additional test failures are introduced for native_datafusion.

@comphead (Contributor, Author) commented

@andygrove @kazuyukitanimura please have a second look.
Nested arrays and Iceberg compat support will be added in a follow-up PR.

lazy_static = "1.4"
assertables = "7"
hex = "0.4.3"
datafusion-functions-nested = "46.0.0"
A reviewer (Contributor) commented:

Can we make it workspace = true so that we do not forget to update it here when we upgrade DataFusion?

comphead (Author) replied:

I'm not sure it is possible to make it workspace = true: this package is optional and used only for tests, and optional packages are not allowed at the workspace level.

I could include it in the workspace as non-optional, but then it would be compiled into the target binary, which is probably not what we want.

}
});

runtime.block_on(async move {
A reviewer (Member) commented:

It's nice to see an end-to-end test like this in the Rust project


- case Literal(value, dataType) if supportedDataType(dataType, allowStruct = value == null) =>
+ case Literal(value, dataType)
+     if supportedDataType(dataType, allowComplex = value == null) =>
A reviewer (Member) commented:

nit: would be nice to have a comment here explaining why allowComplex = value == null
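The condition being discussed ties complex-type support to the literal being null. A sketch of the guard with an explanatory comment, using toy types in place of Spark's, and with the stated rationale being my reading of the thread rather than confirmed by the source:

```scala
sealed trait DataType
case object IntType extends DataType
case class ArrayType(elementType: DataType) extends DataType
case class Literal(value: Any, dataType: DataType)

def supportedDataType(dt: DataType, allowComplex: Boolean): Boolean =
  dt match {
    case IntType                      => true
    case ArrayType(e) if allowComplex => supportedDataType(e, allowComplex)
    case _                            => false
  }

// Complex types are only allowed when the literal value is null: a null
// literal needs no element-wise serialization, whereas a concrete array
// value would require literal serialization support that does not exist
// yet (hypothetical wording of the rationale the reviewer asks for).
def isSupportedLiteral(lit: Literal): Boolean =
  supportedDataType(lit.dataType, allowComplex = lit.value == null)
```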

@kazuyukitanimura (Contributor) left a review comment:
Changed the description from Closes to Part of

@andygrove (Member) left a review:

LGTM. Thanks @comphead

@comphead (Contributor, Author) commented

Thanks everyone

@comphead comphead merged commit 59fae94 into apache:main Mar 18, 2025
69 checks passed
coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025
* feat: add read array support