Unsupported aggregation mode PartialMerge #2870

@comphead

Description

Comet falls back to Spark for simple aggregate queries that combine a regular aggregate with a distinct aggregate, for example: SELECT i, sum(v1), count(distinct v) FROM t2 group by i

Reproduce

  test("count distinct") {
    withSQLConf("spark.comet.enabled" -> "false") {
      sql("drop table if exists t")
      sql("CREATE TABLE t(v VARCHAR(10), v1 VARCHAR(10), i INT) USING PARQUET")
      sql("INSERT INTO t VALUES ('c', 'a', 1)")
      sql("INSERT INTO t VALUES ('c1', 'a1', 1)")
      sql("INSERT INTO t VALUES ('c2', 'a2', 2)")
      sql("INSERT INTO t VALUES ('c3', 'a3', 2)")
      sql("INSERT INTO t VALUES ('c4', 'a4', 2)")
      sql("INSERT INTO t VALUES ('c', 'a', 1)")
      sql("INSERT INTO t VALUES ('c1', 'a1', 1)")
      sql("INSERT INTO t VALUES ('c2', 'a2', 2)")
      sql("INSERT INTO t VALUES ('c3', 'a3', 2)")
      sql("INSERT INTO t VALUES ('c4', 'a4', 2)")
      sql("INSERT INTO t VALUES ('c', 'a', 1)")
      sql("INSERT INTO t VALUES ('c1', 'a1', 1)")
      sql("INSERT INTO t VALUES ('c2', 'a2', 2)")
      sql("INSERT INTO t VALUES ('c3', 'a3', 2)")
      sql("INSERT INTO t VALUES ('c4', 'a4', 2)")
      sql("select * from t").repartition(3).write.mode("overwrite").parquet("/tmp/test_11")
    }
    withSQLConf(
      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
      "spark.comet.cast.allowIncompatible" -> "true",
      "spark.sql.adaptive.enabled" -> "false",
      "spark.comet.explain.native.enabled" -> "true",
      "spark.comet.enabled" -> "true",
      // "spark.comet.exec.replaceSortMergeJoin" -> "true",
      "spark.comet.exec.shuffle.enableFastEncoding" -> "true",
      "spark.comet.exec.shuffle.enabled" -> "true",
      // "spark.comet.exec.shuffle.fallbackToColumnar" -> "true",
      "spark.comet.explainFallback.enabled" -> "true",
      CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_iceberg_compat",
      "spark.shuffle.manager" -> "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager",
      "spark.comet.logFallbackReasons.enabled" -> "true") {
      // SparkEnv.get.conf.set("spark.comet.explain.native.enabled", "true")
      spark.read.parquet("/tmp/test_11").createOrReplaceTempView("t2")
      // sql("SELECT count(distinct v, v) FROM t2").explain()
      checkSparkAnswerAndOperator("SELECT i, sum(v1), count(distinct v) FROM t2 group by i")
    }
  }

The test fails with:

Expected only Comet native operators, but found HashAggregate.
plan: HashAggregate
+- CometColumnarToRow
   +- CometColumnarExchange
      +- HashAggregate
         +-  HashAggregate [COMET: Unsupported aggregation mode PartialMerge]
            +- CometColumnarToRow
               +- CometColumnarExchange
                  +-  HashAggregate [COMET: cast(v1#146 as double) is not fully compatible with Spark (Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode.). To enable it anyway, set spark.comet.expression.Cast.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html)., Unsupported aggregate expression(s)]
                     +- CometColumnarToRow
                        +- CometScan [native_iceberg_compat] parquet 
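
For context on why four HashAggregate nodes appear in the plan above: as far as I understand Spark's planner, a query with regular aggregates plus one DISTINCT aggregate is split into four aggregation stages, and the second and third stages run the regular aggregate in PartialMerge mode. The sketch below is a simplified pure-Scala model of that staging, not Spark or Comet code. `AggModeSketch`, `Stage`, and `planWithOneDistinct` are hypothetical names made up for illustration; only the mode names mirror Spark's `AggregateMode` values.

```scala
// Simplified model, not Spark code: the mode names mirror Spark's AggregateMode values.
object AggModeSketch {
  sealed trait Mode
  case object Partial extends Mode
  case object PartialMerge extends Mode
  case object Final extends Mode

  // One aggregate operator: its grouping keys and the mode of each aggregate function.
  final case class Stage(groupBy: Seq[String], modes: Map[String, Mode])

  // Hypothetical helper: sketches the stages for regular aggregates plus one DISTINCT aggregate.
  def planWithOneDistinct(
      grouping: Seq[String],
      distinctCol: String,
      regularAggs: Seq[String],
      distinctAgg: String): Seq[Stage] = {
    def regular(mode: Mode): Map[String, Mode] = regularAggs.map(a => a -> mode).toMap
    val groupingPlusDistinct = grouping :+ distinctCol
    Seq(
      // Stage 1: pre-aggregate regular functions, grouped by (grouping keys + distinct column).
      Stage(groupingPlusDistinct, regular(Partial)),
      // Stage 2 (after a shuffle): merge those buffers, still grouped by the distinct column.
      Stage(groupingPlusDistinct, regular(PartialMerge)),
      // Stage 3: drop the distinct column from the keys and start the distinct aggregate.
      Stage(grouping, regular(PartialMerge) + (distinctAgg -> Partial)),
      // Stage 4 (after a shuffle): produce the final values.
      Stage(grouping, regular(Final) + (distinctAgg -> Final)))
  }

  // The query from this issue, expressed in the model.
  val stages: Seq[Stage] =
    planWithOneDistinct(Seq("i"), "v", Seq("sum(v1)"), "count(distinct v)")

  def main(args: Array[String]): Unit =
    stages.zipWithIndex.foreach { case (s, n) =>
      val modes = s.modes.map { case (f, m) => s"$f=$m" }.mkString(", ")
      println(s"stage ${n + 1}: group by [${s.groupBy.mkString(", ")}] $modes")
    }
}
```

Read against the plan above (which prints the last stage first), stage 2 of the model corresponds to the node tagged "Unsupported aggregation mode PartialMerge", and stage 1 is the bottom HashAggregate that already fell back because of the cast on v1.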
