Make Dag Serialization a hard requirement #11335
Conversation
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.

@ashb This is ready for review
tests/jobs/test_scheduler_job.py
Outdated
Do we have this in every case? If so we should do it in setup with a second patcher (and even if it's only most cases, we should do it in setup).
Done in 7fcd122
Scheduler HA uses Serialized DAGs and hence it is a strict requirement for 2.0. It also has performance benefits for the Webserver and so should be used by default anyway.
)
dag = dagbag.get_dag(dag_id)  # prefetch dag if it is stored serialized
dag = dagbag.get_dag(dag_id)
if dag_id not in dagbag.dags:
Suggested change:
- if dag_id not in dagbag.dags:
+ if not dag:
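(For anyone reading along, a minimal sketch of why the shorter check is enough, assuming `get_dag()` returns `None` when it cannot find or deserialize the DAG; the `DagNotFound` usage and the dag_id are illustrative, not the exact code in this PR.)

```python
from airflow.exceptions import DagNotFound
from airflow.models.dagbag import DagBag

dagbag = DagBag(read_dags_from_db=True)
dag_id = "example_dag"  # placeholder

# Assumption: get_dag() returns None for an unknown dag_id, so one truthiness
# check replaces the separate membership test against dagbag.dags.
dag = dagbag.get_dag(dag_id)
if not dag:
    raise DagNotFound(f"Dag id {dag_id} not found")
```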
Done in 690e80b
    dag = current_app.dag_bag.get_dag(dag_id)
    if not dag:
        raise NotFound(error_message)
except SerializedDagNotFound:
If we made AirflowNotFound inherit from connexion.NotFound, this code could be simpler here.
WDYT?
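For illustration, a rough sketch of that idea, assuming the not-found exception were made to inherit from an HTTP 404 exception that the web framework already converts into a 404 response (werkzeug's `NotFound` is used as a stand-in below, and the helper name is made up; neither is the actual Airflow code):

```python
from werkzeug.exceptions import NotFound


class SerializedDagNotFound(NotFound):
    """Hypothetical: already an HTTP 404, so no try/except translation is needed."""


def get_dag_or_404(dag_bag, dag_id):
    # If the exception is itself a 404, the framework's error handling turns it
    # into the right response, so this code no longer needs its own except block.
    dag = dag_bag.get_dag(dag_id)
    if not dag:
        raise SerializedDagNotFound(f"Dag with id {dag_id} was not found")
    return dag
```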
I will create a separate issue for it and address it in a separate PR (it is not strictly related to this PR), so anyone else can pick up that task too.
Scheduler HA uses Serialized DAGs and hence it is a strict requirement for 2.0. It also has performance benefits for the Webserver and so should be used by default anyway. Task execution on workers will continue to use the actual files for execution. Scheduler, Experimental API and Webserver will read the DAGs from DB using `DagBag(read_dags_from_db=True)`
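As a rough sketch of what that looks like for a component that reads DAGs from the database (the dag_id below is a placeholder):

```python
from airflow.models.dagbag import DagBag

# Backed by the serialized_dag table instead of parsing DAG files, so the
# webserver/API process does not need filesystem access to the DAG folder.
dag_bag = DagBag(read_dags_from_db=True)

# Fetches the serialized DAG for this dag_id from the metadata DB.
dag = dag_bag.get_dag("example_dag_id")
```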
    self.render_templates()
except (TemplateAssertionError, UndefinedError) as e:
    raise AirflowException(
        "Webserver does not have access to User-defined Macros or Filters "
Hi @kaxil, if using Airflow 2.0, or Airflow 1.10.* with store_serialized_dags = True turned on, we hit this error when the user clicks on "Rendered Template" for tasks that use user_defined_macros in jinja template fields.
We also hit a similar problem if the user clears an ExternalTaskMarker that uses user_defined_macros in its jinja template fields (because the dag.clear() function calls ti.render_templates() to figure out the actual values of the template fields).
How do you recommend addressing these issues going forward? Should we allow some webserver functions to get access to these user_defined_macros? Or should we serialize the rendered template values so that the webserver can access them?
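For context, a minimal example of the kind of DAG that runs into this (names are made up; the point is only that `user_defined_macros` live in the DAG file and are not part of the serialized DAG the webserver reads):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="user_macro_example",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    # Only available when the real DAG file is parsed; not serialized to the DB.
    user_defined_macros={"project_name": "my-project"},
) as dag:
    # Rendering this template requires the user-defined macro above, so a
    # webserver working from the serialized DAG cannot render it.
    BashOperator(task_id="echo_project", bash_command="echo {{ project_name }}")
```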
This error is already handled in the Webserver:
Lines 883 to 890 in 10b8ecc
ti = models.TaskInstance(task=task, execution_date=dttm)
try:
    ti.get_rendered_template_fields()
except AirflowException as e:  # pylint: disable=broad-except
    msg = "Error rendering template: " + escape(e)
    if e.__cause__:  # pylint: disable=using-constant-test
        msg += Markup("<br><br>OriginalError: ") + escape(e.__cause__)
    flash(msg, "error")
i.e. we just ask users to run it via the CLI instead.
The flash message is there so that they know where to look, i.e. run it using the CLI.
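(For readers: the CLI route being referred to is, roughly, the task-render command, which parses the actual DAG file and therefore has the user-defined macros available; the dag/task IDs and date below are placeholders.)

```
# Airflow 2.0
airflow tasks render <dag_id> <task_id> <execution_date>

# Airflow 1.10.x
airflow render <dag_id> <task_id> <execution_date>
```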
Does the ExternalTaskMarker cause a failure in the Scheduler or in the Webserver?
Hi @kaxil, thanks for pointing out that the "Rendered Template" error is handled with an error message. In that case the issue is only a small annoyance for the user, because they cannot see the rendered arguments in the UI.
However, the ExternalTaskMarker actually causes an issue in the Webserver when the user hits Clear. I opened an issue here: #13827
I don't mind working on a fix. Just wondering if you have any suggestions on how to do it.
Scheduler HA uses Serialized DAGs and hence it is a strict
requirement for 2.0.
It also has performance benefits for the Webserver and so should
be used by default anyway.
Task execution on workers will continue to use the actual files for execution.
Scheduler, Experimental API and Webserver will read the DAGs from DB using
DagBag(read_dags_from_db=True)