Add new datetime branch operator #11964
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
|
277a146 to
144e6b8
Compare
|
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*. |
|
DateTimeSensor has a much simpler interface. Why do we need so many parameters here? |
|
|
I don't think you can actually compare against an exact datetime, as you can't know when the task will be executed. For example, branching based on a specific minute probably won't work as expected. I think the concept should rather be an acceptable range: lower & upper. If the current time is within the range, continue with the true branch; otherwise continue with the false branch. |
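To make the range idea above concrete, here is a minimal sketch in plain Python. The function name `pick_branch`, the task ids `in_window` and `out_of_window`, and the choice of inclusive bounds are all assumptions made for illustration; none of them come from the PR.

```python
import datetime


def pick_branch(
    now: datetime.datetime,
    lower: datetime.datetime,
    upper: datetime.datetime,
    if_true: str = "in_window",       # hypothetical task id
    if_false: str = "out_of_window",  # hypothetical task id
) -> str:
    """Return the 'true' task id when now lies within [lower, upper]."""
    # Inclusive bounds are an assumption; the thread does not specify them.
    return if_true if lower <= now <= upper else if_false


window_start = datetime.datetime(2020, 7, 7, 10, 0)
window_end = datetime.datetime(2020, 7, 7, 11, 0)
print(pick_branch(datetime.datetime(2020, 7, 7, 10, 30), window_start, window_end))
```

A window like this tolerates the scheduling delay mentioned above, whereas an exact-minute match would only succeed if the task happened to start in that minute.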
08bbfaa to
911735f
Compare
|
911735f to
044cf1f
Compare
|
044cf1f to
14793c2
Compare
|
@eladkal I agree, any unit smaller than an hour is probably not something you'd be able to match reliably anyway 😓. I like your idea, and I've also allowed for either the upper bound or the lower bound to be set as This also reduces the number of arguments, which was getting unnecessarily out of control as @mik-laj suggested. Branch has been rebased as @kaxil asked. I was having a bit of trouble getting the docs build to pass; it now passes locally, so I'm expecting it to pass with the latest commit. Thanks all for the reviews and input 💪 |
|
Thanks for pointing it out 👍 Latest commit should replicate the work done for TimeSensor.
Can you add unit tests?
Unit tests added 🙌
0fe054c to
f6ff87e
Compare
Why is this optional?
Both target_* arguments are optional to support unilateral comparisons, i.e. comparing the current time to a lower target but without any upper bound and vice-versa. We do need at least one of the two to be defined otherwise there is no target to compare the current date to, which is why that is checked below in __init__.
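As a rough illustration of the check described above, the sketch below uses a hypothetical stand-in class (`BoundedWindow`, with a hypothetical `contains` method) rather than the real operator. Only the argument names `target_lower` and `target_upper` are taken from the discussion; the error message and inclusive comparisons are assumptions.

```python
import datetime
from typing import Optional


class BoundedWindow:
    """Hypothetical stand-in for the operator's bound handling."""

    def __init__(
        self,
        target_lower: Optional[datetime.datetime] = None,
        target_upper: Optional[datetime.datetime] = None,
    ) -> None:
        # At least one bound is required, otherwise there is nothing to compare to.
        if target_lower is None and target_upper is None:
            raise ValueError("At least one of target_lower or target_upper must be set")
        self.target_lower = target_lower
        self.target_upper = target_upper

    def contains(self, now: datetime.datetime) -> bool:
        # A missing bound is treated as unbounded on that side.
        if self.target_lower is not None and now < self.target_lower:
            return False
        if self.target_upper is not None and now > self.target_upper:
            return False
        return True


only_lower = BoundedWindow(target_lower=datetime.datetime(2020, 1, 1))
print(only_lower.contains(datetime.datetime(2021, 6, 1)))
```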
same question here
Answered above ☝️
This appears in every test with almost identical content. Could you create a helper function to reduce this boilerplate, please?
Good suggestion! Moved boilerplate code over to setUp
Rather than mocking the datetime like this, please use freezegun
Thanks for the suggestion! I've used freezegun instead.
|
Not sure why MySQL build is failing, all tests appear to be passing according to logs. |
the suffix of _operator in the file name should be removed (to comply with AIP 21)
see #11178
e184167 to
78ec08c
Compare
please also remove the _operator suffix from the test file
airflow/operators/datetime_branch.py
Outdated
I'm wondering about the use cases of datetime in general.
I would assume that users are more likely to want to branch based on times rather than datetimes.
Do you think we will need a separate TimeBranchOperator, or can the two use cases be combined?
Branching of times should be possible by setting the target values to something like:
target_lower=datetime.datetime.combine(
    datetime.datetime.now(), datetime.time(hour=target_hour, minute=target_minute, second=target_second)
)
But this is not as straightforward as passing a single datetime.time.
I think adding a TimeBranchOperator may be a bit redundant, instead, we could support targets of type Optional[Union[datetime.datetime, datetime.time]], and when encountering a datetime.time we combine it with the current date and proceed as it is right now:
def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
    now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
    target_upper = datetime.datetime.combine(now, self.target_upper) if isinstance(self.target_upper, datetime.time) else self.target_upper
What do you think?
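A standalone sketch of that coercion, outside of any operator, might look like the following. The helper name `as_datetime` is hypothetical, and it works on naive datetimes only; timezone handling is deliberately left out.

```python
import datetime
from typing import Union


def as_datetime(
    target: Union[datetime.datetime, datetime.time],
    now: datetime.datetime,
) -> datetime.datetime:
    """Turn a bare time into a datetime on the same date as now."""
    # datetime.datetime is not a subclass of datetime.time, so full
    # datetimes skip this branch and are returned unchanged.
    if isinstance(target, datetime.time):
        return datetime.datetime.combine(now.date(), target)
    return target


now = datetime.datetime(2021, 1, 5, 14, 0)
print(as_datetime(datetime.time(9, 30), now))
print(as_datetime(datetime.datetime(2021, 3, 1, 8, 0), now))
```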
Yeah, I think the Optional[Union[datetime.datetime, datetime.time]] approach is better as it simplifies usage for users, but it may be worth asking for more opinions on this one.
Note that there are edge cases: for example, lower is 23:00 and upper is 01:00 (the next day), so the window lasts 2 hours in total yet is spread over two dates.
Also, it would be good to add documentation for this operator. You can use previous PRs as examples: #11472
Figured out it may be easier to hear other opinions with a concrete example, so I implemented the idea mostly as described. The edge case you described is handled by pushing the smaller time to the next day. Should the same thing be done when both times are equal? Or should an exception be raised instead?
Gave it a shot at adding documentation, do let me know if it needs to be extended or rewritten.
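One way to read "pushing the smaller time to the next day" is sketched below. This is an interpretation under stated assumptions, not the code from the PR: the helper name `resolve_window` is hypothetical, both bounds are assumed to be bare times, and equal times are left on the same date because that question is still open here.

```python
import datetime
from typing import Tuple


def resolve_window(
    now: datetime.datetime,
    lower: datetime.time,
    upper: datetime.time,
) -> Tuple[datetime.datetime, datetime.datetime]:
    """Anchor both times to now's date, rolling upper over midnight if needed."""
    lower_dt = datetime.datetime.combine(now.date(), lower)
    upper_dt = datetime.datetime.combine(now.date(), upper)
    # If upper falls before lower (e.g. 23:00 to 01:00), move it to the next day.
    if upper_dt < lower_dt:
        upper_dt += datetime.timedelta(days=1)
    return lower_dt, upper_dt


now = datetime.datetime(2021, 1, 5, 23, 30)
start, end = resolve_window(now, datetime.time(23, 0), datetime.time(1, 0))
print(start, end, start <= now <= end)
```

Note that with this simple anchoring, a run at 00:30 is compared against a window that starts at 23:00 on that same date, so it falls outside the window. Whether that is acceptable depends on the intended semantics.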
|
This is a description of another operator
Sorry about that, copy and paste blunder. Thanks for the review!
|
eladkal
left a comment
LGTM
you will need to rebase to fix the tests/docs due to #13201
airflow/operators/datetime_branch.py
Outdated
True branch will be returned when `datetime.datetime.now()` falls below
`target_upper` and above `target_lower`.
What's the behaviour when backfilling or catching-up ?
How about adding something like:
airflow/airflow/operators/weekday.py
Lines 86 to 89 in e363ac1
if self.use_task_execution_day:
    now = context["execution_date"]
else:
    now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
cc @eladkal
Good call! Behavior when backfilling would be to still use system date, which is probably not what a user would expect. I added a use_task_execution_date argument very similar to the one you linked, with some minor adaptation since we are talking about dates instead of days of the week.
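The effect of such a flag can be sketched without Airflow. In the snippet below, the helper name `reference_time` is hypothetical, the context is a plain dict, and `datetime.datetime.now()` stands in for the timezone-aware call used in the operator.

```python
import datetime
from typing import Any, Dict


def reference_time(
    context: Dict[str, Any],
    use_task_execution_date: bool,
) -> datetime.datetime:
    """Pick the moment that gets compared against the target bounds."""
    if use_task_execution_date:
        # Backfills and catch-up runs compare against their logical date.
        return context["execution_date"]
    # Stand-in for timezone.make_naive(timezone.utcnow(), dag.timezone).
    return datetime.datetime.now()


backfill_context = {"execution_date": datetime.datetime(2020, 1, 1, 6, 0)}
print(reference_time(backfill_context, use_task_execution_date=True))
```

With the flag set, a backfill for 2020-01-01 branches as if it were running on that date, rather than on the day the backfill happens to be executed.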
airflow/operators/datetime_branch.py
Outdated
        self.follow_task_ids_if_false = follow_task_ids_if_false

    def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
        now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
Implemented your suggestion (see the other comment).
|
@tomasfarias can you please address issues/questions raised by kaxil? |
Co-authored-by: Kaxil Naik <[email protected]>
74b4d9f to
8253b4a
Compare
|
@eladkal Been a bit busy with work the last few weeks, but all comments should be addressed! Thanks for pinging me 👍 |
|
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
|
Awesome work, congrats on your first merged pull request! |
|
While commenting on #11931 I noticed that we might have two issues here:
Since the operator isn't released yet we can handle this easily without deprecation notice. I can handle this if @tomasfarias has no time. |
|
@eladkal Things are much quieter in my schedule now so I can push any changes to names if they are deemed appropriate. I personally agree 100% with 1. as it plays nice with any IDE autocomplete where you can just get all the For 2. though I see we have both |
They are deprecated :) |
|
Now that you mention that, we should also change the deprecated imports, e.g.: To: I've pushed the changes to my fork, see this commit: tomasfarias@5fc2862. I can create a PR as soon as the changes are confirmed as needed 👍 Great feedback! |
|
Good point @eladkal -- agree to your suggestions. And yes since they are not released we can rename them without deprecations |
|
I opened a PR with the changes suggested, linking here for reference: #14720 |
closes: #11929
This PR includes a new datetime branching operator: the current date and time, as given by datetime.datetime.now, is compared against target datetime attributes, like year or hour, to decide which task id branch to take.
No tests were written yet as this is intended as a POC to allow us to review the implementation first.