-
Notifications
You must be signed in to change notification settings - Fork 16.3k
[AIRFLOW-4456] Add sub-classable BaseBranchOperator #5231
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
If you're going to add this the existing BranchingPython operator should use it! |
|
An alternative would be for you to subclass the existiing PythonBranchOperator like this in your code base: class MyBranchOperator(PythonBranchOperator):
def __int__(**kwargs):
super().__init__(python_callable=self.choose_branch, **kwargs)
def choose_branch(self):
return 'task_a' # etc. |
|
I take the comment that this logic shouldn't be duplicated between this operator and The reason for not subclassing class MyBranch(BranchPythonOperator):
template_fields = ('myvar', )
def __init__(self, myvar, **kwargs):
self.myvar = myvar
super().__init__(python_callable=self.choose_branch)
def choose_branch(self):
# for reasons I'm not totally clear on, but I'm guessing the reference to `self` before `super()`
# has returned creates a closure, any {{ templates }} in `self.myvar` here don't get evaluated
return self.myvarLess importantly, it seems a bit inelegant to need to pass a reference to the method like this. |
|
myvar won't be templated until just before |
Yes, that was what I meant. |
In |
|
Here's a possible refactoring; moving the logic part into (Tests pass except for the Kubernetes case which appears to have timed out). |
Codecov Report
@@ Coverage Diff @@
## master #5231 +/- ##
==========================================
+ Coverage 78.67% 78.67% +<.01%
==========================================
Files 470 471 +1
Lines 30013 30023 +10
==========================================
+ Hits 23613 23622 +9
- Misses 6400 6401 +1
Continue to review full report at Codecov.
|
|
@ashb does this refactoring and documentation look reasonable? |
Ash is on holidays for the next 3 weeks but I might take over from here. I will take a look later today/tomorrow ok? |
|
@potiuk fine with me. I'm grateful for whoever amongst the committers is willing to take a look. |
ashb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small naming change, but yes this looks good, thanks!
airflow/operators/branch_operator.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| class BranchOperator(BaseOperator, SkipMixin): | |
| class BaseBranchOperator(BaseOperator, SkipMixin): |
Just to make it a tiny bit clearer that this is intended to be sub-classed (i.e. not used in a DAG, similar to BaseOperator)
ashb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improve doc linking to classes too.
The suggestions as they are will make the line-wrapping messy, so probably tidy that up too :)
|
@chronitis If you are able to make the small changes suggested by Monday I can probably get this in to the 1.10.4 release. |
This adds an abstract BranchOperator, which implementors can use to create concrete operators for branching workflows fully encapsulated in a class, rather than needing to pass an external callable (or self-reference) to BranchPythonOperator. Also adds tests for same.
This moves the logic of skipping non-selected branches into airflow.models.SkipMixin:skip_all_except, and modifies BranchPythonOperator and BranchOperator to use this mixin instead of implementing the logic themselves.
Adds a brief section on BranchOperator to docs/concepts.rst
|
@ashb I think that should cover all the changes you have requested. Thanks for getting back to this one. |
BranchOperator is prefixed Base to reflect that it is designed to be subclassed, and the documentation and tests updated accordingly.
|
(I edited your branch to fix the pylint warning) |
|
Good work @chronitis |
This adds an abstract BaseBranchOperator, which implementors can use to create concrete operators for branching workflows fully encapsulated in a class, rather than needing to pass an external callable (or self-reference) to BranchPythonOperator. Also adds tests for same. It also moves the logic of skipping non-selected branches into airflow.models.SkipMixin:skip_all_except, and modifies BranchPythonOperator and BaseBranchOperator to use this mixin instead of implementing the logic themselves. (cherry picked from commit 3891be4)
|
@chronitis Can you see if I've made some silly mistake on cherry-picking this back to the release branch -- it's failing on it's Python 2 tests. 3fc191c Come the morning I'll pull this out of the branch to get a 1.10.4beta1 out but hopefully we can get it back in before we create the first release candidate. |
|
Oh got it - str (py3) vs basesxtring (py2) |
This adds an abstract BaseBranchOperator, which implementors can use to create concrete operators for branching workflows fully encapsulated in a class, rather than needing to pass an external callable (or self-reference) to BranchPythonOperator. Also adds tests for same. It also moves the logic of skipping non-selected branches into airflow.models.SkipMixin:skip_all_except, and modifies BranchPythonOperator and BaseBranchOperator to use this mixin instead of implementing the logic themselves. (cherry picked from commit 3891be4)
This adds an abstract BaseBranchOperator, which implementors can use to create concrete operators for branching workflows fully encapsulated in a class, rather than needing to pass an external callable (or self-reference) to BranchPythonOperator. Also adds tests for same. It also moves the logic of skipping non-selected branches into airflow.models.SkipMixin:skip_all_except, and modifies BranchPythonOperator and BaseBranchOperator to use this mixin instead of implementing the logic themselves. (cherry picked from commit 3891be4)
This adds an abstract BaseBranchOperator, which implementors can use to create concrete operators for branching workflows fully encapsulated in a class, rather than needing to pass an external callable (or self-reference) to BranchPythonOperator. Also adds tests for same. It also moves the logic of skipping non-selected branches into airflow.models.SkipMixin:skip_all_except, and modifies BranchPythonOperator and BaseBranchOperator to use this mixin instead of implementing the logic themselves.
This adds an abstract BaseBranchOperator, which implementors can use to create concrete operators for branching workflows fully encapsulated in a class, rather than needing to pass an external callable (or self-reference) to BranchPythonOperator. Also adds tests for same. It also moves the logic of skipping non-selected branches into airflow.models.SkipMixin:skip_all_except, and modifies BranchPythonOperator and BaseBranchOperator to use this mixin instead of implementing the logic themselves.
This adds an abstract BaseBranchOperator, which implementors can use to create concrete operators for branching workflows fully encapsulated in a class, rather than needing to pass an external callable (or self-reference) to BranchPythonOperator. Also adds tests for same. It also moves the logic of skipping non-selected branches into airflow.models.SkipMixin:skip_all_except, and modifies BranchPythonOperator and BaseBranchOperator to use this mixin instead of implementing the logic themselves. (cherry picked from commit 3891be4)
Jira
Description
This adds an abstract BranchOperator, which implementors can use to
create concrete operators for branching workflows fully encapsulated in
a class, rather than needing to pass an external callable (or
self-reference) to BranchPythonOperator. Also adds tests for same.
Tests
tests/operators/test_branch_operator.pyCommits
Documentation
Code Quality
flake8