Description
Apache Airflow version: 1.10.15, but happens in all of them
Environment: GCP Composer -> Kubernetes + Celery
- Cloud provider or hardware configuration: Google
What happened:
Errors appear while trying to use SqlSensor against a BigQuery View.
For context I'll explain a little bit: BigQuery looks like it executes just SQL, but there are actually two dialects of the language. For backwards compatibility, the so-called legacy SQL is the default.
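To illustrate the difference, here is the same query written in each dialect (an illustrative sketch with placeholder project/dataset/table names, not taken from my DAG):

# Legacy SQL references tables as [project:dataset.table],
# standard SQL as `project.dataset.table` in backticks.
legacy_sql = "SELECT COUNT(*) > 0 FROM [my-project:test.my_view]"
standard_sql = "SELECT COUNT(*) > 0 FROM `my-project.test.my_view`"

A query written in one dialect will typically fail to parse when the job is submitted with the other one.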
I'm hot-fixing it like so:
class BigQuerySqlSensor(SqlSensor):
    """
    Overwrites hook config when using SqlSensor with a BigQuery
    connection
    """

    def _get_hook(self):
        hook = super()._get_hook()
        hook.use_legacy_sql = False
        hook.location = 'europe-west1'
        return hook
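In a DAG the subclass is used exactly like a plain SqlSensor; a minimal sketch (dataset and view names are placeholders):

check_view_has_data = BigQuerySqlSensor(
    task_id="check_view_has_data",
    conn_id="google_cloud_default",
    sql="SELECT COUNT(*) > 0 FROM `my-project.test.my_view`",
)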
How to reproduce it:
The issue I'm experiencing is trying to run a simple query against a view:
check_resource_has_data = SqlSensor(
    task_id="check_resource_has_data",
    conn_id='google_cloud_default',
    sql=f'SELECT COUNT(*) > 0 FROM `{dataset_id}.{resource_id}`',
)

First Error: Since I am writing standard SQL but the hook automatically sets the dialect to legacy, BigQuery tries to run the query as legacy SQL, fails to parse it and returns an error:
[2021-07-28 16:03:05,499] {taskinstance.py:1152} ERROR - BigQuery job failed. Final error was: {'reason': 'invalid', 'location': '`my-project.test.cashflow_abacus_view`', 'message': 'Invalid table name: `my-project.test.my_view`
[Try using standard SQL (https://cloud.google.com/bigquery/docs/reference/standard-sql/enabling-standard-sql)].'}. The job was: {'kind': 'bigquery#job', 'etag': 'random_etag==', 'id': 'my-project:US.job_some_randome_id_123412', 'selfLink': 'https://bigquery.googleapis.com/bigquery/v2/projects/my-project/jobs/job_some_randome_id_123412?location=US', 'user_email': '[email protected]', 'configuration': {'query': {'query': '
SELECT COUNT(*) > 0
FROM `my-project.test.my_view`
', 'priority': 'INTERACTIVE', 'useLegacySql': True},
'jobType': 'QUERY'}, 'jobReference': {'projectId':
'my-project', 'jobId': 'job_some_randome_id_123412', 'location': 'SO'}, 'statistics': {'creationTime': '1627488185084', 'startTime': '1627488185109', 'endTime': '1627488185109'}, 'status': {'errorResult': {'reason': 'invalid', 'location': '`my-project.test.my_view`', 'message': 'Invalid table name: `my-project.test.my_view`
[Try using standard SQL (https://cloud.google.com/bigquery/docs/reference/standard-sql/enabling-standard-sql)].'}, 'errors': [{'reason': 'invalid', 'location': '`my-project.test.my_view`', 'message': 'Invalid table name: `my-project.test.my_view`
[Try using standard SQL (https://cloud.google.com/bigquery/docs/reference/standard-sql/enabling-standard-sql)].'}], 'state': 'DONE'}}
Second Error: In order to switch the "dialect" you have several options. The one closest to the code is prepending the query with a comment line such as #standardSQL (docs). But
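For reference, that comment-prefix workaround applied to the sensor above would look roughly like this (a sketch; the table name is a placeholder):

check_resource_has_data = SqlSensor(
    task_id="check_resource_has_data",
    conn_id="google_cloud_default",
    sql="#standardSQL\nSELECT COUNT(*) > 0 FROM `my-project.test.my_view`",
)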
What you expected to happen:
There's no way to pass information to the hook that the SqlSensor is using underneath. It would be interesting to add a hook_params argument to the SqlSensor to be able to configure the underlying hook:
# airflow/sensors/sql.py
class SqlSensor(BaseSensorOperator):
    def __init__(
        self, *, conn_id, sql, hook_params: Dict = None, parameters=None,
        success=None, failure=None, fail_on_empty=False, **kwargs
    ):
        self.conn_id = conn_id
        # init all the params...
        self.hook_params = hook_params or {}
        super().__init__(**kwargs)

    def _get_hook(self):
        conn = BaseHook.get_connection(self.conn_id)
        # ...
        return conn.get_hook(**self.hook_params)

And in the connection:
# airflow/models/connection.py
class Connection(Base, LoggingMixin):
    # ...

    def get_hook(self, **kwargs):
        """Return hook based on conn_type."""
        # locate hook class ...
        return hook_class(**{conn_id_param: self.conn_id}, **kwargs)
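With a change along those lines, the BigQuery case above could be configured directly from the sensor. A sketch of how it might look (hook_params is the proposed argument, not an existing one; the keys mirror the BigQuery hook's use_legacy_sql and location settings):

check_resource_has_data = SqlSensor(
    task_id="check_resource_has_data",
    conn_id="google_cloud_default",
    sql="SELECT COUNT(*) > 0 FROM `my-project.test.my_view`",
    hook_params={"use_legacy_sql": False, "location": "europe-west1"},
)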