Skip to content

SQLColumnCheckOperator failures after upgrading to common-sql==1.3.0 #27976

@mag3141592

Description

@mag3141592

Apache Airflow Provider(s)

common-sql

Versions of Apache Airflow Providers

apache-airflow-providers-google==8.2.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-salesforce==5.0.0
apache-airflow-providers-slack==5.1.0
apache-airflow-providers-snowflake==3.2.0

Issue:
apache-airflow-providers-common-sql==1.3.0

Apache Airflow version

2.4.3

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Astronomer

Deployment details

No response

What happened

Problem occurred when upgrading from common-sql=1.2.0 to common-sql=1.3.0

Getting a KEY_ERROR when running a unique_check and null_check on a column.

1.3.0 log:
Screen Shot 2022-11-28 at 2 01 20 PM

1.2.0 log:
Screen Shot 2022-11-28 at 2 00 15 PM

What you think should happen instead

Potential causes:

  • seems to be indexing based on the test query column COL_NAME instead of the table column STRIPE_ID
  • the record from the test changed types went from a tuple to a list of dictionaries.
  • no tolerance is specified for these tests, so .get('tolerance') looks like it will cause an error without a default specified like .get('tolerance', None)

Expected behavior:

  • these tests continue to pass with the upgrade
  • tolerance is not a required key.

How to reproduce

from datetime import datetime
from airflow import DAG

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator

my_conn_id = "snowflake_default"

default_args={"conn_id": my_conn_id}

with DAG(
    dag_id="airflow_providers_example",
    schedule=None,
    start_date=datetime(2022, 11, 27),
    default_args=default_args,
) as dag:

    create_table = SnowflakeOperator(
        task_id="create_table",
        sql=""" CREATE OR REPLACE TABLE testing AS (
                        SELECT
                            1 AS row_num,
                            'not null' AS field

                        UNION ALL

                        SELECT
                            2 AS row_num,
                            'test' AS field

                        UNION ALL

                        SELECT
                            3 AS row_num,
                            'test 2' AS field
                    )""",
    )

    column_checks = SQLColumnCheckOperator(
        task_id="column_checks",
        table="testing",
        column_mapping={
            "field": {"unique_check": {"equal_to": 0}, "null_check": {"equal_to": 0}}
        },
    )

    create_table >> column_checks

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    area:providerskind:bugThis is a clearly a bugpriority:mediumBug that should be fixed before next release but would not block a release

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions