Closed
Labels
affected_version:2.3, area:core, area:dynamic-task-mapping, AIP-42, kind:bug
Milestone
Description
Apache Airflow version
2.3.0 (latest released)
What happened
I've found a DAG that will crash the scheduler:
@task
def hello():
    return "hello"

hello.expand()
[2022-05-03 03:41:23,779] {scheduler_job.py:753} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 824, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 906, in _do_scheduling
    callback_to_run = self._schedule_dag_run(dag_run, session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1148, in _schedule_dag_run
    schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 522, in update_state
    info = self.task_instance_scheduling_decisions(session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 661, in task_instance_scheduling_decisions
    session=session,
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 714, in _get_ready_tis
    expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/mappedoperator.py", line 609, in expand_mapped_task
    operator.mul, self._resolve_map_lengths(run_id, session=session).values()
TypeError: reduce() of empty sequence with no initial value
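The last frame can be reproduced outside Airflow: `functools.reduce` raises a `TypeError` when it is given an empty iterable and no initial value, which is presumably what happens when `expand()` is called with no arguments and there are no map lengths to multiply. This is a minimal sketch; `total_length` and `total_length_safe` are hypothetical helpers, not Airflow code.

```python
import operator
from functools import reduce


def total_length(map_lengths):
    """Multiply all lengths together with no initial value (hypothetical helper)."""
    return reduce(operator.mul, map_lengths.values())


def total_length_safe(map_lengths):
    """Same product, but seeded with 1 so an empty mapping does not raise."""
    return reduce(operator.mul, map_lengths.values(), 1)


# An empty mapping reproduces the error class seen in the traceback.
try:
    total_length({})
    raised = False
except TypeError:
    raised = True

print(raised)                              # the unseeded reduce raised TypeError
print(total_length_safe({}))               # the seeded reduce returns its initial value
print(total_length_safe({"a": 2, "b": 3}))
```

Seeding the reduction only avoids the exception; whether an empty `expand()` should instead be rejected earlier is a separate question.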
What you think should happen instead
A user DAG shouldn't be able to crash the scheduler. This specific case could probably be rejected with an import error at parse time, but it makes me wonder whether we are missing some exception handling in the scheduler.
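One generic way to express the "missing exception handling" idea is to isolate failures per DAG run so that a single bad run is logged and skipped rather than ending the loop. This is only a sketch of that pattern; `schedule_all` and `schedule_one` are hypothetical names and do not correspond to Airflow's scheduler internals.

```python
import logging

log = logging.getLogger(__name__)


def schedule_all(dag_runs, schedule_one):
    """Call schedule_one on each run; log and collect the runs that raise."""
    failed = []
    for dag_run in dag_runs:
        try:
            schedule_one(dag_run)
        except Exception:
            # Record the traceback and move on to the next run.
            log.exception("Scheduling failed for %r; continuing", dag_run)
            failed.append(dag_run)
    return failed


# Usage: one run fails, the others are still processed.
processed = []


def fake_schedule(dag_run):
    if dag_run == "bad":
        raise TypeError("reduce() of empty sequence with no initial value")
    processed.append(dag_run)


failed_runs = schedule_all(["a", "bad", "b"], fake_schedule)
```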
How to reproduce
No response
Operating System
Debian
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct