Skip to content

SqlSensor with Standard SQL BigQuery dialect (allow to pass config to injected (DI) hooks) #17315

@dinigo

Description

@dinigo

Apache Airflow version: 1.10.15, but happens in all of them

Environment: GCP Composer -> Kubernetes + Celery

  • Cloud provider or hardware configuration: Google

What happened:
Errors appear while trying to use SqlSensor against a BigQuery View.
For context I'll explain a little bit: BigQuery seemingly executes only SQL, but in reality there are two versions of the language. For backwards compatibility the so called legacySQL is set by default.

I'm hot-fixing it like so:

class BigQuerySqlSensor(SqlSensor):
    """
    Overwrites hook config when using SqlSensor with a BigQuery
    connection
    """
    def _get_hook(self):
        hook = super()._get_hook()
        hook.use_legacy_sql = False
        hook.location = 'europe-west1'
        return hook

How to reproduce it:
The issue I'm experiencing is trying to run a simple query agains a view

check_resource_has_data = SqlSensor(
    task_id="check_resource_has_data",
    conn_id='google_cloud_default',
    sql=f'SELECT COUNT(*) > 0 FROM `{dataset_id}.{resource_id}`',
)

First Error: As I am using a standard language but the hook automatically sets this to legacy BigQuery will try to run this as Legacy, fail to parse and return an erro

[2021-07-28 16:03:05,499] {taskinstance.py:1152} ERROR - BigQuery job failed. Final error was: {'reason': 'invalid', 'location': '`my-project.test.cashflow_abacus_view`', 'message': 'Invalid table name: `my-project.test.my_view`
[Try using standard SQL (https://cloud.google.com/bigquery/docs/reference/standard-sql/enabling-standard-sql)].'}. The job was: {'kind': 'bigquery#job', 'etag': 'random_etag==', 'id': 'my-project:US.job_some_randome_id_123412', 'selfLink': 'https://bigquery.googleapis.com/bigquery/v2/projects/my-project/jobs/job_some_randome_id_123412?location=US', 'user_email': '[email protected]', 'configuration': {'query': {'query': '
                    SELECT COUNT(*) > 0
                    FROM `my-project.test.my_view`
                ', 'priority': 'INTERACTIVE', 'useLegacySql': True}, 
                'jobType': 'QUERY'}, 'jobReference': {'projectId': 
                'my-project', 'jobId': 'job_some_randome_id_123412', 'location': 'SO'}, 'statistics': {'creationTime': '1627488185084', 'startTime': '1627488185109', 'endTime': '1627488185109'}, 'status': {'errorResult': {'reason': 'invalid', 'location': '`my-project.test.my_view`', 'message': 'Invalid table name: `my-project.test.my_view`
[Try using standard SQL (https://cloud.google.com/bigquery/docs/reference/standard-sql/enabling-standard-sql)].'}, 'errors': [{'reason': 'invalid', 'location': '`my-project.test.my_view`', 'message': 'Invalid table name: `my-project.test.my_view`
[Try using standard SQL (https://cloud.google.com/bigquery/docs/reference/standard-sql/enabling-standard-sql)].'}], 'state': 'DONE'}}

Second Error: In order to switch the "dialect" you have several options. One of them, the closest to the code is prepending the query with a comment line like so # standardSQL (docs). But

What you expected to happen:

There's no way to pass information to the hock that the SqlSensor is using underneath. It would be interesting to add a hook_params to the SqlSensor to be able to config the underlaying hooks:

# airflow/sensors/sql.py
class SqlSensor(BaseSensorOperator):
    def __init__(
        self, *, conn_id, sql, hook_config: Dict, parameters=None, success=None, failure=None, fail_on_empty=False, **kwargs
    ):
        self.conn_id = conn_id
        # init all the params...
        self.hook_config = hook_config or {}
        super().__init__(**kwargs)

    def _get_hook(self):
        conn = BaseHook.get_connection(self.conn_id)
        # ...
        return conn.get_hook(**self.hook_config)

And in the connection

# airflow/models/connection.py
class Connection(Base, LoggingMixin):
    # ...
    def get_hook(self, **kwargs):
        """Return hook based on conn_type."""
        # locate hook class ...
        return hook_class(**{conn_id_param: self.conn_id}, **kwargs)

Metadata

Metadata

Assignees

No one assigned

    Labels

    duplicateIssue that is duplicatedkind:bugThis is a clearly a bug

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions