[Python] Implement combiner deferred side inputs #35601
damccorm merged 26 commits into apache:master from
I could use some help debugging the failing unit test. It passes locally for me but fails on the GH runner, apparently due to a gRPC timeout:
# If the CombineFn has deferred side inputs, the python SDK
# doesn't implement it.
# Use a ParDo-based CombinePerKey instead.
from apache_beam.runners.direct.helper_transforms import \
Is this meant to only apply to pipelines running on direct runner?
It seems strange to import transforms from direct runner into transforms.core.
No, this should apply to all runners.
I've moved it into apache_beam.transforms.combiners. Let me know if you think it should go somewhere else.
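The special case discussed in this thread (use a ParDo-based CombinePerKey only when the combiner has deferred side inputs) can be sketched roughly as below. This is a pure-Python illustration, not the code in the PR: `DeferredSideInput`, `has_deferred_side_inputs`, and `choose_combine_strategy` are hypothetical names, and the returned strings merely stand in for the two code paths.

```python
class DeferredSideInput:
    """Hypothetical stand-in for a side input whose value is only known at run time."""

    def __init__(self, name):
        self.name = name


def has_deferred_side_inputs(args, kwargs):
    """Return True if any positional or keyword argument is a deferred side input."""
    values = list(args) + list(kwargs.values())
    return any(isinstance(value, DeferredSideInput) for value in values)


def choose_combine_strategy(combine_fn, *args, **kwargs):
    """Pick the ParDo-based path only when deferred side inputs are present."""
    if has_deferred_side_inputs(args, kwargs):
        return "pardo_based"
    return "runner_native"
```

Plain values passed as extra arguments keep the runner-native path in this sketch; only deferred ones trigger the fallback, regardless of which runner executes the pipeline.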
from apache_beam.transforms.combiners import \
    LiftedCombinePerKey
combine_fn, *args = args
return LiftedCombinePerKey(combine_fn, args, kwargs)
Maybe we should log a warning here that the runner-implemented CBK has been replaced due to side inputs, which may affect performance? Users might not be aware this is happening.
I'm a bit split. I agree that users will probably not realize this subtle impact of using deferred side inputs with their combiner, but I think ideally there'd also be a way for a user to understand the risk and dismiss the warning.
I'm okay with either, though; let me know if you still think we should have the warning. I suspect that in most cases, even knowing that there may be a reduction in performance, it'd be difficult to actually observe (but maybe I'm underestimating the operations.py combine optimizations).
I'd probably vote to leave it as is. We could warn and have a kwarg to silence the warning (or something similar), but I tend to agree that this is a rare case; while the perf implication is non-obvious, adding side inputs basically always comes with some perf cost.
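For reference, the alternative floated here (warn, with a kwarg to silence the warning) might look something like the following. It is only a sketch of the idea that was not adopted: `replace_with_pardo_combine` and its `warn_on_fallback` keyword are hypothetical and do not exist in Beam.

```python
import warnings


def replace_with_pardo_combine(has_deferred_side_inputs, warn_on_fallback=True):
    """Return which combine path is used, warning on fallback unless silenced.

    `warn_on_fallback` is a hypothetical keyword argument, not a Beam option.
    """
    if not has_deferred_side_inputs:
        return "runner_native"
    if warn_on_fallback:
        warnings.warn(
            "CombinePerKey with deferred side inputs is executed as a "
            "ParDo-based combine; combiner lifting is disabled.",
            RuntimeWarning,
            stacklevel=2,
        )
    return "pardo_based"
```

A user who understands the trade-off would pass `warn_on_fallback=False`; everyone else would see the warning once per call site under Python's default warning filters.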
R: @damccorm
return accumulator[0]


class LiftedCombinePerKey(core.PTransform):
Should this be part of __all__ for imports?
I don't think so. I don't think we want to advertise this, since I don't think there are cases where a user should use this CPK specifically over the general CPK.
Co-authored-by: Danny McCormick <[email protected]>
damccorm left a comment:
LGTM, I'll merge once checks complete - thanks!
#19851
Implement deferred side inputs for combiners.
Side inputs aren't implemented at the Python SDK worker level (i.e. operations.py does not support deferred side inputs for the combiner operations), so I've special-cased combiners that have deferred side inputs so that they're translated into a ParDo version of the combiner. This prevents lifting of combiners, but I think that's probably fine, as it doesn't seem like this use case was used much anyway (though Schrodinger use cases keep needing it). If it turns out combiners with deferred side inputs get used a lot, we can implement deferred side inputs at the operations.py level later.
I tested the pipeline noted in the motivating issue and can confirm it works now.
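To illustrate what "a ParDo version of the combiner" means, here is a minimal pure-Python sketch of an unlifted combine: all values for a key are grouped first, and the combiner then runs once per key with the side input value passed to every method. It is not the PR's implementation. `ScaledSumFn` and `combine_per_key_unlifted` are hypothetical names; only the method names mirror Beam's CombineFn interface, and `scale` stands in for a side input that has already been materialized.

```python
from collections import defaultdict


class ScaledSumFn:
    """Hypothetical combiner using Beam's CombineFn method names."""

    def create_accumulator(self, scale):
        return 0

    def add_input(self, accumulator, element, scale):
        # The side input value is available for every element.
        return accumulator + element * scale

    def merge_accumulators(self, accumulators, scale):
        return sum(accumulators)

    def extract_output(self, accumulator, scale):
        return accumulator


def combine_per_key_unlifted(pairs, combine_fn, *side_args):
    """Group all values by key first, then run the combiner once per key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)

    results = {}
    for key, values in grouped.items():
        accumulator = combine_fn.create_accumulator(*side_args)
        for value in values:
            accumulator = combine_fn.add_input(accumulator, value, *side_args)
        results[key] = combine_fn.extract_output(accumulator, *side_args)
    return results


pairs = [("a", 1), ("b", 10), ("a", 2), ("b", 20)]
print(combine_per_key_unlifted(pairs, ScaledSumFn(), 3))  # {'a': 9, 'b': 90}
```

Note that `merge_accumulators` is never called in this path: with no partial pre-combining before the grouping step there is only one accumulator per key, which is the lifting optimization the description says is given up.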