Conversation

@avamingli (Contributor) commented Jul 3, 2025

This commit improves the handling of UNION operations in CBDB, specifically addressing challenges with Parallel Append and Motion nodes within subqueries. Parallel Append is now disabled for UNION operations to prevent incorrect results caused by workers competing for subnodes: a worker could complete its task prematurely, which led to data loss in scenarios involving Motion Senders.

To retain parallel processing capability, we introduce a Parallel-oblivious Append approach instead. Multiple workers operate independently without sharing state, eliminating the coordination issues associated with the Parallel-aware Append strategy.

These changes improve the reliability and correctness of UNION operations while maintaining overall system performance, and position CBDB to support parallel processing safely.
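The hazard and the fix can be illustrated with a toy model (a Python sketch with invented names, not CBDB executor code). Assume each subnode's output is already partitioned per worker, as with a Motion receive queue: a worker that claims a whole subnode only ever drains its own partition, so the other partitions of that subnode are lost.

```python
# data[subnode][worker] = tuples that only that worker's Motion receiver sees
data = {
    "branch_a": {0: [1, 2], 1: [3, 4]},
    "branch_b": {0: [5, 6], 1: [7, 8]},
}

def parallel_aware_append(data, nworkers):
    """Workers claim whole subnodes from a shared list; the claiming worker
    drains only its own per-worker partition, so the other partitions of the
    same subnode are never read."""
    out = []
    claimed = list(data)  # shared work list of subnodes
    for worker in range(nworkers):
        if claimed:
            subnode = claimed.pop(0)      # this worker claims the subnode
            out += data[subnode][worker]  # reads only its own partition
    return sorted(out)

def parallel_oblivious_append(data, nworkers):
    """Every worker walks every subnode over its own partition; no shared
    state, and the union of all workers' outputs is complete."""
    out = []
    for worker in range(nworkers):
        for subnode in data:
            out += data[subnode][worker]
    return sorted(out)

print(parallel_aware_append(data, 2))      # [1, 2, 7, 8] -- rows 3..6 are lost
print(parallel_oblivious_append(data, 2))  # [1, 2, 3, 4, 5, 6, 7, 8]
```

In the real executor the premature completion involves Motion Senders rather than a Python list, but the shape of the bug is the same: coordination over shared subnodes drops data that was addressed to a worker that never reads it.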

select distinct a from t_distinct_0 union select distinct b from t_distinct_0;
                            QUERY PLAN
----------------------------------------------------------------------
 Gather Motion 6:1  (slice1; segments: 6)
   ->  HashAggregate
         Group Key: t_distinct_0.a
         ->  Redistribute Motion 6:6  (slice2; segments: 6)
               Hash Key: t_distinct_0.a
               Hash Module: 3
               ->  Append
                     ->  GroupAggregate
                           Group Key: t_distinct_0.a
                           ->  Sort
                                 Sort Key: t_distinct_0.a
                                  ->  Redistribute Motion 6:6  (slice3; segments: 6)
                                       Hash Key: t_distinct_0.a
                                       Hash Module: 3
                                       ->  Streaming HashAggregate
                                             Group Key: t_distinct_0.a
                                              ->  Parallel Seq Scan on t_distinct_0
                     ->  GroupAggregate
                           Group Key: t_distinct_0_1.b
                           ->  Sort
                                 Sort Key: t_distinct_0_1.b
                                  ->  Redistribute Motion 6:6  (slice4; segments: 6)
                                       Hash Key: t_distinct_0_1.b
                                       Hash Module: 3
                                       ->  Streaming HashAggregate
                                             Group Key: t_distinct_0_1.b
                                              ->  Parallel Seq Scan on t_distinct_0 t_distinct_0_1
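Each branch of the plan is a two-phase DISTINCT: a Streaming HashAggregate below the Motion removes duplicates each producer sees locally, the Redistribute Motion hashes the key so that any surviving duplicates meet at the same receiver, and the aggregate above finishes the job. A minimal sketch of the idea (illustrative Python, not CBDB code):

```python
# Two-phase DISTINCT as in the plan above: local (partial) dedup per producer,
# redistribute by hash of the key, then final dedup per receiver.

def two_phase_distinct(partitions, nreceivers):
    # Phase 1: local dedup inside each producer (Streaming HashAggregate)
    locally_distinct = [set(p) for p in partitions]
    # Redistribute: hash(key) picks the receiving process, so every copy of
    # a given key lands in the same inbox
    inboxes = [[] for _ in range(nreceivers)]
    for part in locally_distinct:
        for key in part:
            inboxes[hash(key) % nreceivers].append(key)
    # Phase 2: final dedup per receiver (GroupAggregate / HashAggregate)
    return sorted(k for inbox in inboxes for k in set(inbox))

rows = [[1, 1, 2, 3], [2, 3, 3, 4], [4, 5, 1, 5]]
print(two_phase_distinct(rows, 3))  # [1, 2, 3, 4, 5]
```

The local phase shrinks the data before it crosses the Motion; correctness only requires that the redistribution send equal keys to the same receiver.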

performance

see case[0] below

no-parallel (3031.346 ms)

explain(costs off, analyze) select b, count(a) from ao1 group by b  union select b, sum(a) from ao2 group by b;
                                                                        QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3) (actual time=3024.080..3032.081 rows=19999 loops=1)
   ->  HashAggregate (actual time=3024.080..3024.080 rows=6722 loops=1)
         Group Key: ao1.b, (count(ao1.a))
         Extra Text: (seg0)   hash table(s): 1; chain length 4.0 avg, 23 max; using 6722 of 8192 buckets; total 0 expansions.
         ->  Redistribute Motion 3:3  (slice2; segments: 3) (actual time=3008.080..3016.080 rows=6723 loops=1)
               Hash Key: ao1.b, (count(ao1.a))
               ->  Append (actual time=3004.080..3016.080 rows=6768 loops=1)
                     ->  Finalize HashAggregate (actual time=3004.080..3008.080 rows=3384 loops=1)
                           Group Key: ao1.b
                           Extra Text: (seg0)   hash table(s): 1; chain length 2.3 avg, 6 max; using 3368 of 8192 buckets; total 1 expansions.
                           ->  Redistribute Motion 3:3  (slice3; segments: 3) (actual time=2880.077..2996.080 rows=3384 loops=1)
                                 Hash Key: ao1.b
                                 ->  Partial HashAggregate (actual time=3000.080..3004.080 rows=3385 loops=1)
                                       Group Key: ao1.b
                                       Extra Text: (seg0)   hash table(s): 1; chain length 2.1 avg, 4 max; using 3368 of 16384 buckets; total 0 expansions.
                                       ->  Seq Scan on ao1 (actual time=0.000..1292.034 rows=3466240 loops=1)
                     ->  Finalize HashAggregate (actual time=8.000..8.000 rows=3384 loops=1)
                           Group Key: ao2.b
                           Extra Text: (seg0)   hash table(s): 1; chain length 4.4 avg, 16 max; using 3368 of 4096 buckets; total 0 expansions.
                           ->  Redistribute Motion 3:3  (slice4; segments: 3) (actual time=0.000..4.000 rows=3384 loops=1)
                                 Hash Key: ao2.b
                                 ->  Partial HashAggregate (actual time=3004.080..3004.080 rows=3385 loops=1)
                                       Group Key: ao2.b
                                       Extra Text: (seg0)   hash table(s): 1; chain length 2.1 avg, 4 max; using 3368 of 16384 buckets; total 0 expansions.
                                       ->  Seq Scan on ao2 (actual time=0.000..1340.036 rows=3466240 loops=1)
 Planning Time: 1.192 ms
   (slice0)    Executor memory: 1267K bytes.
 * (slice1)    Executor memory: 434K bytes avg x 3x(0) workers, 436K bytes max (seg0).  Work_mem: 721K bytes max, 721K bytes wanted.
 * (slice2)    Executor memory: 598K bytes avg x 3x(0) workers, 666K bytes max (seg0).  Work_mem: 721K bytes max, 721K bytes wanted.
 * (slice3)    Executor memory: 878K bytes avg x 3x(0) workers, 880K bytes max (seg1).  Work_mem: 913K bytes max, 913K bytes wanted.
 * (slice4)    Executor memory: 878K bytes avg x 3x(0) workers, 880K bytes max (seg1).  Work_mem: 913K bytes max, 913K bytes wanted.
 Memory used:  128000kB
 Memory wanted:  5260kB
 Optimizer: Postgres query optimizer
 Execution Time: 3031.346 ms
(40 rows)

4-parallel UNION (1226.660 ms)

explain(costs off, analyze) select b, count(a) from ao1 group by b  union select b, sum(a) from ao2 group by b;
                                                                        QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather Motion 12:1  (slice1; segments: 12) (actual time=1180.031..1188.031 rows=19999 loops=1)
   ->  HashAggregate (actual time=1168.031..1168.031 rows=1670 loops=1)
         Group Key: ao1.b, (count(ao1.a))
         Extra Text: (seg0)   hash table(s): 1; chain length 2.1 avg, 3 max; using 1647 of 8192 buckets; total 0 expansions.
         ->  Redistribute Motion 12:12  (slice2; segments: 12) (actual time=1164.031..1164.031 rows=1670 loops=1)
               Hash Key: ao1.b, (count(ao1.a))
               Hash Module: 3
               ->  Append (actual time=1148.030..1164.031 rows=1620 loops=1)
                     ->  Finalize HashAggregate (actual time=1148.030..1148.030 rows=810 loops=1)
                           Group Key: ao1.b
                           Extra Text: (seg0)   hash table(s): 1; chain length 2.1 avg, 4 max; using 855 of 4096 buckets; total 0 expansions.
                           ->  Redistribute Motion 12:12  (slice3; segments: 12) (actual time=932.025..1140.030 rows=3240 loops=1)
                                 Hash Key: ao1.b
                                 Hash Module: 3
                                 ->  Partial HashAggregate (actual time=1140.030..1140.030 rows=3385 loops=1)
                                       Group Key: ao1.b
                                       Extra Text: (seg0)   hash table(s): 1; chain length 2.1 avg, 4 max; using 3368 of 16384 buckets; total 0 expansions.
                                       ->  Parallel Seq Scan on ao1 (actual time=4.000..484.013 rows=900000 loops=1)
                     ->  Finalize HashAggregate (actual time=8.000..8.000 rows=810 loops=1)
                           Group Key: ao2.b
                           Extra Text: (seg0)   hash table(s): 1; chain length 2.1 avg, 4 max; using 855 of 4096 buckets; total 0 expansions.
                           ->  Redistribute Motion 12:12  (slice4; segments: 12) (actual time=0.000..8.000 rows=3240 loops=1)
                                 Hash Key: ao2.b
                                 Hash Module: 3
                                 ->  Partial HashAggregate (actual time=1064.028..1068.028 rows=3385 loops=1)
                                       Group Key: ao2.b
                                       Extra Text: (seg0)   hash table(s): 1; chain length 2.1 avg, 4 max; using 3368 of 16384 buckets; total 0 expansions.
                                       ->  Parallel Seq Scan on ao2 (actual time=0.000..524.014 rows=848832 loops=1)
 Planning Time: 1.097 ms
   (slice0)    Executor memory: 1273K bytes.
 * (slice1)    Executor memory: 315K bytes avg x 12x(0) workers, 317K bytes max (seg0).  Work_mem: 337K bytes max, 337K bytes wanted.
 * (slice2)    Executor memory: 371K bytes avg x 12x(0) workers, 374K bytes max (seg0).  Work_mem: 241K bytes max, 241K bytes wanted.
 * (slice3)    Executor memory: 918K bytes avg x 12x(0) workers, 920K bytes max (seg1).  Work_mem: 913K bytes max, 913K bytes wanted.
 * (slice4)    Executor memory: 918K bytes avg x 12x(0) workers, 920K bytes max (seg1).  Work_mem: 913K bytes max, 913K bytes wanted.
 Memory used:  128000kB
 Memory wanted:  5260kB
 Optimizer: Postgres query optimizer
 Execution Time: 1226.660 ms
(43 rows)
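Comparing the two Execution Time figures above gives the speedup:

```python
# Speedup of the 4-parallel UNION plan over the non-parallel one, taken from
# the Execution Time lines of the two EXPLAIN ANALYZE outputs above.
no_parallel_ms = 3031.346
parallel4_ms = 1226.660
speedup = no_parallel_ms / parallel4_ms
print(f"{speedup:.2f}x")  # 2.47x
```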

case[0]

create table ao1(a int, b int) using ao_column;
create table ao2(a int, b int) using ao_column;
insert into ao1 select i, i+1 from generate_series(1, 10000) i;

insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
analyze ao1;
insert into ao2 select * from ao1;
analyze ao2;
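For scale, each `insert into ao1 select * from ao1` doubles the table, so the row counts produced by the script follow directly:

```python
# ao1 starts with 10,000 rows from generate_series, then is doubled once per
# self-insert; ao2 is loaded with a single "insert ... select * from ao1".
initial_rows = 10_000
doublings = 11  # number of "insert into ao1 select * from ao1" lines above
ao1_rows = initial_rows * 2 ** doublings
ao2_rows = ao1_rows
print(ao1_rows, ao2_rows)  # 20480000 20480000
```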

Authored-by: Zhang Mingli [email protected]


@avamingli avamingli added type: Performance cloudberry runs slow on some particular query planner labels Jul 3, 2025
@avamingli commented

Got some crashes and failures with the previous commit.

The RCA is:
In the make_union_unique function, the upper UPPERREL_SETOP relation was incorrectly fetched with a NULL relids bitmap. That lookup returns a new relation whose consider_parallel is false, even though the UPPERREL_SETOP relation had already been created with the appropriate relids. The inconsistency ultimately leads to assertion failures in the add_partial_path function, among other failures.

This is a separate issue exposed by this PR; the two fixes are kept as separate commits.
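The failure pattern generalizes: a fetch-or-create lookup keyed by (kind, relids) silently creates a fresh entry with default flags when queried with the wrong key. A toy Python model of that pattern (names are illustrative; this is not the planner's actual code):

```python
# Toy model of a fetch-or-create upper-relation cache keyed by (kind, relids).
# Fetching with the wrong key (None instead of the relids used at creation)
# silently creates a second entry with default flags -- here,
# consider_parallel=False -- instead of returning the one already configured.

class UpperRel:
    def __init__(self):
        self.consider_parallel = False  # default until the planner enables it

upper_rels = {}

def fetch_upper_rel(kind, relids):
    key = (kind, relids)
    if key not in upper_rels:
        upper_rels[key] = UpperRel()  # create on demand with defaults
    return upper_rels[key]

# The planner creates the setop rel with proper relids and enables parallelism.
rel = fetch_upper_rel("UPPERREL_SETOP", frozenset({1, 2}))
rel.consider_parallel = True

# Buggy lookup with a NULL relids bitmap: a *different* fresh rel comes back.
buggy = fetch_upper_rel("UPPERREL_SETOP", None)
print(buggy.consider_parallel)  # False -- adding partial paths would assert

# Corrected lookup uses the same relids the rel was created with.
fixed = fetch_upper_rel("UPPERREL_SETOP", frozenset({1, 2}))
print(fixed.consider_parallel)  # True
```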

@avamingli commented

Fixed in 32278d2

@my-ship-it (Contributor) left a comment

LGTM

avamingli added 2 commits July 8, 2025 10:07
Corrected the fetching of the upper UPPERREL_SETOP relation to avoid a
NULL relids bitmap, which caused consider_parallel to be false; that
inconsistency led to assertion failures in add_partial_path. The correct
relation IDs are now set before entering make_union_unique.

Authored-by: Zhang Mingli [email protected]
@avamingli avamingli merged commit effb79a into apache:main Jul 8, 2025
26 checks passed
@avamingli avamingli deleted the dev0 branch July 8, 2025 02:57