-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Add reset_dag_run option on dagrun_operator to clear existing dag run #11484
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
airflow/operators/dagrun_operator.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| self.log.info(f"Clearing {self.trigger_dag_id} on {self.execution_date}") | |
| self.log.info("Clearing %s on %s", self.trigger_dag_id, self.execution_date) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
airflow/operators/dagrun_operator.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| raise DagNotFound("Dag id {} not found in DagModel".format(self.trigger_dag_id)) | |
| raise DagNotFound(f"Dag id {self.trigger_dag_id} not found in DagModel")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
airflow/operators/dagrun_operator.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any particular reason to introduce this function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can simply import conf and do:
store_serialized_dags=conf.getboolean('core', 'store_serialized_dags')
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw you can actually use settings.STORE_SERIALIZED_DAGS too. This setting (and conf) will however be removed in #11335
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it to settings.STORE_SERIALIZED_DAGS for now. When #11335 is finalized, this line also needs to be updated.
airflow/operators/dagrun_operator.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add note here that this may be useful for backfill and rerunning tasks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added more description.
|
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*. |
|
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*. |
|
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*. |
|
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*. |
|
Hey @kukigai . Can you please rebase this one to latest master. We fixed (hopefully) a problem with queues of jobs for GitHub actions and I think when you rebase, it shoudl run much faster (more info on devlist shortly). |
…rget dag run if exists.
|
@potiuk rebased it. |
ba6e988 to
c1bf71f
Compare
dagrun operator throws DagRunAlreadyExists if dag has already run on the same execution date.
However, user might need to backfill with reset_dagruns option.
When user submit backfill with reset_dagruns and want to clear previous dag run, dagrun_operator should clear that dag run.
trigger_dag.py (line76) change is required since "dag_run" can be list and if that is the case, "AttributeError: 'list' object has no attribute 'run_id'" is raised instead of DagRunAlreadyExists.