
Conversation

@avamingli (Contributor) commented Jul 21, 2023

We have a Parallel Nested Loop Left Anti Semi (Not-In) Join, but in CBDB enable_nestloop is false by default.
And we disabled the Parallel Hash Left Anti Semi (Not-In) Join before, because we thought it would return wrong results: it is hard to tell which batch the NULL values land in.
As a result, CBDB now usually produces a non-parallel Hash LASJ, which showed degraded performance in an internal test (by @HuSen8891) on TPC-H query 16.

After consideration, I think this is only a problem for a Parallel-aware Hash Left Anti Semi (Not-In) Join with a shared hash table (yes, I'm working on another branch to enable that). Without a shared hash table (parallel-oblivious), each worker has all the data it needs from the inner side, so it doesn't matter which batch the NULL values fall into.
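The NULL subtlety comes from SQL's three-valued logic: `x NOT IN (...)` evaluates to UNKNOWN rather than TRUE as soon as either side involves a NULL, so a single NULL on the inner side empties the entire result. A minimal Python sketch of these semantics (an illustration only, not CBDB executor code; `not_in_join` is a made-up helper):

```python
# Toy model of SQL "NOT IN" (Left Anti Semi Not-In) semantics, showing
# why NULLs are the tricky part. An illustrative sketch, not CBDB code.

def not_in_join(outer, inner):
    """Return outer values x for which 'x NOT IN inner' evaluates to TRUE."""
    inner_has_null = any(v is None for v in inner)
    result = []
    for x in outer:
        if not inner:
            result.append(x)  # NOT IN an empty set is TRUE, even for NULL
        elif x is None or inner_has_null:
            pass              # comparison involves NULL -> UNKNOWN, row filtered
        elif x not in inner:
            result.append(x)
    return result

# A single NULL on the inner side empties the whole result, which is why
# the join must see the complete inner side, not an unseen batch of it:
print(not_in_join([1, 2, 3], [2]))        # [1, 3]
print(not_in_join([1, 2, 3], [2, None]))  # []
```

This is why the parallel-oblivious variant is safe: each worker's private hash table holds the whole inner side, including any NULLs.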

Enabling the parallel-oblivious Hash Left Anti Semi (Not-In) Join benefits a lot when the outer table is huge and the inner table is very small (a perfect fit for TPC-H query 16, see #31).

Performance (Qingyun Cloud, average of 3 runs; t1 with 100 million rows joined to t2 with 1,200 rows; see [0] for DDL and DML; non-parallel compared with parallel_workers 2, 4, 6, 8) of

            select sum(t1.c1) from t1 where c1 not in (select c1 from t2);

PH-LASJ

| parallel workers | avg duration (s) | 1st | 2nd | 3rd |
| --- | --- | --- | --- | --- |
| 0 | 18.88366667 | 18.286 | 19.305 | 19.06 |
| 2 | 9.719333333 | 9.791 | 9.689 | 9.678 |
| 4 | 7.438333333 | 7.541 | 7.15 | 7.624 |
| 6 | 7.101333333 | 7.019 | 7.236 | 7.049 |
| 8 | 6.491333333 | 6.218 | 6.206 | 7.05 |
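From the averages above, the speedup relative to the non-parallel baseline can be computed directly (a quick arithmetic sketch; numbers copied from the table):

```python
# Speedup of the parallel-oblivious Hash LASJ over the non-parallel run,
# using the average durations (seconds) from the table above.
baseline = 18.88366667
averages = {2: 9.719333333, 4: 7.438333333, 6: 7.101333333, 8: 6.491333333}
for workers, avg in averages.items():
    print(f"{workers} workers: {baseline / avg:.2f}x")
# 2 workers: 1.94x
# 4 workers: 2.54x
# 6 workers: 2.66x
# 8 workers: 2.91x
```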

Typical LASJ plans are shown below; see [1] for example DDL and DML.

Non-parallel plan:

explain(analyze, costs off) select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
                                                       QUERY PLAN
------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate (actual time=1808.872..1808.875 rows=1 loops=1)
   ->  Gather Motion 3:1  (slice1; segments: 3) (actual time=1745.235..1808.858 rows=3 loops=1)
         ->  Partial Aggregate (actual time=1808.622..1808.625 rows=1 loops=1)
               ->  Hash Left Anti Semi (Not-In) Join (actual time=2.890..1583.005 rows=1667434 loops=1)
                     Hash Cond: (t1.c1 = t2.c1)
                     Extra Text: (seg2)   Hash chain length 1.0 avg, 2 max, using 1199 of 524288 buckets.
                     ->  Seq Scan on t1 (actual time=0.355..678.531 rows=1667832 loops=1)
                     ->  Hash (actual time=2.068..2.069 rows=1200 loops=1)
                           Buckets: 524288  Batches: 1  Memory Usage: 4139kB
                           ->  Broadcast Motion 3:3  (slice2; segments: 3) (actual time=1.476..1.772 rows=1200 loops=1)
                                 ->  Seq Scan on t2 (actual time=0.356..0.499 rows=407 loops=1)
 Planning Time: 0.454 ms
   (slice0)    Executor memory: 124K bytes.
   (slice1)    Executor memory: 4443K bytes avg x 3x(0) workers, 4443K bytes max (seg0).  Work_mem: 4139K bytes max.
   (slice2)    Executor memory: 262K bytes avg x 3x(0) workers, 262K bytes max (seg0).
 Memory used:  128000kB
 Optimizer: Postgres query optimizer
 Execution Time: 1809.517 ms
(18 rows)
Time: 1810.827 ms (00:01.811)

Parallel plan:

explain(analyze, costs off) select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
                                                       QUERY PLAN
------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate (actual time=758.707..758.710 rows=1 loops=1)
   ->  Gather Motion 6:1  (slice1; segments: 6) (actual time=668.747..758.685 rows=6 loops=1)
         ->  Partial Aggregate (actual time=752.479..752.483 rows=1 loops=1)
               ->  Hash Left Anti Semi (Not-In) Join (actual time=3.010..565.127 rows=833732 loops=1)
                     Hash Cond: (t1.c1 = t2.c1)
                     Extra Text: (seg2)   Hash chain length 1.0 avg, 2 max, using 1199 of 524288 buckets.
                     ->  Parallel Seq Scan on t1 (actual time=0.368..231.049 rows=833932 loops=1)
                     ->  Hash (actual time=2.148..2.149 rows=1200 loops=1)
                           Buckets: 524288  Batches: 1  Memory Usage: 4139kB
                           ->  Broadcast Motion 3:6  (slice2; segments: 3) (actual time=0.203..1.779 rows=1200 loops=1)
                                 ->  Seq Scan on t2 (actual time=0.361..0.499 rows=407 loops=1)
 Planning Time: 0.470 ms
   (slice0)    Executor memory: 124K bytes.
   (slice1)    Executor memory: 4483K bytes avg x 6x(0) workers, 4483K bytes max (seg0).  Work_mem: 4139K bytes max.
   (slice2)    Executor memory: 262K bytes avg x 3x(0) workers, 262K bytes max (seg0).
 Memory used:  128000kB
 Optimizer: Postgres query optimizer
 Execution Time: 759.440 ms
(18 rows)
Time: 760.874 ms

[0] Example:

create table t1(c1 int, c2 int) using ao_row with(parallel_workers=8) distributed by (c1);
create table t2(c1 int, c2 int) using ao_row with(parallel_workers=8) distributed by (c1);
set enable_parallel = on;
set gp_appendonly_insert_files = 8;
set gp_appendonly_insert_files_tuples_range = 100;
set max_parallel_workers_per_gather = 8;
insert into t1 select i, i from generate_series(1, 100000000) i;
insert into t2 select i+1, i from generate_series(1, 1200) i;
analyze t1;
analyze t2;
explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c1 from t2);
select sum(t1.c1) from t1 where c1 not in (select c1 from t2);

[1] Example:

create table t1(c1 int, c2 int) using ao_row distributed by (c1);
create table t2(c1 int, c2 int) using ao_row distributed by (c1);
set enable_parallel = on;
set gp_appendonly_insert_files = 2;
set gp_appendonly_insert_files_tuples_range = 100; 
set max_parallel_workers_per_gather = 2;
insert into t1 select i, i from generate_series(1, 5000000) i;
insert into t2 select i+1, i from generate_series(1, 1200) i; 
analyze t1;
analyze t2;
explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c1 from t2);

Authored-by: Zhang Mingli [email protected]

closes: #ISSUE


Change logs

Describe your change clearly, including what problem is being solved or what feature is being added.

If it breaks backward or forward compatibility, please clarify.

Why are the changes needed?

Describe why the changes are necessary.

Does this PR introduce any user-facing change?

If yes, please clarify the previous behavior and the change this PR proposes.

How was this patch tested?

Please detail how the changes were tested, including manual tests and any relevant unit or integration tests.

Contributor's Checklist

Here are some reminders before you submit the pull request:

  • Document changes
  • Communicate in the GitHub Issues or Discussions (list them if needed)
  • Add tests for the change
  • Pass make installcheck
  • Pass make -C src/test installcheck-cbdb-parallel

@avamingli (Contributor, Author)

This PR is actually ready and works well, but we lack CBDB parallel-test CI for now, so I made it a draft.

@avamingli (Contributor, Author)

> This PR is actually ready and works well, but we lack CBDB parallel-test CI for now, so I made it a draft.

It seems CI runs even on a draft PR, so I'm turning it back to ready.

@avamingli avamingli marked this pull request as ready for review July 21, 2023 15:16
@avamingli (Contributor, Author)

Hold on to this PR; we should wait for the parallel CI test to be enabled. @my-ship-it

@my-ship-it (Contributor)

> Hold on to this PR; we should wait for the parallel CI test to be enabled. @my-ship-it

🆗

@avamingli avamingli force-pushed the enable_parallel_oblivious_lasj_hashjoin branch from b70b8c9 to b981058 Compare July 24, 2023 11:58
@avamingli avamingli linked an issue Jul 25, 2023 that may be closed by this pull request
2 tasks
@avamingli avamingli force-pushed the enable_parallel_oblivious_lasj_hashjoin branch 2 times, most recently from 6d0c7d6 to 6569711 Compare July 28, 2023 09:16
@avamingli avamingli self-assigned this Aug 1, 2023
@avamingli avamingli requested a review from yjhjstz August 2, 2023 02:49
@avamingli avamingli force-pushed the enable_parallel_oblivious_lasj_hashjoin branch 2 times, most recently from 07b4588 to 629cae3 Compare August 7, 2023 12:54
This is a parallel-oblivious parallel hash join: each worker processes
the inner-side table redundantly, without a shared hash table.

We benefit from parallelism when the outer table is large and the inner
table is relatively small.
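As a sketch of the idea (illustrative Python, not CBDB executor code; all names here are made up): each worker scans only its slice of the outer side but builds a private hash table over the full inner side, so NULL handling stays local to each worker and no inner-side batches need to be shared:

```python
# Toy model of a parallel-oblivious hash anti join: every worker builds
# its own hash table from the FULL inner side, so there is no shared
# state and no question of which batch the NULLs fall into.
from concurrent.futures import ThreadPoolExecutor

def worker_anti_join(outer_chunk, inner):
    table = set(inner)                 # private per-worker hash table
    if any(v is None for v in inner):  # an inner NULL empties the result
        return []
    return [x for x in outer_chunk if x is not None and x not in table]

def parallel_oblivious_anti_join(outer, inner, workers=2):
    chunks = [outer[i::workers] for i in range(workers)]  # split outer side only
    with ThreadPoolExecutor(workers) as pool:
        parts = pool.map(worker_anti_join, chunks, [inner] * workers)
    return sorted(x for part in parts for x in part)

print(parallel_oblivious_anti_join([1, 2, 3, 4], [2]))  # [1, 3, 4]
```

The trade-off is that the inner side is hashed once per worker, which is why this pays off only when the inner table is small relative to the outer one.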

@avamingli avamingli force-pushed the enable_parallel_oblivious_lasj_hashjoin branch from 629cae3 to 3fd94b0 Compare August 8, 2023 02:21
@avamingli avamingli merged commit a5eca8b into apache:main Aug 9, 2023
@avamingli avamingli deleted the enable_parallel_oblivious_lasj_hashjoin branch August 9, 2023 04:55


Development

Successfully merging this pull request may close these issues.

The NOT IN Hash Join is not parallel

3 participants