Description
Now that we have explicit Dag Run types, we should no longer need DagRun.external_trigger.
For example, the following code can be changed from

    def choose_branch(self, context: Context) -> str | Iterable[str]:
        # If the DAG Run is externally triggered, then return without
        # skipping downstream tasks
        dag_run: DagRun = context["dag_run"]  # type: ignore[assignment]
        if dag_run.external_trigger:
            self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
            return list(context["task"].get_direct_relative_ids(upstream=False))

to

    def choose_branch(self, context: Context) -> str | Iterable[str]:
        # If the DAG Run is manually triggered, then return without
        # skipping downstream tasks
        dag_run: DagRun = context["dag_run"]  # type: ignore[assignment]
        if dag_run.run_type == DagRunType.MANUAL:
            self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
            return list(context["task"].get_direct_relative_ids(upstream=False))

We should replace all usages of DagRun.external_trigger and remove it from the DB too.
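
To make the proposed check concrete without pulling in Airflow, here is a minimal standalone sketch. `DagRunType` below is a stand-in enum and `FakeDagRun` and `is_manual_run` are hypothetical names used only for illustration; none of this is Airflow's actual code. It also assumes that every run which previously had `external_trigger` set now carries the manual run type, which would need to be verified before the flag is removed.

```python
from dataclasses import dataclass
from enum import Enum


class DagRunType(str, Enum):
    """Stand-in for the run-type enum; member values are assumed for illustration."""

    SCHEDULED = "scheduled"
    MANUAL = "manual"


@dataclass
class FakeDagRun:
    """Hypothetical minimal object exposing only the attribute the check needs."""

    run_type: DagRunType


def is_manual_run(dag_run: FakeDagRun) -> bool:
    # Proposed replacement for reading dag_run.external_trigger:
    # decide from the run type alone.
    return dag_run.run_type == DagRunType.MANUAL


manual = FakeDagRun(run_type=DagRunType.MANUAL)
scheduled = FakeDagRun(run_type=DagRunType.SCHEDULED)
print(is_manual_run(manual), is_manual_run(scheduled))
```

With a helper like this, call sites that currently branch on `external_trigger` would only need the run type, so the boolean attribute and its DB column would have no remaining readers.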