Skip to content

Conversation

@chronitis
Copy link
Contributor

Jira

Description

  • Here are some details about my PR, including screenshots of any UI changes:

This adds an abstract BranchOperator, which implementors can use to
create concrete operators for branching workflows fully encapsulated in
a class, rather than needing to pass an external callable (or
self-reference) to BranchPythonOperator. Also adds tests for same.

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

tests/operators/test_branch_operator.py

Commits

  • My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain docstrings that explain what it does
    • If you implement backwards incompatible changes, please leave a note in the Updating.md so we can assign it to a appropriate release

Code Quality

  • Passes flake8

@ashb
Copy link
Member

ashb commented May 3, 2019

If you're going to add this the existing BranchingPython operator should use it!

@ashb
Copy link
Member

ashb commented May 3, 2019

An alternative would be for you to subclass the existiing PythonBranchOperator like this in your code base:

class MyBranchOperator(PythonBranchOperator):
    def __int__(**kwargs):
        super().__init__(python_callable=self.choose_branch, **kwargs)

    def choose_branch(self):
        return 'task_a' # etc.

@chronitis
Copy link
Contributor Author

I take the comment that this logic shouldn't be duplicated between this operator and BranchPythonOperator - I'll look at a better way of avoiding duplication.

The reason for not subclassing BranchPythonOperator was that creating a bound reference to the method in __init__ seems to cause issues with templating:

class MyBranch(BranchPythonOperator):
    template_fields = ('myvar', )
    def __init__(self, myvar, **kwargs):
       self.myvar = myvar
       super().__init__(python_callable=self.choose_branch)

    def choose_branch(self):
        # for reasons I'm not totally clear on, but I'm guessing the reference to `self` before `super()`
        # has returned creates a closure, any {{ templates }} in `self.myvar` here don't get evaluated
        return self.myvar

Less importantly, it seems a bit inelegant to need to pass a reference to the method like this.

@ashb
Copy link
Member

ashb commented May 3, 2019

myvar won't be templated until just before execute() is called - when were you looking at the value? In init or in choose_branch?

@ashb
Copy link
Member

ashb commented May 3, 2019

I take the comment that this logic shouldn't be duplicated between this operator and BranchPythonOperator - I'll look at a better way of avoiding duplication.

Yes, that was what I meant.

@chronitis
Copy link
Contributor Author

myvar won't be templated until just before execute() is called - when were you looking at the value? In init or in choose_branch?

In choose_branch; if you inspect the operator via the web interface the rendered template, myvar comes out as expected (templates applied), but code in choose_branch (invoked from BranchPythonOperator.execute()) appears to see the the untemplated version.

@chronitis
Copy link
Contributor Author

chronitis commented May 3, 2019

Here's a possible refactoring; moving the logic part into SkipMixin and building both BranchOperator and BranchPythonOperator off it. It's possible that BranchOperator is unnecessary now and users should just inherit SkipMixin directly in this logic.

(Tests pass except for the Kubernetes case which appears to have timed out).

@codecov-io
Copy link

codecov-io commented May 7, 2019

Codecov Report

Merging #5231 into master will increase coverage by <.01%.
The diff coverage is 95.83%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #5231      +/-   ##
==========================================
+ Coverage   78.67%   78.67%   +<.01%     
==========================================
  Files         470      471       +1     
  Lines       30013    30023      +10     
==========================================
+ Hits        23613    23622       +9     
- Misses       6400     6401       +1
Impacted Files Coverage Δ
airflow/models/skipmixin.py 100% <100%> (ø) ⬆️
airflow/operators/python_operator.py 95.45% <100%> (-0.36%) ⬇️
airflow/operators/branch_operator.py 85.71% <85.71%> (ø)
airflow/models/taskinstance.py 92.42% <0%> (-0.18%) ⬇️
airflow/models/dagbag.py 92.27% <0%> (+0.48%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 1755c35...412b978. Read the comment docs.

@chronitis
Copy link
Contributor Author

@ashb does this refactoring and documentation look reasonable?

@potiuk
Copy link
Member

potiuk commented May 8, 2019

@ashb does this refactoring and documentation look reasonable?

Ash is on holidays for the next 3 weeks but I might take over from here. I will take a look later today/tomorrow ok?

@potiuk potiuk self-requested a review May 8, 2019 19:20
@chronitis
Copy link
Contributor Author

@potiuk fine with me. I'm grateful for whoever amongst the committers is willing to take a look.

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small naming change, but yes this looks good, thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
class BranchOperator(BaseOperator, SkipMixin):
class BaseBranchOperator(BaseOperator, SkipMixin):

Just to make it a tiny bit clearer that this is intended to be sub-classed (i.e. not used in a DAG, similar to BaseOperator)

@ashb ashb changed the title [AIRFLOW-4456] Add BranchOperator [AIRFLOW-4456] Add sub-classable BaseBranchOperator Jun 21, 2019
Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve doc linking to classes too.

The suggestions as they are will make the line-wrapping messy, so probably tidy that up too :)

@ashb
Copy link
Member

ashb commented Jun 21, 2019

@chronitis If you are able to make the small changes suggested by Monday I can probably get this in to the 1.10.4 release.

Gordon Ball added 3 commits June 24, 2019 11:08
This adds an abstract BranchOperator, which implementors can use to
create concrete operators for branching workflows fully encapsulated in
a class, rather than needing to pass an external callable (or
self-reference) to BranchPythonOperator. Also adds tests for same.
This moves the logic of skipping non-selected branches into
airflow.models.SkipMixin:skip_all_except, and modifies
BranchPythonOperator and BranchOperator to use this mixin instead of
implementing the logic themselves.
Adds a brief section on BranchOperator to docs/concepts.rst
@chronitis
Copy link
Contributor Author

@ashb I think that should cover all the changes you have requested. Thanks for getting back to this one.

BranchOperator is prefixed Base to reflect that it is designed to be
subclassed, and the documentation and tests updated accordingly.
@ashb
Copy link
Member

ashb commented Jun 24, 2019

(I edited your branch to fix the pylint warning)

@kaxil
Copy link
Member

kaxil commented Jun 24, 2019

Good work @chronitis

@ashb ashb merged commit 3891be4 into apache:master Jun 24, 2019
ashb pushed a commit that referenced this pull request Jun 24, 2019
This adds an abstract BaseBranchOperator, which implementors can use to
create concrete operators for branching workflows fully encapsulated in
a class, rather than needing to pass an external callable (or
self-reference) to BranchPythonOperator. Also adds tests for same.

It also moves the logic of skipping non-selected branches into
airflow.models.SkipMixin:skip_all_except, and modifies
BranchPythonOperator and BaseBranchOperator to use this mixin instead of
implementing the logic themselves.

(cherry picked from commit 3891be4)
@ashb
Copy link
Member

ashb commented Jun 24, 2019

@chronitis Can you see if I've made some silly mistake on cherry-picking this back to the release branch -- it's failing on it's Python 2 tests. 3fc191c

Come the morning I'll pull this out of the branch to get a 1.10.4beta1 out but hopefully we can get it back in before we create the first release candidate.

@ashb
Copy link
Member

ashb commented Jun 24, 2019

Oh got it - str (py3) vs basesxtring (py2)

ashb pushed a commit that referenced this pull request Jun 24, 2019
This adds an abstract BaseBranchOperator, which implementors can use to
create concrete operators for branching workflows fully encapsulated in
a class, rather than needing to pass an external callable (or
self-reference) to BranchPythonOperator. Also adds tests for same.

It also moves the logic of skipping non-selected branches into
airflow.models.SkipMixin:skip_all_except, and modifies
BranchPythonOperator and BaseBranchOperator to use this mixin instead of
implementing the logic themselves.

(cherry picked from commit 3891be4)
ashb pushed a commit that referenced this pull request Jun 25, 2019
This adds an abstract BaseBranchOperator, which implementors can use to
create concrete operators for branching workflows fully encapsulated in
a class, rather than needing to pass an external callable (or
self-reference) to BranchPythonOperator. Also adds tests for same.

It also moves the logic of skipping non-selected branches into
airflow.models.SkipMixin:skip_all_except, and modifies
BranchPythonOperator and BaseBranchOperator to use this mixin instead of
implementing the logic themselves.

(cherry picked from commit 3891be4)
andriisoldatenko pushed a commit to andriisoldatenko/airflow that referenced this pull request Jul 26, 2019
This adds an abstract BaseBranchOperator, which implementors can use to
create concrete operators for branching workflows fully encapsulated in
a class, rather than needing to pass an external callable (or
self-reference) to BranchPythonOperator. Also adds tests for same.

It also moves the logic of skipping non-selected branches into
airflow.models.SkipMixin:skip_all_except, and modifies
BranchPythonOperator and BaseBranchOperator to use this mixin instead of
implementing the logic themselves.
wmorris75 pushed a commit to modmed-external/incubator-airflow that referenced this pull request Jul 29, 2019
This adds an abstract BaseBranchOperator, which implementors can use to
create concrete operators for branching workflows fully encapsulated in
a class, rather than needing to pass an external callable (or
self-reference) to BranchPythonOperator. Also adds tests for same.

It also moves the logic of skipping non-selected branches into
airflow.models.SkipMixin:skip_all_except, and modifies
BranchPythonOperator and BaseBranchOperator to use this mixin instead of
implementing the logic themselves.
dharamsk pushed a commit to postmates/airflow that referenced this pull request Aug 8, 2019
This adds an abstract BaseBranchOperator, which implementors can use to
create concrete operators for branching workflows fully encapsulated in
a class, rather than needing to pass an external callable (or
self-reference) to BranchPythonOperator. Also adds tests for same.

It also moves the logic of skipping non-selected branches into
airflow.models.SkipMixin:skip_all_except, and modifies
BranchPythonOperator and BaseBranchOperator to use this mixin instead of
implementing the logic themselves.

(cherry picked from commit 3891be4)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants