feat(task_sdk): add support for inlet_events in Task Context#45960
Merged
Lee-W merged 17 commits intoapache:mainfrom Mar 3, 2025
Merged
feat(task_sdk): add support for inlet_events in Task Context#45960Lee-W merged 17 commits intoapache:mainfrom
Lee-W merged 17 commits intoapache:mainfrom
Conversation
3cb8fb4 to
6ac30c8
Compare
0c8da84 to
9460573
Compare
2 tasks
a267114 to
f613905
Compare
Member
Author
|
Hey @ashb and @amoghrajesh , could you please take a quick look at this PR? in case I'm doing something wrong regarding task_sdk. The tests are not fixed as of now. I'll work on that if the overall logic looks ok. Thanks! |
amoghrajesh
reviewed
Feb 19, 2025
Contributor
amoghrajesh
left a comment
There was a problem hiding this comment.
General direction looking good, will take a better look with tests.
Member
Author
|
Thanks @amoghrajesh ! |
b5fe1d2 to
d07c722
Compare
amoghrajesh
approved these changes
Feb 28, 2025
Contributor
amoghrajesh
left a comment
There was a problem hiding this comment.
Almost there. Preemptively approving provided my comments are handled.
uranusjr
approved these changes
Mar 2, 2025
cd49204 to
1cf88c4
Compare
Member
Author
|
@amoghrajesh I'll merge it this afternoon. Please let me know if you want to take a look again 🙂 |
shahar1
pushed a commit
to shahar1/airflow
that referenced
this pull request
Mar 5, 2025
…45960) * feat(task_sdk): add support for inlet_events in Task Context * feat(task_sdk): add AssetEventCollectionResponse * refactor(task_sdk): combine asset event uris * refactor(api_fastapi): extract asset_event datamodels from asset * fix(task_sdk): revert unrelated datamodels change * fix(task_sdk/context): add _get_asset_events_from_db for fixing tests * test(task_sdk): add test cases for execution_time context inlet access * test(task_sdk): extend test_handle_requests to include asset event calls * test(execution_api): add tests to asset event apis * fix(execution_api): remove unnecessary redact * feat(task_sdk): extract asset response from asset event response * feat(task_sdk): add missing http exception * feat(task_sdk): extract asset response from asset event response * feat(task_sdk): remove duplicate inlet logic * feat(task_sdk): remove AssetEvent form definitions * test(task_sdk): add test case test_run_with_asset_inlets * docs(newsfragments): add description of how inlet_events access has been changed
2 tasks
nailo2c
pushed a commit
to nailo2c/airflow
that referenced
this pull request
Apr 4, 2025
…45960) * feat(task_sdk): add support for inlet_events in Task Context * feat(task_sdk): add AssetEventCollectionResponse * refactor(task_sdk): combine asset event uris * refactor(api_fastapi): extract asset_event datamodels from asset * fix(task_sdk): revert unrelated datamodels change * fix(task_sdk/context): add _get_asset_events_from_db for fixing tests * test(task_sdk): add test cases for execution_time context inlet access * test(task_sdk): extend test_handle_requests to include asset event calls * test(execution_api): add tests to asset event apis * fix(execution_api): remove unnecessary redact * feat(task_sdk): extract asset response from asset event response * feat(task_sdk): add missing http exception * feat(task_sdk): extract asset response from asset event response * feat(task_sdk): remove duplicate inlet logic * feat(task_sdk): remove AssetEvent form definitions * test(task_sdk): add test case test_run_with_asset_inlets * docs(newsfragments): add description of how inlet_events access has been changed
ntBre
pushed a commit
to astral-sh/ruff
that referenced
this pull request
Feb 9, 2026
…context key for Airflow 3.0 (`AIR301`) (#22850) ## Summary <!-- What's the purpose of the change? What does it do, and why? --> Context: apache/airflow#41641 1. apache/airflow#45961 * <strike>create_dagrun removed from airflow...DAG</strike> (This has already been implemented in AIR301) * context key dag_run.external_trigger removed 2. apache/airflow#45960 * context["inlet_events"]["url"] → context["inlet_events"][Asset("url")] 3. apache/airflow#41348 * context key triggering_dataset_events → triggering_asset_events The existing AIR301 rules can detect when users access a removed key such as `execution_date` through Airflow's `context`. For example, `context["execution_date"]`, or `context["dag_run"]`. However, if an attribute is deprecated from a context key, such as `context["dag_run"].external_trigger`, the current implement will not flag it. This PR adds the logic for such check, and add two rules to flag the deprecated attribute for the `"dag_run"` and `"inlet_events"` context key. In addition to this, `"triggering_dataset_events"` is a deprecated context key which can be handled by the existing rule. However, the existing rule doesn't raise a diagnostic. Hence, the rule logic is refactored a little bit, such that we can add this check and suggest a `Replacement::Rename`. ## Test Plan <!-- How was it tested? --> The test cases have been added to `AIR301_context.py`, and all the tests have been run locally and success. @Lee-W , could you please review it when you have time, thanks! ## Notes In #22376, we introduced some improvements to the AIR301 code. I will re-base this PR when we are all good on that, so it can pick up those code structure improvements, and updated rules.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Why
We need to add support for inlet-events in task_sdk context as well.
What
InletEventsAccessorsfromairflow/utils/context.pytotask_sdk/src/airflow/sdk/execution_time/context.pySUPERVISOR_COMMSto retrieve asset eventsAssetEventResponse,DagRunAssetReferencedata models/asset-eventsroute with/by-asset-name-uri/by-asset-uri/by-asset-name/by-alias-nameAssetEventOperationsto handle the routingtask_sdk/src/airflow/sdk/execution_time/supervisor.pycloses: #45717
closes: #46852
closes: #46852
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.