TL;DR:
Multiple providers need to handle the logic of splitting SQL statements.
Snowflake.run() and RedShift.run() are almost identical, and a future PrestoOperator or TrinoOperator would need the same logic again.
I want to discuss the idea of extracting the statement-splitting logic into DbApiHook,
e.g. creating a DbApiHook.run_many() that splits the statements and uses DbApiHook.run() under the hood (a rough sketch follows below).
On one hand, it would reduce duplicated code and make it easier to develop operators for these providers.
On the other hand, implementing it in DbApiHook means bumping the minimum supported Airflow version for these providers (once they actually start using the new function).
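For concreteness, here is a rough sketch of what such a method could look like. The name run_many() comes from the proposal above; the sqlparse-based splitting and the exact signature are assumptions for the sake of discussion, not a settled design:

```python
import sqlparse


def run_many(self, sql, autocommit=False, parameters=None, handler=None):
    """Proposed DbApiHook method: split a SQL string into individual
    statements, then delegate to the existing DbApiHook.run()."""
    if isinstance(sql, str):
        # Strip comments first so stray semicolons inside them do not
        # produce empty statements, then split on statement boundaries.
        sql = [s for s in sqlparse.split(sqlparse.format(sql, strip_comments=True)) if s]
    return self.run(sql, autocommit=autocommit, parameters=parameters, handler=handler)
```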
The long story:
Some DBs, like MySQL, PostgreSQL, and MSSQL, are able to run a batch of SQL statements:
SELECT 1; SELECT 2;
So you can do:
MySQLOperator(..., sql='SELECT 1; SELECT 2;')
which will invoke:

airflow/airflow/hooks/dbapi.py, line 163 at 501a3c3:

```python
def run(self, sql, autocommit=False, parameters=None, handler=None):
```

and execute the sql as a single query. (The run function converts this sql to ['SELECT 1; SELECT 2;'] - note this is a list with one cell.)
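In other words, run() does no splitting at all; a minimal illustration of the wrapping behaviour described above (not the full implementation):

```python
# run() wraps a plain string into a single-element list and executes
# it as one query -- no statement splitting happens here.
sql = 'SELECT 1; SELECT 2;'
if isinstance(sql, str):
    sql = [sql]
print(sql)  # ['SELECT 1; SELECT 2;'] - one cell, one round trip to the DB
```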
But in other DBs, like Snowflake, Redshift, Trino, and Presto,
you cannot run SELECT 1; SELECT 2; as is. You must split the statements.
This makes run() unusable for these DBs.
For example, in Snowflake, when users pass SELECT 1; SELECT 2;, what we actually do is create
['SELECT 1;', 'SELECT 2;'] - note this is a list with two cells:
airflow/airflow/providers/snowflake/hooks/snowflake.py, lines 314-316 at 501a3c3:

```python
if isinstance(sql, str):
    split_statements_tuple = split_statements(StringIO(sql))
    sql = [sql_string for sql_string, _ in split_statements_tuple if sql_string]
```
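Stripped of the hook plumbing, the splitting amounts to the following (assuming snowflake-connector-python, which provides split_statements, is installed):

```python
from io import StringIO

from snowflake.connector.util_text import split_statements

sql = 'SELECT 1; SELECT 2;'
# split_statements() yields (statement, is_put_or_get) tuples.
split_statements_tuple = split_statements(StringIO(sql))
statements = [sql_string for sql_string, _ in split_statements_tuple if sql_string]
print(statements)  # ['SELECT 1;', 'SELECT 2;'] - two cells
```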
The problem?
These providers (Snowflake, Redshift, Trino, Presto) are forced to override dbapi.run(), yet they all need exactly the same functionality.
If you take a look at Snowflake.run(), you will see that it's almost identical to RedShift.run(), and it would be very similar again should we eventually create a PrestoOperator or TrinoOperator.
This was already brought up earlier in #15533 (comment).
So why not just make the sql param accept only a list?
Because we want to support passing a .sql file, so we must handle the splitting of the statements ourselves:

```python
SomeSqlOperator(
    ...,
    sql='my_queries.sql'
)
```

We read the content of the file and load it as a single string.
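A rough illustration of why a list-only sql param would break this (the file name and its contents are hypothetical; real operators render .sql files through Jinja templating, but the end result is the same):

```python
with open('my_queries.sql') as f:
    sql = f.read()  # e.g. 'SELECT 1;\nSELECT 2;\n'
# sql is now one string holding multiple statements, so the hook
# still has to split it before it can execute anything.
```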
What do others think?
Should we create DbApiHook.run_many(), or leave it as it is today, where every provider needs to handle the logic on its own?
Committer
- I acknowledge that I am a maintainer/committer of the Apache Airflow project.