Skip to content

WorkflowsCreateExecutionOperator execution argument only receive bytes #27165

@akakakakakaa

Description

@akakakakakaa

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==7.0.0

Apache Airflow version

2.3.2

Operating System

Ubuntu 20.04.5 LTS (Focal Fossa)

Deployment

Docker-Compose

Deployment details

No response

What happened

WorkflowsCreateExecutionOperator triggers google cloud workflows and execution param receives argument as {"argument": {"key": "val", "key", "val"...}

But, When I passed argument as dict using render_template_as_native_obj=True, protobuf error occured TypeError: {'projectId': 'project-id', 'location': 'us-east1'} has type dict, but expected one of: bytes, unicode.

When I passed argument as bytes {"argument": b'{\n "projectId": "project-id",\n "location": "us-east1"\n}' It working.

What you think should happen instead

execution argument should be Dict instead of bytes.

How to reproduce

not working

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.google.cloud.operators.workflows import WorkflowsCreateExecutionOperator

with DAG(
    dag_id="continual_learning_deid_norm_h2h_test",
    params={
        "location": Param(type="string", default="us-east1"),
        "project_id": Param(type="string", default="project-id"),
        "workflow_id": Param(type="string", default="orkflow"),
        "workflow_execution_info": {
            "argument": {
                "projectId": "project-id",
                "location": "us-east1"
            }
        }
    },
    render_template_as_native_obj=True
) as dag:
    execution = "{{ params.workflow_execution_info }}"
    create_execution = WorkflowsCreateExecutionOperator(
        task_id="create_execution",
        location="{{ params.location }}",
        project_id="{{ params.project_id }}",
        workflow_id="{{ params.workflow_id }}",
        execution="{{ params.workflow_execution_info }}"
    )

    start_operator = DummyOperator(task_id='test_task')

    start_operator >> create_execution

working

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.google.cloud.operators.workflows import WorkflowsCreateExecutionOperator

with DAG(
    dag_id="continual_learning_deid_norm_h2h_test",
    params={
        "location": Param(type="string", default="us-east1"),
        "project_id": Param(type="string", default="project-id"),
        "workflow_id": Param(type="string", default="orkflow"),
        "workflow_execution_info": {
            "argument": b'{\n  "projectId": "project-id",\n  "location": "us-east1"\n}'
        }
    },
    render_template_as_native_obj=True
) as dag:
    execution = "{{ params.workflow_execution_info }}"
    create_execution = WorkflowsCreateExecutionOperator(
        task_id="create_execution",
        location="{{ params.location }}",
        project_id="{{ params.project_id }}",
        workflow_id="{{ params.workflow_id }}",
        execution="{{ params.workflow_execution_info }}"
    )

    start_operator = DummyOperator(task_id='test_task')

    start_operator >> create_execution

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions