
Dynamic Task Crashes scheduler - Non Empty Return #23642

@bhavaniravi

Description


Apache Airflow version

2.3.0 (latest released)

What happened

I have a DAG that looks like this.
When I uncomment py_job (a dynamically mapped PythonOperator), it works well with pull_messages (TaskFlow API).
When I try to do the same with DatabricksRunNowOperator, it crashes the scheduler with the error below.

Related issue: #23486

Sample DAG

import pendulum
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=['example'],
)
def tutorial_taskflow_api_etl():
    def random(*args, **kwargs):
        print("==== kwargs inside random ====", args, kwargs)
        print("I'm random")
        return 49

    @task
    def pull_messages():
        return [["hi"], ["hello"]]

    # Mapping jar_params over pull_messages() crashes the scheduler
    op = DatabricksRunNowOperator.partial(
        task_id="new_job",
        job_id=42,
        notebook_params={"dry-run": "true"},
        python_params=["douglas adams", "42"],
        spark_submit_params=["--class", "org.apache.spark.examples.SparkPi"],
    ).expand(jar_params=pull_messages())

    # The equivalent mapped PythonOperator works fine when uncommented:
    # py_job = PythonOperator.partial(
    #     task_id='py_job',
    #     python_callable=random
    # ).expand(op_args=pull_messages())


tutorial_etl_dag = tutorial_taskflow_api_etl()
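For context, the commented-out py_job block shows the pattern that works: `.expand(op_args=...)` runs the callable once per element of the expanded list. A minimal pure-Python sketch of that mapping semantics, stripped of all Airflow machinery (the helper name here is hypothetical, not an Airflow API):

```python
def expand_over_op_args(python_callable, op_args_list):
    """Call python_callable once per element, i.e. one mapped task
    instance per entry in op_args_list — a toy stand-in for what
    PythonOperator.partial(...).expand(op_args=...) schedules."""
    return [python_callable(*args) for args in op_args_list]


def random_job(*args, **kwargs):
    # Mirrors the `random` callable from the sample DAG above.
    print("==== kwargs inside random_job ====", args, kwargs)
    return 49


# pull_messages() returns [["hi"], ["hello"]], so two mapped
# instances run, each receiving one inner list as its op_args.
results = expand_over_op_args(random_job, [["hi"], ["hello"]])
print(results)  # → [49, 49]
```

The crash above happens before any such instance runs: the scheduler cannot resolve the mapped length for `jar_params`, hence the `Failed to populate all mapping metadata` RuntimeError.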

Error

[2022-05-11 11:46:30 +0000] [40] [INFO] Worker exiting (pid: 40)
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
    _run_scheduler_job(args=args)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 244, in run
    self._execute()
  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.9/site-packages/astronomer/airflow/version_check/plugin.py", line 29, in run_before
    fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 824, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 906, in _do_scheduling
    callback_to_run = self._schedule_dag_run(dag_run, session)
  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1148, in _schedule_dag_run
    schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 522, in update_state
    info = self.task_instance_scheduling_decisions(session)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 658, in task_instance_scheduling_decisions
    schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
  File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 714, in _get_ready_tis
    expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 609, in expand_mapped_task
    operator.mul, self._resolve_map_lengths(run_id, session=session).values()
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 595, in _resolve_map_lengths
    raise RuntimeError(f"Failed to populate all mapping metadata; missing: {keys}")
RuntimeError: Failed to populate all mapping metadata; missing: 'jar_params'
[2022-05-11 11:46:30 +0000] [31] [INFO] Shutting down: Master

What you think should happen instead

No response

How to reproduce

No response

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

apache-airflow-providers-databricks

Deployment

Astronomer

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
