Skip to content

Decouple deadline reference types from core in task SDK#61461

Open
amoghrajesh wants to merge 11 commits intoapache:mainfrom
astronomer:complete-deadline-alerts-work
Open

Decouple deadline reference types from core in task SDK#61461
amoghrajesh wants to merge 11 commits intoapache:mainfrom
astronomer:complete-deadline-alerts-work

Conversation

@amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Feb 4, 2026


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Cursor IDE with Claude sonnet 4 1/2

closes: #59303

Summary

Restore support for custom deadline references by implementing a SerializedCustomReference wrapper in Core that bridges SDK defined custom refs with core serialization for deadline alerts.

Motivation

After PR #61118 moved deadline alert decoding from SDK to Core and introduced SerializedReferenceModels, custom deadline references stopped working. The decoder only looked up types in SerializedReferenceModels, but custom refs registered via the SDK's DeadlineReference.register_custom_reference() only existed in ReferenceModels. This caused "No reference class found with name: MyCustomRef" errors.

What has changed?

Core (airflow-core/)

SerializedCustomReference wrapper

  • Added SerializedReferenceModels.SerializedCustomReference class in serialization/definitions/deadline.py
  • Wraps custom deadline references imported from user code
  • Implements the evaluate_with() logic since SDK's BaseDeadlineReference is lightweight wrapper without it
  • Delegates _evaluate_with() execution to the inner custom ref

Decoder (serialization/decoders.py)

  • When __class_path is present in serialized data, uses SerializedCustomReference.deserialize_reference() to import and wrap the custom ref
  • Builtin types continue to use SerializedReferenceModels.get_reference_class() lookup

DeadlineAlert model

  • reference_class property returns SerializedCustomReference when __class_path is present in the stored reference dict
  • Returns standard serialized types for built-ins

Create deadline & prune logic (serialization/definitions/dag.py, models/dagrun.py)

  • No changes needed - already use SerializedReferenceModels.TYPES.DAGRUN
  • Custom refs now work because SerializedCustomReference is in that tuple

Architecture with these changes

SDK (task-sdk/) - DAG Authoring
├── deadline.py
│ ├── DeadlineAlert (user-facing)
│ ├── DeadlineReference (public interface)
│ └── BaseDeadlineReference (lightweight, serialization only)
Core (airflow-core/) - Scheduling/Execution
├── serialization/
│ ├── encoders.py (adds class_path for custom types)
│ ├── decoders.py (wraps custom types in SerializedCustomReference)
│ └── definitions/deadline.py
│ ├── SerializedReferenceModels
│ │ ├── Built-in types (with evaluate_with)
│ │ └── SerializedCustomReference (wrapper for custom refs)
│ └── TYPES.DAGRUN (includes wrapper)

How custom refs work now

  1. Encoder adds __class_path for custom refs (no changes)
  2. Decoder sees __class_path, imports custom class, wraps in SerializedCustomReference
  3. isinstance(ref, TYPES.DAGRUN) → True (wrapper is in tuple) → deadline created
  4. Wrapper's evaluate_with() validates kwargs, calls custom ref's _evaluate_with(), adds interval
  5. reference_class in TYPES.DAGRUN → True → deadline pruned on success

Testing

Define the custom reference in a plugin:

from airflow.sdk.definitions.deadline import DeadlineReference, deadline_reference, BaseDeadlineReference
from airflow._shared.timezones import timezone


@deadline_reference()
class MyCustomRef(BaseDeadlineReference):
    """Custom ref: deadline is now + interval."""

    def _evaluate_with(self, *, session, **kwargs):
        return timezone.utcnow()


DeadlineReference.register_custom_reference(MyCustomRef)

async def on_deadline(**kwargs):
    print("Custom ref deadline exceeded", kwargs)

DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference

from custom_deadline_refs import on_deadline


with DAG(
    dag_id="testing-custom-reference-deadlines",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    deadline=DeadlineAlert(
        reference=DeadlineReference.MyCustomRef,
        interval=timedelta(seconds=10),
        callback=AsyncCallback(on_deadline),
    ),
):
    BashOperator(task_id="example_task", bash_command="sleep 120")

Result:
image

image
  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@amoghrajesh amoghrajesh added this to the Airflow 3.2.0 milestone Feb 4, 2026
@amoghrajesh amoghrajesh self-assigned this Feb 4, 2026
@amoghrajesh amoghrajesh marked this pull request as draft February 4, 2026 17:50
@amoghrajesh
Copy link
Contributor Author

Still a draft, sent it out to run an early CI

@amoghrajesh
Copy link
Contributor Author

cc: @ferruzzi this is related to the discussion thread on slack

@amoghrajesh amoghrajesh marked this pull request as ready for review February 8, 2026 16:42
@amoghrajesh
Copy link
Contributor Author

There's some failing checks which I am looking at

@amoghrajesh amoghrajesh force-pushed the complete-deadline-alerts-work branch from 1169b24 to 711ca6c Compare February 9, 2026 08:19
@amoghrajesh amoghrajesh requested a review from uranusjr February 9, 2026 09:10
@amoghrajesh amoghrajesh force-pushed the complete-deadline-alerts-work branch from 33fcd26 to 613850b Compare February 10, 2026 10:43
@uranusjr
Copy link
Member

I think this is good more or less now with a couple of minor suggestions. Also would appreciate @ferruzzi if you could provide some perspective if this would step on your work too much.

@deadline_reference(DeadlineReference.TYPES.DAGRUN_QUEUED)
class MyQueuedRef(ReferenceModels.BaseDeadlineReference):
class MyQueuedRef(BaseDeadlineReference):
# Optionally, you can specify when you want it calculated by providing a DeadlineReference.TYPES
def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
# Put your business logic here
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you want to put the same note about imports as you did in the other example?

@ferruzzi
Copy link
Contributor

I don't understand how this is intended to work. Can you show me what a dag definition looks like now? Also, what is the process for adding a new reference now? Do we still add the logic in models path but now also and the empty class signature in the definitions file as well?

@amoghrajesh amoghrajesh requested a review from XD-DENG as a code owner February 13, 2026 12:03
@amoghrajesh
Copy link
Contributor Author

Good question, it lead me to performing some more testing and it revealed that deadline alerts with custom refs broke after my last PR: #61118.

I reworked this one, made some changes and have pushed it and that's reflected in the PR desc. Let me know what you thinlk

@amoghrajesh amoghrajesh added the full tests needed We need to run full set of tests for this PR to merge label Feb 17, 2026
@amoghrajesh amoghrajesh reopened this Feb 17, 2026
@amoghrajesh amoghrajesh requested review from Lee-W and potiuk February 17, 2026 06:53
Copy link
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


# Validate the reference class inherits from BaseDeadlineReference
if not issubclass(reference_class, ReferenceModels.BaseDeadlineReference):
if not issubclass(reference_class, BaseDeadlineReference):
Copy link
Member

@kaxil kaxil Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a backward-compatibility break for custom references inheriting from airflow.models.deadline.ReferenceModels.BaseDeadlineReference (that class exists in released 3.1.x). With this check, those classes now fail registration at DAG import time. Could we accept both base classes or add a compatibility shim to avoid breakage?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or is it not an issue?

return self.inner_ref.reference_name

def evaluate_with(self, *, session: Session, interval: timedelta, **kwargs: Any) -> datetime | None:
filtered_kwargs = {k: v for k, v in kwargs.items() if k in self.required_kwargs}
Copy link
Member

@kaxil kaxil Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SerializedCustomReference currently filters kwargs using wrapper.required_kwargs (empty by default), so dag_id/run_id and custom kwargs are dropped before calling inner_ref._evaluate_with(). This can break custom references that depend on required kwargs. Please delegate required_kwargs to inner_ref (or pass through kwargs) before filtering.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:deadline-alerts AIP-86 (former AIP-57) area:serialization area:task-sdk full tests needed We need to run full set of tests for this PR to merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Separate DeadlineAlert implementations in SDK and Core

5 participants