Optimize subclasses of DummyOperator for Scheduling #12745
Custom operators inheriting from DummyOperator will now be set straight to success instead of going to a scheduled state, provided they don't have callbacks set. Closes apache#11393
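As a rough illustration of the behavior described above, the sketch below uses hypothetical stand-in classes (`FakeTask`, `FakeTaskInstance`) and a hypothetical helper (`assign_states`), not Airflow's real models or scheduler code. It also assumes the callbacks in question are `on_execute_callback` and `on_success_callback`, which the description does not spell out.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class FakeTask:
    """Hypothetical stand-in for an operator; not Airflow's BaseOperator."""
    task_id: str
    inherits_from_dummy_operator: bool = False
    on_execute_callback: Optional[Callable] = None  # assumed callback name
    on_success_callback: Optional[Callable] = None  # assumed callback name


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance."""
    task: FakeTask
    state: Optional[str] = None


def assign_states(tis: List[FakeTaskInstance]) -> List[FakeTaskInstance]:
    """Mark callback-free dummy tasks as success; send everything else to scheduled."""
    for ti in tis:
        task = ti.task
        if (
            task.inherits_from_dummy_operator
            and not task.on_execute_callback
            and not task.on_success_callback
        ):
            ti.state = "success"
        else:
            ti.state = "scheduled"
    return tis


tis = [
    FakeTaskInstance(FakeTask("plain_dummy", inherits_from_dummy_operator=True)),
    FakeTaskInstance(FakeTask("dummy_with_callback", True, on_success_callback=print)),
    FakeTaskInstance(FakeTask("real_work")),
]
states = {ti.task.task_id: ti.state for ti in assign_states(tis)}
```

In this toy model only `plain_dummy` skips the scheduled state; a dummy task with a callback still has to run so the callback can fire.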
    @classmethod
    def _is_inherited_from_dummy_operator(cls, op: BaseOperator) -> bool:
        """Used to determine if an Operator is inherited from DummyOperator"""
        if op.task_type == "DummyOperator" or isinstance(op, DummyOperator):
I did not add a check to see whether the execute method is empty, as we discussed, @ashb. My thinking was that since it is a DummyOperator, which we document as doing nothing, let's just keep it simple.
WDYT?
Should we just simply do isinstance(op, DummyOperator), why check the task_type? Anyway, we can just do:
return op.task_type == "DummyOperator" or isinstance(op, DummyOperator)
One for the real DAG, one for the serialized DAG, I think
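To make the "real vs. serialized" distinction concrete, here is a simplified model. The classes below are stand-ins written for this example (in particular `RestoredOperator` is hypothetical); the only assumption carried over from Airflow is that a deserialized operator no longer is an instance of its original class and only remembers the class name as `task_type`.

```python
class BaseOperator:
    """Stand-in base class for this sketch, not Airflow's BaseOperator."""

    @property
    def task_type(self) -> str:
        return type(self).__name__


class DummyOperator(BaseOperator):
    pass


class MyCustomDummy(DummyOperator):
    pass


class RestoredOperator(BaseOperator):
    """Hypothetical deserialized operator: keeps the class name, loses the class."""

    def __init__(self, task_type: str) -> None:
        self._task_type = task_type

    @property
    def task_type(self) -> str:
        return self._task_type


def looks_like_dummy(op: BaseOperator) -> bool:
    # The two-part check from the snippet under review.
    return op.task_type == "DummyOperator" or isinstance(op, DummyOperator)


real_dummy = DummyOperator()
real_subclass = MyCustomDummy()
restored_dummy = RestoredOperator("DummyOperator")
restored_subclass = RestoredOperator("MyCustomDummy")

checks = [
    looks_like_dummy(real_dummy),         # isinstance and task_type both match
    looks_like_dummy(real_subclass),      # only isinstance matches
    looks_like_dummy(restored_dummy),     # only task_type matches
    looks_like_dummy(restored_subclass),  # neither matches
]
```

Under these assumptions the last case is the gap: a deserialized subclass of DummyOperator is caught by neither half of the check.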
How about, instead of this approach, adding a method/property to BaseOperator:

    class BaseOperator:
        @property
        def inherits_from_dummy_operator(self):
            return getattr(self, '_is_dummy', False)

And then:

    class DummyOperator(BaseOperator):
        inherits_from_dummy_operator = True

That way this method isn't needed, and at runtime in both cases (real or serialized) we can just look at `op.inherits_from_dummy_operator`.
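A self-contained sketch of that suggestion, using stand-in classes rather than Airflow's own: the base class exposes a property that falls back to an `_is_dummy` attribute, and the dummy class shadows the property with a plain class attribute. `RestoredOperator` is a hypothetical name for an operator rebuilt from serialized data.

```python
class BaseOperator:
    """Stand-in base class for this sketch."""

    @property
    def inherits_from_dummy_operator(self) -> bool:
        # Instances rebuilt from serialized data carry the flag as `_is_dummy`.
        return getattr(self, "_is_dummy", False)


class DummyOperator(BaseOperator):
    # A class attribute in the subclass shadows the inherited property.
    inherits_from_dummy_operator = True


class MyCustomDummy(DummyOperator):
    pass


class RestoredOperator(BaseOperator):
    """Hypothetical deserialized operator that only knows the stored flag."""

    def __init__(self, is_dummy: bool) -> None:
        self._is_dummy = is_dummy


flags = {
    "base": BaseOperator().inherits_from_dummy_operator,
    "dummy": DummyOperator().inherits_from_dummy_operator,
    "custom_dummy": MyCustomDummy().inherits_from_dummy_operator,
    "restored_dummy": RestoredOperator(True).inherits_from_dummy_operator,
    "restored_other": RestoredOperator(False).inherits_from_dummy_operator,
}
```

The same attribute lookup answers the question for real instances (through the class attribute) and for restored ones (through the stored flag), so no separate helper method is required.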
Using the approach I suggested (if it works), you can just get rid of this method :)
Let me give that a try
Works like a charm, thanks.
Updated in 0b47a1b
    dag = DAG(dag_id="test_only_dummy_tasks", default_args=default_args, schedule_interval='@once')
This DAG is used by test_should_mark_dummy_task_as_success test in tests/jobs/test_scheduler_job.py
airflow/models/dagrun.py
Outdated
      for ti in schedulable_tis
      if (
    -     ti.task.task_type == "DummyOperator"
    +     (ti.task.task_type == "DummyOperator" or getattr(ti.task, "_is_dummy", False))
    - (ti.task.task_type == "DummyOperator" or getattr(ti.task, "_is_dummy", False))
    + (getattr(ti.task, "_is_dummy", False)
Should this be enough? I think DummyOperator will also fall in this category
This is just for backwards-compatibility so the current Serialized DAGs which don't have _is_dummy field still continue to work
Maybe not needed after 67b5daf 🤔
I think right now this could be used in Scheduler (on serialized dags) and Backfill (on real DAGs) so that commit alone won't help.
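The backwards-compatibility point in this thread can be sketched as follows. The tasks are modeled as plain namespaces (an assumption made for brevity, not how Airflow represents them), and `is_dummy_like` is a hypothetical name for the combined condition quoted in the diff.

```python
from types import SimpleNamespace


def is_dummy_like(task) -> bool:
    """Combined check from the diff: type-name match OR the `_is_dummy` flag."""
    return task.task_type == "DummyOperator" or getattr(task, "_is_dummy", False)


# Hypothetical tasks for illustration only.
# A DummyOperator serialized before the `_is_dummy` field existed: no flag at all.
old_serialized_dummy = SimpleNamespace(task_type="DummyOperator")
# A subclass serialized after the change: the name differs, the flag is set.
new_serialized_subclass = SimpleNamespace(task_type="MyCustomDummy", _is_dummy=True)
# An unrelated operator.
unrelated_task = SimpleNamespace(task_type="BashOperator", _is_dummy=False)

results = [
    is_dummy_like(t)
    for t in (old_serialized_dummy, new_serialized_subclass, unrelated_task)
]
```

Dropping the `task_type` comparison would make the first case return `False` in this model, which is the regression the extra condition guards against.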
    setattr(op, field, None)

    # Used to determine if an Operator is inherited from DummyOperator
    setattr(op, "_is_dummy", bool(encoded_op.get("_is_dummy")))
    - setattr(op, "_is_dummy", bool(encoded_op.get("_is_dummy")))
    + setattr(op, "_is_dummy", bool(encoded_op.get("_is_dummy", False)))
To load existing s10n blobs.
fixed in 67b5daf
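For readers following the `dict.get` suggestion in this thread, a quick check of how the two spellings behave on a blob that predates the field. The blobs below are made up for illustration, and the helper names are hypothetical. Since `dict.get` returns `None` for a missing key when no default is given, and `bool(None)` is `False`, both forms produce the same flag here; the explicit default mainly makes the intent visible.

```python
def read_flag_implicit(encoded_op: dict) -> bool:
    # Missing key -> None -> False
    return bool(encoded_op.get("_is_dummy"))


def read_flag_explicit(encoded_op: dict) -> bool:
    # Missing key -> False -> False
    return bool(encoded_op.get("_is_dummy", False))


# Made-up serialized blobs, reduced to the fields relevant here.
old_blob = {"task_type": "DummyOperator"}
new_blob = {"task_type": "MyCustomDummy", "_is_dummy": True}

comparison = [
    (read_flag_implicit(blob), read_flag_explicit(blob))
    for blob in (old_blob, new_blob)
]
```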
ashb
left a comment
Add a comment, but pre-emptively approve.
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.