Add Human-in-the-loop logic to core Airflow and implement HITLOperator, ApprovalOperator, HITLEntryOperator in standard provider #52868
Snippet I used for handy local testing:

    from __future__ import annotations

    import datetime

    from airflow.providers.standard.operators.hitl import ApprovalOperator
    from airflow.sdk import DAG, task

    with DAG(
        dag_id="aip_90_dag",
        start_date=datetime.datetime(2021, 1, 1),
        schedule="@daily",
    ):
        approve_or_reject = ApprovalOperator(
            task_id="approve_or_reject",
            subject="This is subject",
            body="This is body",
        )

        approve_if_timeout = ApprovalOperator(
            task_id="approve_if_timeout",
            subject="This is subject",
            defaults="Approve",
            execution_timeout=datetime.timedelta(seconds=1),
        )

        @task
        def run_after_approve() -> None:
            pass

        reject_if_timeout = ApprovalOperator(
            task_id="reject_if_timeout",
            subject="This is subject",
            defaults="Reject",
            execution_timeout=datetime.timedelta(seconds=1),
        )

        @task
        def skip_due_to_reject() -> None:
            pass

        @task
        def skip_due_to_upstream_skip() -> None:
            pass

        approve_if_timeout >> run_after_approve()
        reject_if_timeout >> skip_due_to_reject() >> skip_due_to_upstream_skip()

    import requests

    host = "http://localhost:28080"
    token_endpoint = "/auth/token"
    resp = requests.post(f"{host}{token_endpoint}", json={"username": "admin", "password": "admin"})
    token = resp.json()["access_token"]
    headers = {"Authorization": f"Bearer {token}"}

    get_all_endpoint = "/api/v2/hitl-details/"
    resp = requests.get(f"{host}{get_all_endpoint}", headers=headers)
    resp.json()

    dag_id = "aip_90_dag"
    dag_run_id = <...>
    task_id = "approve_or_reject"
    hitl_ti_endpoint = f"/api/v2/hitl-details/{dag_id}/{dag_run_id}/{task_id}"
    resp = requests.get(f"{host}{hitl_ti_endpoint}", headers=headers)
    resp.json()

    data = {"chosen_options": ["Reject"]}
    resp = requests.patch(f"{host}{hitl_ti_endpoint}", headers=headers, json=data)
    resp.json()
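
The testing snippet above builds the task-instance path by hand and only covers the non-mapped form. As a minimal sketch, the path construction could be wrapped in a small helper that also handles the `{map_index}` variant listed in the PR description. The function name `hitl_detail_path` is hypothetical and not part of this PR. Percent-encoding each segment is an assumption made here because run IDs commonly contain characters such as `:` and `+`.

```python
from __future__ import annotations

from urllib.parse import quote


def hitl_detail_path(
    dag_id: str,
    dag_run_id: str,
    task_id: str,
    map_index: int | None = None,
) -> str:
    """Build the public-API path of one HITLDetail (hypothetical helper)."""
    # Percent-encode every segment so characters like ":" or "+" in a run ID
    # cannot be misread as URL syntax. This is an assumption, not PR behavior.
    segments = [quote(part, safe="") for part in (dag_id, dag_run_id, task_id)]
    if map_index is not None:
        # Mapped task instances use the extra trailing {map_index} segment.
        segments.append(str(map_index))
    return "/api/v2/hitl-details/" + "/".join(segments)
```

With this helper, the GET and PATCH calls in the snippet could share one path, for example `f"{host}{hitl_detail_path(dag_id, dag_run_id, task_id)}"`.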
@guan404ming, this is the new draft PR. Ideally, it won't change dramatically. I think we could use this to test the frontend; please let me know if anything is missing. Thanks!
I assume that if you want follow-up actions on "Reject", a branching operator type is better suited. I would rather see "Approval" with Reject as behaving like a ShortCircuit, but no strong feelings. It just feels more intuitive now that I rethink it.
jscheffl
left a comment
Looks all good to me! Looking forward to seeing it in real life!
#protm
…or`, `ApprovalOperator`, `HITLEntryOperator` in standard provider (apache#52868)

* feat: AIP-90 PoC
* feat(hitl): move hitl execution api to core
* feat(hitl): move models
* feat(hitl): move core_api from provider to core
* feat(hitl): add migration files
* fix: fix ci issue
* build: rebuild frontend
* fix(hitl): generate datamodel
* fix(hitl): add HITL module to models
* feat(hitl): add multiple support
* test(hitl): improve supervisor.handle_request
* test(hitl): add execution_time test cases
* test(hitl): add test cases to public api
* refactor(hitl): replace ImportError with AirflowOptionalProviderFeatureException
* refactor(hitl): remove unnecessay directories
* test(hitl): add test cases to execution api
* test(hitl): add test cases to operators
* style: fix mypy warning
* ci: fix ci errors
* feat(hitl): add fk constraint
* feat(hitl): set params as ParamsDict
* docs(hitl_operator): update body description
* feat(hitl): rename hitl-responses as hitl-details
* feat(hitl): rename response_content as chosen_options
* feat(hitl): rename default as defaults
* feat(hitl): rewrite public api to fit frontend's need
* style(hitl): improve test typing
* test(hitl): add mapped test cases
* feat(hitl): merge termination operator into approval operators
* feat(hitl): allow timeout without default
(from https://github.com/apache/airflow/tree/python-client/3.1.0rc1)

## New Features:

- Add `map_index` filter to TaskInstance API queries ([#55614](apache/airflow#55614))
- Add `has_import_errors` filter to Core API GET /dags endpoint ([#54563](apache/airflow#54563))
- Add `dag_version` filter to get_dag_runs endpoint ([#54882](apache/airflow#54882))
- Implement pattern search for event log endpoint ([#55114](apache/airflow#55114))
- Add asset-based filtering support to DAG API endpoint ([#54263](apache/airflow#54263))
- Add Greater Than and Less Than range filters to DagRuns and Task Instance list ([#54302](apache/airflow#54302))
- Add `try_number` as filter to task instances ([#54695](apache/airflow#54695))
- Add filters to Browse XComs endpoint ([#54049](apache/airflow#54049))
- Add Filtering by DAG Bundle Name and Version to API routes ([#54004](apache/airflow#54004))
- Add search filter for DAG runs by triggering user name ([#53652](apache/airflow#53652))
- Enable multi sorting (AIP-84) ([#53408](apache/airflow#53408))
- Add `run_on_latest_version` support for backfill and clear operations ([#52177](apache/airflow#52177))
- Add `run_id_pattern` search for Dag Run API ([#52437](apache/airflow#52437))
- Add tracking of triggering user to Dag runs ([#51738](apache/airflow#51738))
- Expose DAG parsing duration in the API ([#54752](apache/airflow#54752))

## New API Endpoints:

- Add Human-in-the-Loop (HITL) endpoints for approval workflows ([#52868](apache/airflow#52868), [#53373](apache/airflow#53373), [#53376](apache/airflow#53376), [#53885](apache/airflow#53885), [#53923](apache/airflow#53923), [#54308](apache/airflow#54308), [#54310](apache/airflow#54310), [#54723](apache/airflow#54723), [#54773](apache/airflow#54773), [#55019](apache/airflow#55019), [#55463](apache/airflow#55463), [#55525](apache/airflow#55525), [#55535](apache/airflow#55535), [#55603](apache/airflow#55603), [#55776](apache/airflow#55776))
- Add endpoint to watch dag run until finish ([#51920](apache/airflow#51920))
- Add TI bulk actions endpoint ([#50443](apache/airflow#50443))
- Add Keycloak Refresh Token Endpoint ([#51657](apache/airflow#51657))

## Deprecations:

- Mark `DagDetailsResponse.concurrency` as deprecated ([#55150](apache/airflow#55150))

## Bug Fixes:

- Fix dag import error modal pagination ([#55719](apache/airflow#55719))
Why
AIP-90
Closes: #52205
Closes: #52204
Closes: #52348
What
- `HITLDetail`: Store input request from task and human response
- `GET /api/v2/hitl-details`: Get all `HITLDetail` details
- `GET /api/v2/hitl-details/{dag_id}/{dag_run_id}/{task_id}`: Get a `HITLDetail` for a specific task instance
- `GET /api/v2/hitl-details/{dag_id}/{dag_run_id}/{task_id}/{map_index}`: Get a `HITLDetail` for a specific mapped task instance
- `PATCH /api/v2/hitl-details/{dag_id}/{dag_run_id}/{task_id}`: Write the response content part back to an existing `HITLDetail` for a specific task instance
- `PATCH /api/v2/hitl-details/{dag_id}/{dag_run_id}/{task_id}/{map_index}`: Write the response content part back to an existing `HITLDetail` for a specific mapped task instance
- `POST /hitl-details/{dag_id}/{dag_run_id}/{task_id}`: Create a `HITLDetail` with input request content (`HITLOperator` calls it in `execute` to create a `HITLDetail` with the input request details)
- `PATCH /hitl-details/{dag_id}/{dag_run_id}/{task_id}`: After deferring, `HITLTrigger` checks whether the timeout has passed; if so, write the default value back. (This endpoint might be helpful when dealing with XCom in follow-up PRs as well)
- `GET /hitl-details/{dag_id}/{dag_run_id}/{task_id}`: After deferring, `HITLTrigger` keeps checking whether the response is ready. (Only return the human response for now, as other information might not be needed)
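
The deferral behavior described above (keep checking whether a response is ready and, once the timeout has passed, write the default value back) can be illustrated with a small asyncio sketch. This is not the `HITLTrigger` implementation: `wait_for_response`, `fetch_response`, `write_defaults` and `poke_interval` are hypothetical names, an in-memory dict stands in for the GET and PATCH calls, and raising `TimeoutError` when no defaults are configured is an assumption about one possible behavior.

```python
from __future__ import annotations

import asyncio
import time


async def wait_for_response(
    fetch_response,   # hypothetical callable: returns chosen options, or None if not ready
    write_defaults,   # hypothetical callable: stores the defaults as the response
    defaults: list[str] | None,
    timeout: float,
    poke_interval: float = 0.01,
) -> list[str]:
    """Poll until a human response exists, else fall back to defaults on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        chosen = fetch_response()
        if chosen is not None:
            return chosen  # a human answered before the deadline
        if time.monotonic() >= deadline:
            if defaults is None:
                # Assumption: with no defaults, surface the timeout as an error.
                raise TimeoutError("no response received and no defaults configured")
            write_defaults(defaults)  # record the fallback as the response
            return defaults
        await asyncio.sleep(poke_interval)


# In-memory stand-in for the stored HITLDetail response (illustration only).
store: dict[str, list[str] | None] = {"chosen_options": None}

result = asyncio.run(
    wait_for_response(
        fetch_response=lambda: store["chosen_options"],
        write_defaults=lambda opts: store.update(chosen_options=opts),
        defaults=["Approve"],
        timeout=0.05,
    )
)
```

Because nobody responds in this run, `result` ends up holding the defaults and the same value is written back to `store`, which mirrors the `approve_if_timeout` task in the testing snippet.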