Closed
Labels
affected_version:2.3 (Issues Reported for 2.3), area:core, area:dynamic-task-mapping, AIP-42, kind:bug (This is clearly a bug)
Milestone
Description
Apache Airflow version
2.3.0 (latest released)
What happened
I have a DAG that looks like this.
When I uncomment py_job (a dynamically mapped PythonOperator), mapping over pull_messages (TaskFlow API) works well.
When I try to do the same with DatabricksRunNowOperator, it crashes the scheduler with the error below.
Related issues #23486
Sample DAG
import json
import pendulum

from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator


@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=['example'],
)
def tutorial_taskflow_api_etl():
    def random(*args, **kwargs):
        print("==== kwargs inside random ====", args, kwargs)
        print("I'm random")
        return 49

    @task
    def pull_messages():
        return [["hi"], ["hello"]]

    op = DatabricksRunNowOperator.partial(
        task_id="new_job",
        job_id=42,
        notebook_params={"dry-run": "true"},
        python_params=["douglas adams", "42"],
        spark_submit_params=["--class", "org.apache.spark.examples.SparkPi"],
    ).expand(jar_params=pull_messages())

    # py_job = PythonOperator.partial(
    #     task_id='py_job',
    #     python_callable=random,
    # ).expand(op_args=pull_messages())


tutorial_etl_dag = tutorial_taskflow_api_etl()
Error
[2022-05-11 11:46:30 +0000] [40] [INFO] Worker exiting (pid: 40)
return f(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
_run_scheduler_job(args=args)
File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
job.run()
File "/usr/local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 244, in run
self._execute()
File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute
self._run_scheduler_loop()
File "/usr/local/lib/python3.9/site-packages/astronomer/airflow/version_check/plugin.py", line 29, in run_before
fn(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 824, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 906, in _do_scheduling
callback_to_run = self._schedule_dag_run(dag_run, session)
File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1148, in _schedule_dag_run
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 522, in update_state
info = self.task_instance_scheduling_decisions(session)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 658, in task_instance_scheduling_decisions
schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 714, in _get_ready_tis
expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 609, in expand_mapped_task
operator.mul, self._resolve_map_lengths(run_id, session=session).values()
File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 595, in _resolve_map_lengths
raise RuntimeError(f"Failed to populate all mapping metadata; missing: {keys}")
RuntimeError: Failed to populate all mapping metadata; missing: 'jar_params'
[2022-05-11 11:46:30 +0000] [31] [INFO] Shutting down: Master
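For context on the failure mode: per the traceback, the scheduler multiplies the recorded lengths of every mapped argument (operator.mul over _resolve_map_lengths(...).values()) to work out how many task instances to expand, and raises when a mapped keyword has no recorded length. A simplified pure-Python sketch of that check (hypothetical, not Airflow's actual code; function name resolve_total_map_length is made up for illustration):

```python
from __future__ import annotations

import operator
from functools import reduce


def resolve_total_map_length(map_lengths: dict[str, int | None]) -> int:
    """Sketch of the scheduler's expansion sizing: each mapped keyword
    (e.g. 'jar_params') must have a known upstream length; a None entry
    stands in for missing XCom mapping metadata and triggers the error
    seen in the traceback."""
    missing = {k for k, v in map_lengths.items() if v is None}
    if missing:
        raise RuntimeError(f"Failed to populate all mapping metadata; missing: {missing}")
    # Total expanded instances = product of all mapped-argument lengths.
    return reduce(operator.mul, map_lengths.values(), 1)
```

In this sketch, a run where pull_messages' return value was recorded gives a length for jar_params and expansion proceeds; when the metadata is absent (as in this report), the same RuntimeError is raised.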
What you think should happen instead
No response
How to reproduce
No response
Operating System
Debian GNU/Linux 10 (buster)
Versions of Apache Airflow Providers
apache-airflow-providers-databricks
Deployment
Astronomer
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- Yes, I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct