Allow execution of multiple sql statements in SnowflakeHook #11350
Conversation
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.
65396aa to 23f85b4
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.
if isinstance(sql, str):
    with closing(self.get_conn()) as conn:
        if self.supports_autocommit:
            self.set_autocommit(conn, autocommit)

        conn.execute_string(sql, parameters)
else:
    super().run(sql, autocommit, parameters)
Can you add a test for this @JavierLopezT?
Hi @kaxil. I think I have to test that, depending on whether sql is a list or a string, it calls conn.execute_string or super().run, but I have no idea how to do that. I have started defining two functions and two different SQL statements, but I can't get any further. Could you help me, please? Thank you very much in advance.
Hey @JavierLopezT -- Have a look at https://github.com/apache/airflow/blob/master/tests/providers/exasol/hooks/test_exasol.py for some examples.
Let me know if you need help though
@JavierLopezT what you want to do is use mock.patch to verify that, in the run method, the right underlying method is called given the right input. Google for basic examples of mock.patch.
In this case you would want to mock the get_conn method so that its return_value is a mock object. Then, after run finishes, you can verify that execute_string was called on the mock object with your value for sql.
There are lots of examples in the repo; just try to find something similar.
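A minimal sketch of the kind of test described above. Since this is a sketch, it uses a hypothetical stand-in class that mirrors the dispatch logic from the diff rather than the real SnowflakeHook, and `_dbapi_run` stands in for the `super().run` call; the mock.patch technique is the same either way.

```python
from contextlib import closing
from unittest import mock


class FakeSnowflakeHook:
    """Stand-in mirroring the dispatch logic in the PR diff (not the real hook)."""

    supports_autocommit = True

    def get_conn(self):
        raise NotImplementedError  # patched out in the tests

    def set_autocommit(self, conn, autocommit):
        conn.autocommit(autocommit)

    def _dbapi_run(self, sql, autocommit, parameters):
        """Stands in for DbApiHook.run (the super().run branch)."""

    def run(self, sql, autocommit=False, parameters=None):
        if isinstance(sql, str):
            with closing(self.get_conn()) as conn:
                if self.supports_autocommit:
                    self.set_autocommit(conn, autocommit)
                conn.execute_string(sql, parameters)
        else:
            self._dbapi_run(sql, autocommit, parameters)


def test_run_string_uses_execute_string():
    hook = FakeSnowflakeHook()
    conn = mock.MagicMock()
    # Patch get_conn so it hands back our mock connection.
    with mock.patch.object(hook, "get_conn", return_value=conn):
        hook.run("SELECT 1; SELECT 2;", autocommit=True)
    conn.execute_string.assert_called_once_with("SELECT 1; SELECT 2;", None)


def test_run_list_falls_back_to_dbapi_run():
    hook = FakeSnowflakeHook()
    # Patch the fallback path and check it receives the list unchanged.
    with mock.patch.object(hook, "_dbapi_run") as dbapi_run:
        hook.run(["SELECT 1"], autocommit=False)
    dbapi_run.assert_called_once_with(["SELECT 1"], False, None)
```

For the real hook, the same pattern applies with `mock.patch.object(SnowflakeHook, "get_conn", ...)` and patching the parent class's run.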
dstandish left a comment
this is a worthwhile change @JavierLopezT 👍
@potiuk Hello! Is it worth keeping on with this MR, or shall we wait for a new version of the Snowflake Python connector?
Certainly. I don't own the connector :). Not sure when/if the new version comes out. And when it does, I think it will be backwards compatible, so there is no reason why this should block anyone.
queries = [item[0] for item in split_statements(StringIO(sql))]
for query in queries:
    super().run(query, autocommit, parameters)
The problem with this @JavierLopezT is that it will reconnect for every statement.
This can be a nightmare when you use Okta :) (though it's possible to cache the creds).
But more importantly, the way you've implemented it, it's not possible to use temp tables (which is pretty important); after the reconnect the table will be gone.
Instead you should connect only once and use the same connection for the whole series of statements, e.g.

with closing(hook.get_conn()) as cnx:
    cur = cnx.cursor()
    for query in queries:
        cur.execute(query)
        for row in cur.fetchall():
            print(row)

(You want to print fetchall because this is how you get statement results, and the operator is not meant to be used for a bare "select" statement, so no worry about writing a billion rows to the log :) )
Thanks for the example. However, I don't get why we would want to print fetchall.
After you execute a statement, Snowflake returns information.
For example it might say rows affected, or, after a COPY statement, the result of the load for all files.
The log should print it.
I also recommend using DictCursor if you're gonna print it out (it's a snowflake class).
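To illustrate the DictCursor suggestion, here is a hedged sketch of a result-logging helper. The function and its fakes are hypothetical (not from the PR); in real use you would pass `snowflake.connector.DictCursor` as `cursor_class`, so each fetched row comes back as a column-name-to-value mapping that reads well in a task log.

```python
def log_statement_results(cnx, queries, cursor_class=None, log=print):
    """Run each query on ONE connection and log whatever Snowflake returns
    (rows affected, COPY load results per file, etc.).

    With a real connection, pass snowflake.connector.DictCursor as
    cursor_class so rows are dicts keyed by column name.
    """
    cur = cnx.cursor(cursor_class) if cursor_class else cnx.cursor()
    try:
        for query in queries:
            cur.execute(query)
            for row in cur.fetchall():
                log(row)  # with DictCursor, each row is a dict
    finally:
        cur.close()
```

Because the connection is created once by the caller and reused, temp tables created by one statement remain visible to the next.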
The PR is likely OK to be merged with just a subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest master or amend the last commit of the PR, and push it with --force-with-lease.
def run(self, sql, autocommit=False, parameters=None):
    """
    Snowflake-connector doesn't allow natively the execution of multiple SQL statements in the same
    call. So for allowing to pass files or strings with several queries this method is coded,
@JavierLopezT does this actually support running a file? It looks like it must be a string here.
I think we definitely should make it so sql can be a path or sql (i.e. Union[Path, str], and if str, check whether it's a path to a file that exists), though it doesn't have to be this PR. I just want to suggest you make sure the docstring is consistent with the behavior.
Sorry, missed this before.
And a small nit: "relies on run from DBApiHook" is no longer true.
Co-authored-by: Kaxil Naik <[email protected]>
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from contextlib import closing
Static check fails with:
airflow/providers/snowflake/hooks/snowflake.py:18:1: F401 'contextlib.closing' imported but unused
Closing it to open a new one with more features, as suggested here.
Any reason why this one needs to be abandoned? I can imagine circumstances where you just want to start over and go in a different direction. I'm not sure what the conventions are, and maybe it doesn't matter. But I think in general it's good to have the history for a PR contained in one place. Anyway, just a thought.
The Snowflake connector doesn't natively allow the execution of multiple SQL statements in the same call. So, to allow passing files or strings with several queries, a new method is included in the hook.
Does this require tests?
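Putting the thread's suggestions together, a hedged sketch of what such a run method could look like: split the string once, reuse a single connection for every statement (so temp tables survive), and log what each statement returns. The `naive_split_statements` helper here is a simplified, hypothetical stand-in for `snowflake.connector.util_text.split_statements` used in the PR, and `run_all` is a sketch, not the merged implementation.

```python
from contextlib import closing
from io import StringIO


def naive_split_statements(buf):
    """Hypothetical stand-in for snowflake.connector.util_text.split_statements.

    Yields (statement, is_put_or_get) tuples like the real helper, but just
    splits on semicolons; it does NOT handle comments or string literals.
    """
    for stmt in buf.read().split(";"):
        stmt = stmt.strip()
        if stmt:
            yield (stmt + ";", False)


def run_all(get_conn, sql, log=print):
    """Execute every statement in ``sql`` on ONE connection.

    Reusing a single connection means temp tables created by an early
    statement are still visible to later ones, and each statement's
    results (rows affected, COPY load info, ...) get logged.
    """
    queries = [item[0] for item in naive_split_statements(StringIO(sql))]
    with closing(get_conn()) as cnx:
        cur = cnx.cursor()
        try:
            for query in queries:
                cur.execute(query)
                for row in cur.fetchall():
                    log(row)
        finally:
            cur.close()
```

The reviewer-requested tests then reduce to mocking `get_conn` and asserting that every split statement was executed on the same cursor.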