Empty expand() crashes the scheduler #23435

@jedcunningham

Apache Airflow version

2.3.0 (latest released)

What happened

I've found a DAG that will crash the scheduler:

from airflow.decorators import task

# Defined inside a DAG:
@task
def hello():
    return "hello"

hello.expand()  # called with no arguments to map over

[2022-05-03 03:41:23,779] {scheduler_job.py:753} ERROR - Exception when executing SchedulerJob._run_scheduler_loop                 
Traceback (most recent call last):                                                                                                 
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute                     
    self._run_scheduler_loop()                                                                                                     
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 824, in _run_scheduler_loop          
    num_queued_tis = self._do_scheduling(session)                                                                                  
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 906, in _do_scheduling               
    callback_to_run = self._schedule_dag_run(dag_run, session)                                                                     
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1148, in _schedule_dag_run           
    schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)                              
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 68, in wrapper                            
    return func(*args, **kwargs)                                                                                                   
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 522, in update_state                      
    info = self.task_instance_scheduling_decisions(session)                                                                        
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 68, in wrapper                            
    return func(*args, **kwargs)                                                                                                   
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 661, in task_instance_scheduling_decisions
    session=session,                                                                                                               
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 714, in _get_ready_tis                    
    expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)                                            
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/mappedoperator.py", line 609, in expand_mapped_task        
    operator.mul, self._resolve_map_lengths(run_id, session=session).values()                                                      
TypeError: reduce() of empty sequence with no initial value                                                                        
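
The last frame points at the root cause: `reduce()` is called with `operator.mul` over the resolved map lengths, and with no arguments passed to `expand()` that collection is empty. A minimal standalone sketch of the same behavior, with no Airflow imports (`total_map_length` is a hypothetical helper name used only for illustration, not Airflow code):

```python
import functools
import operator


def total_map_length(map_lengths):
    """Multiply the per-argument lengths together, like the failing line does."""
    return functools.reduce(operator.mul, map_lengths.values())


# With at least one mapped argument, the product is well defined.
two_args = total_map_length({"x": 3, "y": 2})

# With no mapped arguments, reduce() has nothing to start from and raises.
try:
    total_map_length({})
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# Passing an initial value avoids the exception, but whether 1 is a sensible
# number of mapped task instances here is a separate design question.
with_initial = functools.reduce(operator.mul, {}.values(), 1)
```

The exact wording of the `TypeError` message varies slightly between Python versions, so the sketch only captures it rather than comparing against a fixed string.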

What you think should happen instead

A user DAG shouldn't crash the scheduler. This specific case could likely be an ImportError at parse time, but it makes me think we might be missing some exception handling?
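
To make the two ideas concrete, here is a rough sketch under stated assumptions. Both function names (`check_expand_kwargs`, `schedule_all`) and the error message are hypothetical and are not Airflow APIs. The first rejects an empty `expand()` call early, so the problem would surface when the DAG file is parsed; the second shows the kind of per-run exception handling that would keep one bad DAG run from taking down the whole loop.

```python
def check_expand_kwargs(kwargs):
    """Hypothetical parse-time check: reject expand() with nothing to map over."""
    if not kwargs:
        raise TypeError("expand() called with no arguments to map over")
    return kwargs


def schedule_all(run_ids, schedule_one):
    """Hypothetical loop: isolate a failure in one DAG run from the others.

    Returns a dict mapping each failed run id to the exception it raised.
    """
    failures = {}
    for run_id in run_ids:
        try:
            schedule_one(run_id)
        except Exception as exc:  # broad on purpose in this sketch
            failures[run_id] = exc
    return failures
```

Whether catching this broadly is appropriate in the real scheduler is a separate discussion; the sketch only illustrates the isolation idea.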

How to reproduce

No response

Operating System

Debian

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
