
Conversation

@avamingli (Contributor) commented Jun 19, 2025

Upstream Postgres does not support parallel DISTINCT processing, since distinctness cannot be guaranteed across multiple workers. In MPP databases, however, we can utilize Motion to redistribute tuples across multiple workers within a parallel query.

For a DISTINCT query like:

select distinct a from t_distinct_0;

we can create a parallel plan whose underlying node is a Parallel Scan on the table. After the Parallel Scan, tuples are spread randomly across the parallel workers, even when the distribution key matches the target expression.
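
For reference, a minimal sketch of a table that would exercise this case. The actual DDL of t_distinct_0 is not shown in this PR, so the column types, row counts, and DISTRIBUTED BY clause below are assumptions.

-- Hypothetical setup (assumption): a table whose distribution key matches
-- the DISTINCT target expression, as discussed above.
create table t_distinct_0 (a int, b int) distributed by (a);
insert into t_distinct_0 select i % 1000, i from generate_series(1, 1000000) i;
analyze t_distinct_0;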

The pre-distinct node uses a Streaming HashAggregate or HashAggregate to deduplicate tuples in parallel; the partially deduplicated tuples are then redistributed according to the DISTINCT expressions, and a second-stage aggregate completes the DISTINCT operation.

                         QUERY PLAN
------------------------------------------------------------
 Gather Motion 6:1  (slice1; segments: 6)
   ->  HashAggregate
         Group Key: a
         ->  Redistribute Motion 6:6  (slice2; segments: 6)
               Hash Key: a
               Hash Module: 3
               ->  Streaming HashAggregate
                     Group Key: a
                     ->  Parallel Seq Scan on t_distinct_0
 Optimizer: Postgres query optimizer
(10 rows)
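
A sketch of how one might toggle between the parallel and non-parallel plan shapes in a session. max_parallel_workers_per_gather is the upstream Postgres setting; whether Cloudberry needs additional parallel-related GUCs to produce exactly this plan is an assumption, not something this PR states.

-- Assumption: standard Postgres parallel GUC; extra Cloudberry-specific
-- settings may also be required but are not shown in this PR.
set max_parallel_workers_per_gather = 2;
explain (costs off) select distinct a from t_distinct_0;

-- Setting it to 0 should fall back to the non-parallel 3:1 plan.
set max_parallel_workers_per_gather = 0;
explain (costs off) select distinct a from t_distinct_0;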

Parallel Group Aggregation is also supported:

explain(costs off)
select distinct a, b from t_distinct_0;
                        QUERY PLAN
-----------------------------------------------------------
 GroupAggregate
   Group Key: a, b
   ->  Gather Motion 6:1  (slice1; segments: 6)
         Merge Key: a, b
         ->  GroupAggregate
               Group Key: a, b
               ->  Sort
                     Sort Key: a, b
                     ->  Parallel Seq Scan on t_distinct_0
 Optimizer: Postgres query optimizer
(10 rows)
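
One way to steer the planner toward the sort-based path shown above is to disable hash aggregation for the session. This uses the standard Postgres GUC and is only an illustration, not necessarily how the plan above was produced.

-- Assumption: disabling hash aggregation pushes the planner to the
-- sort-based GroupAggregate path for DISTINCT.
set enable_hashagg = off;
explain (costs off) select distinct a, b from t_distinct_0;
reset enable_hashagg;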

Performance

For DISTINCT, parallel performance is closely tied to how evenly tuples are spread among the worker processes, particularly for streaming hash aggregates.
Overall, the test results indicate that a parallel execution plan with two workers achieves nearly half the runtime of the non-parallel plan.

The parallel plans (hashagg, streaming-hashagg, hashagg-groupagg) each use 2 workers. The figures below are runtimes from three runs plus their average.

DISTINCT             Run 1       Run 2       Run 3       Average
non-parallel         8207.833    8416.01     8163.197    8262.346667
hashagg              4550.331    4558.036    4219.485    4442.617333
streaming-hashagg    4197.242    4200.889    4099.575    4165.902
hashagg-groupagg     4336.934    4710.07     4352.795    4466.599667
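
From these averages, the speedups over the non-parallel plan are roughly 1.86x (hashagg), 1.98x (streaming-hashagg), and 1.85x (hashagg-groupagg), consistent with the "nearly half the runtime" observation above.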


non-parallel

select distinct a from t_distinct_0;
                         QUERY PLAN
------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  HashAggregate
         Group Key: a
         ->  Redistribute Motion 3:3  (slice2; segments: 3)
               Hash Key: a
               ->  HashAggregate
                     Group Key: a
                     ->  Seq Scan on t_distinct_0
 Optimizer: Postgres query optimizer
(9 rows)

hashagg

select distinct a from t_distinct_0;
                         QUERY PLAN
------------------------------------------------------------
 Gather Motion 6:1  (slice1; segments: 6)
   ->  HashAggregate
         Group Key: a
         ->  Redistribute Motion 6:6  (slice2; segments: 6)
               Hash Key: a
               Hash Module: 3
               ->  HashAggregate
                     Group Key: a
                     ->  Parallel Seq Scan on t_distinct_0
 Optimizer: Postgres query optimizer
(10 rows)

streaming-hashagg

select distinct a from t_distinct_0;
                         QUERY PLAN
------------------------------------------------------------
 Gather Motion 6:1  (slice1; segments: 6)
   ->  HashAggregate
         Group Key: a
         ->  Redistribute Motion 6:6  (slice2; segments: 6)
               Hash Key: a
               Hash Module: 3
               ->  Streaming HashAggregate
                     Group Key: a
                     ->  Parallel Seq Scan on t_distinct_0
 Optimizer: Postgres query optimizer
(10 rows)

hashagg-groupagg

select distinct a from t_distinct_0;
                            QUERY PLAN
------------------------------------------------------------------
 Gather Motion 6:1  (slice1; segments: 6)
   Merge Key: a
   ->  GroupAggregate
         Group Key: a
         ->  Sort
               Sort Key: a
               ->  Redistribute Motion 6:6  (slice2; segments: 6)
                     Hash Key: a
                     Hash Module: 3
                     ->  Streaming HashAggregate
                           Group Key: a
                           ->  Parallel Seq Scan on t_distinct_0
 Optimizer: Postgres query optimizer
(13 rows)

Authored-by: Zhang Mingli [email protected]


@avamingli (Contributor, Author) commented:

Pre-discussion: #914

@yjhjstz (Member) commented Jun 19, 2025:

we can import some design from PG parallel DISTINCT https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=22c4e88ebff408acd52e212543a77158bde59e69 ?

@avamingli (Contributor, Author) replied:

> we can import some design from PG parallel DISTINCT https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=22c4e88ebff408acd52e212543a77158bde59e69 ?

Hi,

I wasn't aware that Postgres supports parallel DISTINCT functionality; it hasn't been cherry-picked into CBDB yet. Thanks for pointing it out.

I reviewed the code, and overall, it aligns with my design approach. Postgres uses a Gather node for a two-phase aggregation, performing the first phase on worker nodes. However, in CBDB, we don't have a Gather node; instead, we place the two-phase aggregation at the top-level node in a distributed setting.

Additionally, we must consider data distribution, which requires the introduction of Motion nodes. Even if we were to incorporate Postgres's code, it would still necessitate distributed adaptation, though the plan's outcome would remain consistent with our current implementation.

While I'm pleased to see similarities between our designs, I believe there's no immediate need to integrate this part of the code.

@avamingli added the type: Performance (cloudberry runs slow on some particular query) and planner labels on Jun 20, 2025
@my-ship-it (Contributor) left a review comment:

LGTM

@avamingli merged commit 75bcc59 into apache:main Jul 1, 2025
55 of 58 checks passed
@avamingli deleted the dev0 branch July 1, 2025 04:31