
chore: More refactoring of type checking logic #1744

Merged
andygrove merged 15 commits into apache:main from andygrove:scan-refactor-2
May 19, 2025
Conversation


@andygrove andygrove commented May 16, 2025

Which issue does this PR close?

N/A

Rationale for this change

Fixing technical debt in preparation for further improvements to native scans and complex type support

What changes are included in this PR?

  • Move some type checking logic into the native scan execs
  • Improve fallback message for native scans reading byte/short
  • Move usingDataSourceExec and usingDataSourceExecWithIncompatTypes into CometTestBase
  • Reimplement type-checking logic when determining if a Comet sink is supported
  • Add one more fuzz test for shuffle, for more coverage
  • Add a check that we were missing for falling back to Spark for GROUP BY on complex types

How are these changes tested?

@andygrove andygrove requested a review from Copilot May 16, 2025 15:02

Copilot AI left a comment


Pull Request Overview

This PR refactors the type checking logic to reduce technical debt and better support native scans and complex type handling. Key changes include updating import paths and function calls for data source execution checks, refactoring the fallback message and condition for ByteType/ShortType support in native scans, and simplifying the logic for determining supported types in Comet sink operators.

Reviewed Changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 1 comment.

Summary per file:
  • spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala: Adjusted imports and usage of usingDataSourceExec in tests
  • spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala: Moved usingDataSourceExec and usingDataSourceExecWithIncompatTypes into the test base
  • spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala: Refactored type-checking logic for ByteType/ShortType with new conditions and fallback messages
  • spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala: Updated type-checking logic for ByteType/ShortType with adjusted configuration checks
  • spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala: Simplified support check for Comet sink operators by removing conditional conversion logic
  • spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala: Removed redundant definitions of usingDataSourceExec and usingDataSourceExecWithIncompatTypes
Comments suppressed due to low confidence (2)

spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala:530

  • There appears to be a logic mismatch when handling ByteType/ShortType: the condition checks if COMET_SCAN_ALLOW_INCOMPATIBLE is true while the fallback message indicates it should be false. Please verify the intended configuration for native scans.
case ByteType | ShortType if CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_ICEBERG_COMPAT && CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
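Copilot's concern is that the guard enables this case when COMET_SCAN_ALLOW_INCOMPATIBLE is true, while the fallback message reads as if the flag should be false. One way to keep the two from drifting apart is to derive both the decision and the message from the same flag. A minimal, hypothetical sketch in plain Scala (ByteShortScanCheck and its message are illustrative, not the actual Comet API; `allowIncompatible` stands in for CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE):

```scala
// Hypothetical sketch: the support decision and the fallback message
// both come from one flag, so they cannot contradict each other.
object ByteShortScanCheck {
  def check(allowIncompatible: Boolean): Either[String, Unit] =
    if (allowIncompatible) {
      // user explicitly opted in to non-spec-compliant byte/short reads
      Right(())
    } else {
      Left("native scan of byte/short requires COMET_SCAN_ALLOW_INCOMPATIBLE=true")
    }
}
```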

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala:2761

  • [nitpick] The updated logic for supported types in Comet sink operators now always assumes complex types are allowed. Confirm that removing the conversion flag checks (e.g. COMET_CONVERT_FROM_PARQUET_ENABLED) is the intended behavior.
case op if isCometSink(op) =>

@andygrove andygrove marked this pull request as ready for review May 16, 2025 16:31
Comment on lines -245 to -252
def usingDataSourceExec(conf: SQLConf): Boolean =
Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION).contains(
CometConf.COMET_NATIVE_SCAN_IMPL.get(conf))

def usingDataSourceExecWithIncompatTypes(conf: SQLConf): Boolean = {
usingDataSourceExec(conf) &&
!CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf)
}
andygrove (Member Author):

These methods moved to CometTestBase, which reduces the number of times these configs are used in implementation code.

Comment on lines -2762 to -2770
if isCometSink(op) && op.output.forall(a =>
supportedDataType(
a.dataType,
// Complex type supported if
// - Native datafusion reader enabled (experimental) OR
// - conversion from Parquet/JSON enabled
allowComplex =
usingDataSourceExec(conf) || CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED
.get(conf) || CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf))) =>
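Per the PR description, the replacement presumably reduces this to an unconditional `allowComplex = true`. A self-contained sketch of that shape, using simplified stand-in types (Attr, Plan, and this supportedDataType are hypothetical stand-ins, not the real Spark/Comet classes):

```scala
// Simplified stand-ins for the real Spark/Comet types (hypothetical):
case class Attr(name: String, isComplex: Boolean)
case class Plan(output: Seq[Attr], isCometSink: Boolean)

object SinkCheck {
  // Stand-in for Comet's supportedDataType: with allowComplex = true,
  // complex output types no longer block the sink.
  def supportedDataType(a: Attr, allowComplex: Boolean): Boolean =
    allowComplex || !a.isComplex

  // The simplified check: no conversion-flag conditions, complex
  // types always allowed for a Comet sink's output.
  def isSupported(op: Plan): Boolean =
    op.isCometSink && op.output.forall(supportedDataType(_, allowComplex = true))
}
```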
andygrove (Member Author):

I don't remember why these checks were once necessary but I don't think we need them now.

andygrove (Member Author):

These checks were limiting our support for complex types and removing them exposed a new bug with grouping on maps, which is now fixed in this PR.


@mbutrovich mbutrovich left a comment


LGTM, and a new test for fun! Thanks @andygrove!


codecov-commenter commented May 16, 2025

Codecov Report

Attention: Patch coverage is 22.22222% with 14 lines in your changes missing coverage. Please review.

Project coverage is 58.56%. Comparing base (f09f8af) to head (3634fc1).
Report is 195 commits behind head on main.

Files with missing lines Patch % Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala 30.00% 3 Missing and 4 partials ⚠️
...ala/org/apache/spark/sql/comet/CometScanExec.scala 20.00% 2 Missing and 2 partials ⚠️
...g/apache/spark/sql/comet/CometNativeScanExec.scala 0.00% 2 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1744      +/-   ##
============================================
+ Coverage     56.12%   58.56%   +2.43%     
- Complexity      976     1133     +157     
============================================
  Files           119      130      +11     
  Lines         11743    12681     +938     
  Branches       2251     2369     +118     
============================================
+ Hits           6591     7426     +835     
- Misses         4012     4063      +51     
- Partials       1140     1192      +52     



andygrove commented May 16, 2025

I have a Spark SQL test failure to resolve:

SPARK-47430 Support GROUP BY MapType *** FAILED *** (124 milliseconds)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1711.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1711.0 (TID 1466) (a357d001add0 executor driver): org.apache.comet.CometNativeException: Not yet implemented: not yet implemented: Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false)

edit: The failure is specific to Spark 4.0.0, which added support for grouping by map types


@parthchandra parthchandra left a comment


lgtm. Just one minor question.

return None
}

if (groupingExpressions.exists(expr =>
parthchandra (Contributor):

Are there other checks we need to include here for structs/arrays?

andygrove (Member Author):

Spark 3.x supports grouping by structs and arrays and we already have tests passing for those cases. Grouping by map is new in Spark 4.

andygrove (Member Author):

hmm I suppose there could be structs containing maps 🤔
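A recursive walk over the type tree would catch maps nested inside structs and arrays as well as top-level maps. A sketch using a simplified type ADT (the real check would pattern-match on Spark's DataType hierarchy; these case classes are hypothetical stand-ins):

```scala
// Simplified stand-ins for Spark's DataType hierarchy (hypothetical):
sealed trait DType
case object IntT extends DType
case class ArrayT(element: DType) extends DType
case class StructT(fields: Seq[DType]) extends DType
case class MapT(key: DType, value: DType) extends DType

object GroupByGuard {
  // True if a map appears anywhere in the type, in which case
  // grouping on it should fall back to Spark.
  def containsMap(dt: DType): Boolean = dt match {
    case MapT(_, _)  => true
    case ArrayT(e)   => containsMap(e)
    case StructT(fs) => fs.exists(containsMap)
    case _           => false
  }
}
```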

andygrove (Member Author):

Thanks for the reviews @mbutrovich and @parthchandra

@andygrove andygrove merged commit 7717a25 into apache:main May 19, 2025
78 checks passed
@andygrove andygrove deleted the scan-refactor-2 branch May 19, 2025 16:32
coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025