Decouple deadline reference types from core in task SDK#61461
Decouple deadline reference types from core in task SDK#61461amoghrajesh wants to merge 11 commits intoapache:mainfrom
Conversation
|
Still a draft, sent it out to run an early CI |
|
cc: @ferruzzi this is related to the discussion thread on slack |
|
There's some failing checks which I am looking at |
1169b24 to
711ca6c
Compare
33fcd26 to
613850b
Compare
|
I think this is good more or less now with a couple of minor suggestions. Also would appreciate @ferruzzi if you could provide some perspective if this would step on your work too much. |
| @deadline_reference(DeadlineReference.TYPES.DAGRUN_QUEUED) | ||
| class MyQueuedRef(ReferenceModels.BaseDeadlineReference): | ||
| class MyQueuedRef(BaseDeadlineReference): | ||
| # Optionally, you can specify when you want it calculated by providing a DeadlineReference.TYPES | ||
| def _evaluate_with(self, *, session: Session, **kwargs) -> datetime: | ||
| # Put your business logic here |
There was a problem hiding this comment.
I assume you want to put the same note about imports as you did in the other example?
|
I don't understand how this is intended to work. Can you show me what a dag definition looks like now? Also, what is the process for adding a new reference now? Do we still add the logic in models path but now also and the empty class signature in the definitions file as well? |
|
Good question, it lead me to performing some more testing and it revealed that deadline alerts with custom refs broke after my last PR: #61118. I reworked this one, made some changes and have pushed it and that's reflected in the PR desc. Let me know what you thinlk |
|
|
||
| # Validate the reference class inherits from BaseDeadlineReference | ||
| if not issubclass(reference_class, ReferenceModels.BaseDeadlineReference): | ||
| if not issubclass(reference_class, BaseDeadlineReference): |
There was a problem hiding this comment.
This looks like a backward-compatibility break for custom references inheriting from airflow.models.deadline.ReferenceModels.BaseDeadlineReference (that class exists in released 3.1.x). With this check, those classes now fail registration at DAG import time. Could we accept both base classes or add a compatibility shim to avoid breakage?
| return self.inner_ref.reference_name | ||
|
|
||
| def evaluate_with(self, *, session: Session, interval: timedelta, **kwargs: Any) -> datetime | None: | ||
| filtered_kwargs = {k: v for k, v in kwargs.items() if k in self.required_kwargs} |
There was a problem hiding this comment.
SerializedCustomReference currently filters kwargs using wrapper.required_kwargs (empty by default), so dag_id/run_id and custom kwargs are dropped before calling inner_ref._evaluate_with(). This can break custom references that depend on required kwargs. Please delegate required_kwargs to inner_ref (or pass through kwargs) before filtering.
Was generative AI tooling used to co-author this PR?
Cursor IDE with Claude sonnet 4 1/2
closes: #59303
Summary
Restore support for custom deadline references by implementing a
SerializedCustomReferencewrapper in Core that bridges SDK defined custom refs with core serialization for deadline alerts.Motivation
After PR #61118 moved deadline alert decoding from SDK to Core and introduced
SerializedReferenceModels, custom deadline references stopped working. The decoder only looked up types inSerializedReferenceModels, but custom refs registered via the SDK'sDeadlineReference.register_custom_reference()only existed inReferenceModels. This caused "No reference class found with name: MyCustomRef" errors.What has changed?
Core (
airflow-core/)SerializedCustomReference wrapper
SerializedReferenceModels.SerializedCustomReferenceclass inserialization/definitions/deadline.pyevaluate_with()logic since SDK'sBaseDeadlineReferenceis lightweight wrapper without it_evaluate_with()execution to the inner custom refDecoder (
serialization/decoders.py)__class_pathis present in serialized data, usesSerializedCustomReference.deserialize_reference()to import and wrap the custom refSerializedReferenceModels.get_reference_class()lookupDeadlineAlert model
reference_classproperty returnsSerializedCustomReferencewhen__class_pathis present in the stored reference dictCreate deadline & prune logic (
serialization/definitions/dag.py,models/dagrun.py)SerializedReferenceModels.TYPES.DAGRUNSerializedCustomReferenceis in that tupleArchitecture with these changes
How custom refs work now
__class_pathfor custom refs (no changes)__class_path, imports custom class, wraps inSerializedCustomReferenceisinstance(ref, TYPES.DAGRUN)→ True (wrapper is in tuple) → deadline createdevaluate_with()validates kwargs, calls custom ref's_evaluate_with(), adds intervalreference_class in TYPES.DAGRUN→ True → deadline pruned on successTesting
Define the custom reference in a plugin:
DAG:
Result:

{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.