Apache Airflow Provider(s)
google
Versions of Apache Airflow Providers
apache-airflow-providers-google==7.0.0
Apache Airflow version
2.3.2
Operating System
Ubuntu 20.04.5 LTS (Focal Fossa)
Deployment
Docker-Compose
Deployment details
No response
What happened
WorkflowsCreateExecutionOperator triggers a Google Cloud workflow, and its execution parameter accepts an argument of the form {"argument": {"key": "val", ...}}.
However, when I passed the argument as a dict using render_template_as_native_obj=True, a protobuf error occurred: TypeError: {'projectId': 'project-id', 'location': 'us-east1'} has type dict, but expected one of: bytes, unicode.
When I passed the argument as bytes, {"argument": b'{\n "projectId": "project-id",\n "location": "us-east1"\n}'}, it worked.
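For reference, the accepted bytes payload above is just the dict serialized as JSON. A minimal sketch using only the standard library (indent=1 happens to match the whitespace in the bytes literal above):

```python
import json

argument = {"projectId": "project-id", "location": "us-east1"}

# Serialize the dict to the JSON bytes form that the API accepts.
payload = json.dumps(argument, indent=1).encode("utf-8")
print(payload)
# → b'{\n "projectId": "project-id",\n "location": "us-east1"\n}'
```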
What you think should happen instead
The execution argument should accept a dict instead of requiring bytes.
How to reproduce
Not working:

```python
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.google.cloud.operators.workflows import WorkflowsCreateExecutionOperator

with DAG(
    dag_id="continual_learning_deid_norm_h2h_test",
    params={
        "location": Param(type="string", default="us-east1"),
        "project_id": Param(type="string", default="project-id"),
        "workflow_id": Param(type="string", default="orkflow"),
        "workflow_execution_info": {
            "argument": {
                "projectId": "project-id",
                "location": "us-east1"
            }
        }
    },
    render_template_as_native_obj=True
) as dag:
    execution = "{{ params.workflow_execution_info }}"
    create_execution = WorkflowsCreateExecutionOperator(
        task_id="create_execution",
        location="{{ params.location }}",
        project_id="{{ params.project_id }}",
        workflow_id="{{ params.workflow_id }}",
        execution="{{ params.workflow_execution_info }}"
    )
    start_operator = DummyOperator(task_id='test_task')
    start_operator >> create_execution
```

Working:

```python
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.google.cloud.operators.workflows import WorkflowsCreateExecutionOperator

with DAG(
    dag_id="continual_learning_deid_norm_h2h_test",
    params={
        "location": Param(type="string", default="us-east1"),
        "project_id": Param(type="string", default="project-id"),
        "workflow_id": Param(type="string", default="orkflow"),
        "workflow_execution_info": {
            "argument": b'{\n "projectId": "project-id",\n "location": "us-east1"\n}'
        }
    },
    render_template_as_native_obj=True
) as dag:
    execution = "{{ params.workflow_execution_info }}"
    create_execution = WorkflowsCreateExecutionOperator(
        task_id="create_execution",
        location="{{ params.location }}",
        project_id="{{ params.project_id }}",
        workflow_id="{{ params.workflow_id }}",
        execution="{{ params.workflow_execution_info }}"
    )
    start_operator = DummyOperator(task_id='test_task')
    start_operator >> create_execution
```

Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct