AIP-72: Port Variable.set From TaskSDK to Models#48177
AIP-72: Port Variable.set From TaskSDK to Models#48177jason810496 wants to merge 6 commits intoapache:mainfrom
Conversation
49268f9 to
51a3a7e
Compare
51a3a7e to
a076fa4
Compare
| def _set_variable(key: str, value: Any, description: str | None = None, serialize_json: bool = False) -> None: | ||
| from airflow.sdk.execution_time.comms import ErrorResponse, PutVariable | ||
| from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS | ||
|
|
||
| if serialize_json: | ||
| import json | ||
|
|
||
| value = json.dumps(value, indent=2) | ||
| else: | ||
| value = str(value) | ||
|
|
||
| SUPERVISOR_COMMS.send_request(log=log, msg=PutVariable(key=key, value=value, description=description)) | ||
| msg = SUPERVISOR_COMMS.get_message() | ||
| if isinstance(msg, ErrorResponse): | ||
| raise AirflowRuntimeError(msg) | ||
| return | ||
|
|
||
|
|
There was a problem hiding this comment.
I think I need to port the Secret Backend check_for_write_conflict logic from models ( https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/variable.py#L198-L199 ) to TaskSDK as well?
Just like _get_variable, right?
https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/context.py#L154-L171
cc @amoghrajesh
There was a problem hiding this comment.
Thats right we need to do that as well
a076fa4 to
9f83b4c
Compare
|
Just rebased to latest main to resolve conflict. |
amoghrajesh
left a comment
There was a problem hiding this comment.
@jason810496 this only handles the top level variables. We will also have to port over / allow this from the task runner level too.
Could you please handle that as well?
| def _set_variable(key: str, value: Any, description: str | None = None, serialize_json: bool = False) -> None: | ||
| from airflow.sdk.execution_time.comms import ErrorResponse, PutVariable | ||
| from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS | ||
|
|
||
| if serialize_json: | ||
| import json | ||
|
|
||
| value = json.dumps(value, indent=2) | ||
| else: | ||
| value = str(value) | ||
|
|
||
| SUPERVISOR_COMMS.send_request(log=log, msg=PutVariable(key=key, value=value, description=description)) | ||
| msg = SUPERVISOR_COMMS.get_message() | ||
| if isinstance(msg, ErrorResponse): | ||
| raise AirflowRuntimeError(msg) | ||
| return | ||
|
|
||
|
|
There was a problem hiding this comment.
Thats right we need to do that as well
9f83b4c to
3ba45df
Compare
|
Discussed with @jason810496 on this one. There is more work left to do on this and it would be nice to do this in RC tomorrow eod. I will be working on this parallely and creating a companion PR that handles the missing gaps in this one too. |
|
Closing this one, as #49005 will address the issue. |
closes: #47920
Why
Setting a variable in Dag is failing due to 'Direct database access via the ORM is not allowed in Airflow 3.0' ( details in issue context )
What
Port
Variable.setfrom TaskSDK toairflow.models.Variable.set