Conversation

@uchenily
Contributor

@uchenily uchenily commented Mar 25, 2025

Rationale for this change

Closes #45917

What changes are included in this PR?

Execute JoinResultMaterialize::Flush() in a task group to enhance parallelism in downstream processing, improving the performance of hash join.

Are these changes tested?

Yes.

Are there any user-facing changes?

None.

@uchenily uchenily requested a review from westonpace as a code owner March 25, 2025 02:25
@github-actions

Thanks for opening a pull request!

If this is not a minor PR, could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose

Opening GitHub issues ahead of time contributes to the Openness of the Apache Arrow project.

Then could you also rename the pull request title in the following format?

GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}

or

MINOR: [${COMPONENT}] ${SUMMARY}

See also:

@uchenily uchenily changed the title Hash join improvement GH-45917: [C++][Acero] Join result materialize in parallel Mar 25, 2025
@github-actions

⚠️ GitHub issue #45917 has been automatically assigned in GitHub to PR creator.

@uchenily
Contributor Author

@westonpace @pitrou @zanmato1984 Please take a look when you have time. Thanks!

@uchenily uchenily force-pushed the hash-join-improvement branch from b6387ae to aeb47d3 Compare March 25, 2025 04:40
@zanmato1984
Contributor

Hi @uchenily , thank you for opening the PR.

I would like to add some more context about the problem this PR is trying to address:
By the time JoinProbeProcessor::OnFinished is invoked, there will be at most 1 << 15 (32k) pending rows (that is, rows that ought to be, but have not yet been, emitted to downstream nodes) in each JoinResultMaterialize (num_threads of them in total), and we are going to Flush them. Serial execution of these Flushes not only slows down the Flush itself, but also disables parallelism for any downstream processing, which in some cases might be computationally intensive (consider an aggregation like select sum(c) from t1 right join t2 on a = b group by d). Thus I think this PR makes sense.
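The change can be illustrated with a minimal Python sketch (not Arrow's actual API; Acero schedules these as C++ task-group tasks, and the names below are stand-ins for illustration): instead of one thread draining every per-thread materializer in sequence, submit one flush task per materializer so downstream consumers can run on all cores.

```python
from concurrent.futures import ThreadPoolExecutor

NUM_THREADS = 4
PENDING_ROWS = 1 << 15  # at most 32k rows pending per materializer

def flush(materializer_id):
    """Stand-in for JoinResultMaterialize::Flush(): emit pending rows downstream."""
    # In the real node this would push the pending batches to downstream nodes.
    return (materializer_id, PENDING_ROWS)

# Before: one thread flushes every materializer in sequence, so all
# downstream processing of the flushed rows is serialized too.
serial_results = [flush(i) for i in range(NUM_THREADS)]

# After (this PR, via a task group): one flush task per materializer,
# so downstream processing of the flushed rows proceeds in parallel.
with ThreadPoolExecutor(max_workers=NUM_THREADS) as pool:
    parallel_results = list(pool.map(flush, range(NUM_THREADS)))

# The same rows are emitted either way; only the scheduling differs.
assert sorted(serial_results) == sorted(parallel_results)
```
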

Just wondering: have you encountered any case where the above problem causes a real performance issue, and how bad is it? And how much does this PR improve it?

@uchenily
Contributor Author

uchenily commented Mar 25, 2025

@zanmato1984 I ran a test hashjoin + hash aggr (join type: RIGHT_OUTER, no key match). When each input batch was set to 1<<15, the probe * build (4096 * 512) scenario took only 17.8s (including data generation time), whereas the original serial way took 471.8s. (In this set of comparative tests, the value of kNumRowsPerScanTask was consistently set to 4 * 1024).

It should be noted that during the test above, I modified kNumRowsPerScanTask to 4 * 1024. If the original value of 512 * 1024 was used, performance remained poor; in fact, the test took so long that I couldn't even measure the runtime.

What I mean is that kNumRowsPerScanTask also significantly impacts the results. However, since I haven't fully understood how this parameter affects the results and couldn't determine a more reasonable value for it, I will leave it unchanged in this PR.

@zanmato1984
Contributor

> @zanmato1984 I ran a test hashjoin + hash aggr (join type: RIGHT_OUTER, no key match). When each input batch was set to 1<<15, the probe * build (4096 * 512) scenario took only 17.8s (including data generation time), whereas the original serial way took 471.8s. (In this set of comparative tests, the value of kNumRowsPerScanTask was consistently set to 4 * 1024).

Thank you for the info. May I know the number of threads in your test?

@uchenily
Contributor Author

@zanmato1984 I ran the test on a 112-core machine, using the default sizes for both the CPU and IO thread pools. num_threads = (GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1), so it should be 112 + 8 + 1.

@zanmato1984
Contributor

Thank you. After some math I think I can explain the perf boost in your setup.

Your thread count is 120, so there are 120 materialize_s. You modified kNumRowsPerScanTask to 4k - let's call each 4k rows a "batch" for short. You have 512 * (1 << 15) unmatched rows, aka 4096 batches, on the build side. Because 4096 > thread count, the parallelism of the scan will be 120. Each scan thread accumulates 4096 / 120 ≈ 34 batches into its materialize_, with most of the rows output to the downstream (the agg) in parallel and 32k rows left pending to flush in each materialize_. Finally, when the scan is finished, serial flushing needs to flush 120 * 32k rows, aka 960 batches, in sequence and output them to the downstream (the agg), whereas parallel flushing (this PR) uses 120 threads, each flushing 32k rows, aka 8 batches.

Summarizing the comparison:

  1. In the scan phase, each thread processes 34 batches; this is the same for both serial and parallel flushing.
  2. In the flushing phase, serial execution processes 960 batches, but parallel execution processes only 8 batches (in each thread).
  3. After roughly solving some equations, it takes about 0.5s to process a batch (mostly in the downstream agg). Serial flushing takes about (/*scan*/34 + /*flush*/960) * 0.5 = 497s, and parallel flushing takes about (/*scan*/34 + /*flush*/8) * 0.5 = 21s.
  4. The numbers roughly add up with the measured 471.8s and 17.8s.
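The back-of-envelope arithmetic can be rechecked in a few lines of Python (the 0.5s-per-batch figure is the rough cost inferred in the discussion, not a measurement made here):

```python
# Recomputing the batch counts and rough timings from the discussion above.
ROWS_PER_BATCH = 4 * 1024            # kNumRowsPerScanTask used in the test
BUILD_ROWS = 512 * (1 << 15)         # unmatched build-side rows
THREADS = 120                        # thread count in the test setup
PENDING_PER_MATERIALIZER = 1 << 15   # rows left pending per materialize_

total_batches = BUILD_ROWS // ROWS_PER_BATCH             # build-side batches
scan_batches_per_thread = total_batches // THREADS       # scan work per thread
serial_flush_batches = THREADS * PENDING_PER_MATERIALIZER // ROWS_PER_BATCH
parallel_flush_batches = PENDING_PER_MATERIALIZER // ROWS_PER_BATCH

SECS_PER_BATCH = 0.5  # rough per-batch cost, dominated by the downstream agg
serial_secs = (scan_batches_per_thread + serial_flush_batches) * SECS_PER_BATCH
parallel_secs = (scan_batches_per_thread + parallel_flush_batches) * SECS_PER_BATCH
```
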

As for the improvement of this PR when kNumRowsPerScanTask = 512k (the original value): though we don't have numbers, we can infer that, due to the lower parallelism (512 * (1 << 15) / 512k = 32 threads), the scan phase won't be as fast. And although there are still 120 materialize_s (the probe parallelism is still 120), only 32 of them will have data (because the scan parallelism is 32), so parallel flushing will only give a 32x boost.

But anyway, I think the effectiveness of this PR is independent of kNumRowsPerScanTask and fully justified. I will now proceed with reviewing the code.

@github-actions github-actions bot added awaiting committer review Awaiting committer review and removed awaiting review Awaiting review labels Mar 26, 2025
@uchenily uchenily changed the title GH-45917: [C++][Acero] Join result materialize in parallel GH-45917: [C++][Acero] Add flush taskgroup to enable parallelization Mar 26, 2025
Contributor

@zanmato1984 zanmato1984 left a comment


This looks nice. Only one nit.

@uchenily uchenily force-pushed the hash-join-improvement branch from 7e5a2b0 to a1934df Compare March 26, 2025 06:26
@uchenily uchenily requested a review from zanmato1984 March 26, 2025 06:27
@zanmato1984
Contributor

@github-actions crossbow submit -g cpp

Contributor

@zanmato1984 zanmato1984 left a comment


LGTM. I'll merge once the CI is good.

Thank you for your contribution @uchenily !

@github-actions

Revision: a1934df

Submitted crossbow builds: ursacomputing/crossbow @ actions-750fc6ab42

Task Status
example-cpp-minimal-build-static GitHub Actions
example-cpp-minimal-build-static-system-dependency GitHub Actions
example-cpp-tutorial GitHub Actions
test-alpine-linux-cpp GitHub Actions
test-build-cpp-fuzz GitHub Actions
test-conda-cpp GitHub Actions
test-conda-cpp-meson GitHub Actions
test-conda-cpp-valgrind GitHub Actions
test-cuda-cpp-ubuntu-22.04-cuda-11.7.1 GitHub Actions
test-debian-12-cpp-amd64 GitHub Actions
test-debian-12-cpp-i386 GitHub Actions
test-fedora-39-cpp GitHub Actions
test-ubuntu-22.04-cpp GitHub Actions
test-ubuntu-22.04-cpp-20 GitHub Actions
test-ubuntu-22.04-cpp-bundled GitHub Actions
test-ubuntu-22.04-cpp-emscripten GitHub Actions
test-ubuntu-22.04-cpp-no-threading GitHub Actions
test-ubuntu-24.04-cpp GitHub Actions
test-ubuntu-24.04-cpp-bundled-offline GitHub Actions
test-ubuntu-24.04-cpp-gcc-13-bundled GitHub Actions
test-ubuntu-24.04-cpp-gcc-14 GitHub Actions
test-ubuntu-24.04-cpp-minimal-with-formats GitHub Actions
test-ubuntu-24.04-cpp-thread-sanitizer GitHub Actions

@zanmato1984
Contributor

@ursabot please benchmark

@ursabot
Copy link

ursabot commented Mar 26, 2025

Benchmark runs are scheduled for commit a1934df. Watch https://buildkite.com/apache-arrow and https://conbench.ursa.dev for updates. A comment will be posted here when the runs are complete.

@conbench-apache-arrow

Thanks for your patience. Conbench analyzed the 0 benchmarking runs that have been run so far on PR commit a1934df.

None of the specified runs were found on the Conbench server.

The full Conbench report has more details.

@zanmato1984
Contributor

Benchmark failures are unrelated. It seems they have been broken for a while. cc @raulcd, who may know more about this.

I'm merging now.

@zanmato1984 zanmato1984 merged commit c753740 into apache:main Mar 26, 2025
38 of 39 checks passed
@conbench-apache-arrow

After merging your PR, Conbench analyzed the 0 benchmarking runs that have been run so far on merge-commit c753740.

None of the specified runs were found on the Conbench server.

The full Conbench report has more details.

@raulcd
Member

raulcd commented Mar 26, 2025

Thanks, I've opened a blocker:

It seems Arrow fails to build on the buildkite runners

zanmato1984 pushed a commit to zanmato1984/arrow that referenced this pull request Apr 15, 2025
…ation (apache#45918)

### Rationale for this change

Closes apache#45917

### What changes are included in this PR?

Execute JoinResultMaterialize.Flush() in task group to enhance parallelism in downstream processing, improving the performance of hash join.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

None.

* GitHub Issue: apache#45917

Authored-by: uchenily <[email protected]>
Signed-off-by: Rossi Sun <[email protected]>
