Split Deadline Alerts into core and task-sdk components #61118
Merged
amoghrajesh merged 13 commits into apache:main on Jan 30, 2026
Conversation
amoghrajesh commented on Jan 27, 2026
ferruzzi reviewed on Jan 27, 2026
airflow-core/tests/unit/serialization/test_serialized_objects.py
amoghrajesh (Contributor, Author) commented:
@ferruzzi thanks for your review, I have handled your comments now; let me know how it looks.
ferruzzi approved these changes on Jan 29, 2026
ferruzzi (Contributor) left a comment:
Thanks for addressing my questions. I think it looks right.
amoghrajesh (Contributor, Author) commented:
Thanks @ferruzzi! Just to gain some more confidence prior to merge, I tried it out functionally. Used this custom callback:

```python
async def custom_async_callback(**kwargs):
    context = kwargs.get("context", {})
    print(
        f"Deadline exceeded for Dag {context.get('dag_run', {}).get('dag_id')}!"
    )
    print(f"Context: {context}")
    print(f"Alert type: {kwargs.get('alert_type')}")
```

This DAG:

```python
from datetime import timedelta

from deadline_callback import custom_async_callback

from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG
from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference

with DAG(
    dag_id="custom_deadline_alert",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(seconds=10),
        callback=AsyncCallback(
            custom_async_callback,
            kwargs={"alert_type": "time_exceeded"},
        ),
    ),
):
    BashOperator(task_id="example_task", bash_command="sleep 30")
```

And it works as I expect it to.
subhash-0000 pushed a commit to subhash-0000/airflow that referenced this pull request on Jan 30, 2026
morelgeorge pushed a commit to morelgeorge/airflow that referenced this pull request on Feb 1, 2026
shashbha14 pushed a commit to shashbha14/airflow that referenced this pull request on Feb 2, 2026
jason810496 pushed a commit to abhijeets25012-tech/airflow that referenced this pull request on Feb 3, 2026
jhgoebbert pushed a commit to jhgoebbert/airflow_Owen-CH-Leung that referenced this pull request on Feb 8, 2026
choo121600 pushed a commit to choo121600/airflow that referenced this pull request on Feb 22, 2026

Was generative AI tooling used to co-author this PR?
Used the Cursor IDE.
Why This Change?

This PR assists the client-server separation by moving worker-side deadline functionality to task-sdk while keeping the DB-dependent evaluation logic in airflow-core. It follows the established pattern from the assets migration: #58993.

What stays where?

In simpler terms, this PR tries to address this split.
SDK
- DeadlineAlert: the user-facing class for defining deadline alerts (no serialization methods from now on)
- DeadlineReference: the same factory class for creating deadline references
- ReferenceModels.*: the original reference implementations, kept for backward compatibility

The principle here is to keep the lightweight DAG-authoring interface, with no database dependencies, in the SDK.
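As an illustration of the SDK side of the split, an authoring-time alert can be modeled with nothing but stdlib dataclasses: pure data, no database access. This is a simplified sketch, not the real `DeadlineAlert` API; all names below (`MiniDeadlineAlert`, `noop`, the field names) are hypothetical stand-ins for illustration only.

```python
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Any, Callable


@dataclass(frozen=True)
class MiniDeadlineAlert:
    """Illustrative stand-in for an SDK-side deadline alert: plain data, no DB.

    These names are hypothetical, not the real Airflow API.
    """

    reference: str                       # e.g. "DAGRUN_QUEUED_AT"
    interval: timedelta                  # how far past the reference before the alert fires
    callback: Callable[..., Any]         # user callback; serialization lives in core, not here
    callback_kwargs: dict[str, Any] = field(default_factory=dict)


def noop(**kwargs):
    """Placeholder callback for the example."""
    return kwargs


alert = MiniDeadlineAlert(
    reference="DAGRUN_QUEUED_AT",
    interval=timedelta(seconds=10),
    callback=noop,
    callback_kwargs={"alert_type": "time_exceeded"},
)
print(alert.interval.total_seconds())  # 10.0
```

The point of the sketch is that nothing in the authoring interface needs a session or an ORM model, which is what allows it to live in task-sdk.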
Core / Serialization module
- SerializedDeadlineAlert: the internal representation used in core after a DeadlineAlert is deserialized
- SerializedReferenceModels.*: reference implementations with database access
- encode_deadline_alert() / decode_deadline_alert(): centralized functions used to serialize and deserialize deadline alerts

The principle here is to keep serialization, deserialization, and deadline evaluation with database access in core.
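A rough sketch of what a centralized encode/decode pair does, using plain dicts in the wire shape documented below. The function names come from this PR, but the signatures and bodies here are assumptions for illustration; the real implementations live in core's serialization module and delegate callback handling to serde.

```python
from datetime import timedelta
from typing import Any


def encode_deadline_alert(reference_type: str, interval: timedelta,
                          callback_data: dict[str, Any]) -> dict[str, Any]:
    """Illustrative encoder: emits the documented {"__type": "deadline_alert", ...} envelope."""
    return {
        "__type": "deadline_alert",
        "__var": {
            "reference": {"reference_type": reference_type},
            "interval": interval.total_seconds(),
            "callback": callback_data,  # in Airflow this part comes from airflow.sdk.serde
        },
    }


def decode_deadline_alert(data: dict[str, Any]) -> dict[str, Any]:
    """Illustrative decoder: validates the envelope and unpacks the fields."""
    if data.get("__type") != "deadline_alert":
        raise ValueError("not a serialized deadline alert")
    var = data["__var"]
    return {
        "reference_type": var["reference"]["reference_type"],
        "interval": timedelta(seconds=var["interval"]),
        "callback": var["callback"],
    }


encoded = encode_deadline_alert(
    "DagRunLogicalDateDeadline",
    timedelta(hours=1),
    {"__classname__": "...", "__version__": 0, "__data__": {}},
)
decoded = decode_deadline_alert(encoded)
print(decoded["interval"])  # 1:00:00
```

The round-trip is lossless because the envelope carries everything needed to rebuild the alert, which is why the stored format can stay unchanged across the split.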
Serialization Changes
Structure
Serialization format remains unchanged - no breaking changes to stored DAGs:
{ "__type": "deadline_alert", "__var": { "reference": {"reference_type": "DagRunLogicalDateDeadline"}, "interval": 3600.0, "callback": { "__classname__": "airflow.sdk.definitions.callback.AsyncCallback", "__version__": 0, "__data__": {"path": "...", "kwargs": {...}} } } }Same with the flow of control
- DeadlineAlert → dict via encode_deadline_alert(), using airflow.sdk.serde
- dict → SerializedDeadlineAlert via decode_deadline_alert()
- SerializedReferenceModels uses a database session to calculate deadlines

One thing of note is the callback serialization: I chose to continue using serde for this purpose because BaseSerialization cannot handle callbacks. Using serde made sense since this part of serialization runs in the DAG processor, which ultimately is not a core component and can use the task SDK. So, the flow is:
- airflow.sdk.serde.serialize() / deserialize() for proper callback handling

Backward Compatibility
Stored reference types keep their original names (DagRunLogicalDateDeadline, not SerializedDagRunLogicalDateDeadline). A significant newsfragment, {pr_number}.significant.rst or {issue_number}.significant.rst, is added in airflow-core/newsfragments.
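Circling back to the callback serde flow above: a path-plus-kwargs representation (as in the `__data__` field of the wire format) is enough to round-trip a callback, because the dotted import path can be re-resolved at execution time. Here is a minimal stdlib sketch of that idea; the helper names are hypothetical, and the real mechanism in airflow.sdk.serde is more involved (versioning, class registries).

```python
import importlib
import math
from typing import Any


def serialize_callback(fn, kwargs: dict[str, Any]) -> dict[str, Any]:
    """Store a callback as its dotted import path plus bound kwargs (illustrative only)."""
    return {"path": f"{fn.__module__}.{fn.__qualname__}", "kwargs": kwargs}


def deserialize_callback(data: dict[str, Any]):
    """Re-import the function from its dotted path and return it with its kwargs."""
    module_name, _, attr = data["path"].rpartition(".")
    fn = getattr(importlib.import_module(module_name), attr)
    return fn, data["kwargs"]


# Round-trip a stdlib function to keep the example self-contained.
blob = serialize_callback(math.sqrt, {"alert_type": "time_exceeded"})
fn, kwargs = deserialize_callback(blob)
print(fn(9.0), kwargs["alert_type"])  # 3.0 time_exceeded
```

Because only the path and kwargs are stored, the serialized form stays database-free on the SDK side, while the component that eventually invokes the callback does the import.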