-
Notifications
You must be signed in to change notification settings - Fork 290
Open
Description
Comet falls back to Spark for a simple aggregate query with a distinct count: `SELECT i, sum(v1), count(distinct v) FROM t2 group by i`
Reproduce
// Reproduction: Comet falls back to Spark for
//   SELECT i, sum(v1), count(distinct v) FROM t2 GROUP BY i
// Comet rejects one of the HashAggregate stages Spark plans for this distinct aggregate
// with "Unsupported aggregation mode PartialMerge", so checkSparkAnswerAndOperator fails
// because a non-Comet HashAggregate remains in the plan.
test("count distinct") {
  // Use a per-test temporary directory instead of a fixed /tmp path so runs do not leak
  // files into each other or collide when executed in parallel.
  withTempPath { dir =>
    val path = dir.getCanonicalPath

    // Generate the input with Comet disabled so the parquet files are written by vanilla Spark.
    withSQLConf("spark.comet.enabled" -> "false") {
      withTable("t") {
        sql("DROP TABLE IF EXISTS t")
        sql("CREATE TABLE t(v VARCHAR(10), v1 VARCHAR(10), i INT) USING PARQUET")
        // Five distinct rows, each inserted three times, so every value of v is duplicated
        // and count(distinct v) differs from count(v).
        val distinctRows = Seq(
          "('c', 'a', 1)",
          "('c1', 'a1', 1)",
          "('c2', 'a2', 2)",
          "('c3', 'a3', 2)",
          "('c4', 'a4', 2)")
        sql(s"INSERT INTO t VALUES ${Seq.fill(3)(distinctRows).flatten.mkString(", ")}")
        // Rewrite the data as 3 partitions of parquet files at the temporary path.
        sql("select * from t").repartition(3).write.mode("overwrite").parquet(path)
      }
    }

    withSQLConf(
      // Same key as "spark.comet.exec.shuffle.enabled"; set once here.
      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
      // sum(v1) over a VARCHAR column plans cast(v1 as double), which Comet treats as not fully
      // Spark-compatible. Comet's fallback message names
      // spark.comet.expression.Cast.allowIncompatible as the key that enables it; the older
      // key is kept in case the Comet version under test still reads it.
      "spark.comet.expression.Cast.allowIncompatible" -> "true",
      "spark.comet.cast.allowIncompatible" -> "true",
      "spark.sql.adaptive.enabled" -> "false",
      "spark.comet.explain.native.enabled" -> "true",
      "spark.comet.enabled" -> "true",
      "spark.comet.exec.shuffle.enableFastEncoding" -> "true",
      "spark.comet.explainFallback.enabled" -> "true",
      CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_iceberg_compat",
      // NOTE(review): spark.shuffle.manager is normally fixed when the SparkContext starts;
      // setting it here may have no effect. Confirm the suite's SparkConf already sets it.
      "spark.shuffle.manager" -> "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager",
      "spark.comet.logFallbackReasons.enabled" -> "true") {
      withTempView("t2") {
        spark.read.parquet(path).createOrReplaceTempView("t2")
        checkSparkAnswerAndOperator("SELECT i, sum(v1), count(distinct v) FROM t2 group by i")
      }
    }
  }
}
Expected only Comet native operators, but found HashAggregate.
plan: HashAggregate
+- CometColumnarToRow
   +- CometColumnarExchange
      +- HashAggregate
         +- HashAggregate [COMET: Unsupported aggregation mode PartialMerge]
            +- CometColumnarToRow
               +- CometColumnarExchange
                  +- HashAggregate [COMET: cast(v1#146 as double) is not fully compatible with Spark (Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode.). To enable it anyway, set spark.comet.expression.Cast.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html)., Unsupported aggregate expression(s)]
                     +- CometColumnarToRow
                        +- CometScan [native_iceberg_compat] parquet
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels