Description
Apache Airflow Provider(s)
databricks
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==6.0.0
apache-airflow-providers-apache-hive==4.0.1
apache-airflow-providers-apache-livy==3.1.0
apache-airflow-providers-celery==3.0.0
apache-airflow-providers-cncf-kubernetes==4.4.0
apache-airflow-providers-common-sql==1.2.0
apache-airflow-providers-databricks==3.3.0
apache-airflow-providers-dbt-cloud==2.2.0
apache-airflow-providers-elasticsearch==4.2.1
apache-airflow-providers-ftp==3.1.0
apache-airflow-providers-google==8.4.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-imap==3.0.0
apache-airflow-providers-microsoft-azure==4.3.0
apache-airflow-providers-postgres==5.2.2
apache-airflow-providers-redis==3.0.0
apache-airflow-providers-sftp==4.1.0
apache-airflow-providers-snowflake==3.3.0
apache-airflow-providers-sqlite==3.2.1
apache-airflow-providers-ssh==3.2.0
databricks-sql-connector==2.0.2
Apache Airflow version
2.4.3
Operating System
AKSUbuntu-1804gen2containerd-2022.10.03containerd://1.6.4+azure-4
Deployment
Astronomer
Deployment details
Astronomer managed instances on Azure:
- Standard_D4d_v5
- Ubuntu Linux
- Azure Database for Postgres
- Databricks accessed through VNet Peering
- Databricks SQL Warehouse version: v2022.35
What happened
- I’ve verified that my Databricks connection is defined correctly
- I created a separate Python script using databricks-sql-connector to double-check that I can read the data from my local machine, and it works correctly (a sketch of that check follows the log below).
- I can see the queries being received in Databricks from “PyDatabricksSqlConnector 2.0.2”
- I can see Databricks responding with the expected query results in Databricks
- 1000 rows fetched by client in 636ms
- When the results reach the DatabricksSqlOperator.execute() method, I get a consistent error on line 164 of databricks_sql.py:
[2022-11-22, 17:07:32 UTC] {databricks_sql.py:161} INFO - Executing: select * from schema.tokenization_input_1000
[2022-11-22, 17:07:32 UTC] {base.py:71} INFO - Using connection ID 'tokenization_databricks' for task execution.
[2022-11-22, 17:07:32 UTC] {databricks_base.py:430} INFO - Using token auth.
[2022-11-22, 17:07:33 UTC] {databricks_base.py:430} INFO - Using token auth.
[2022-11-22, 17:07:34 UTC] {client.py:115} INFO - Successfully opened session b'\x01\xedj\x88(\xb7\x10\xd7\xa2\x16\xf2\t\x0e\xb4\xd9\xe3'
[2022-11-22, 17:07:34 UTC] {sql.py:315} INFO - Running statement: select * from schema.tokenization_input_1000, parameters: None
[2022-11-22, 17:07:37 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/operators/databricks_sql.py", line 164, in execute
schema, results = cast(List[Tuple[Any, Any]], response)[0]
ValueError: too many values to unpack (expected 2)
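For reference, the standalone connectivity check mentioned above was along these lines. This is a minimal sketch; the hostname, HTTP path and token are placeholders, not the real values.

# Minimal standalone check with databricks-sql-connector 2.0.2; the hostname,
# HTTP path and token below are placeholders, not the real values.
from databricks import sql

connection = sql.connect(
    server_hostname="adb-0000000000000000.0.azuredatabricks.net",
    http_path="/sql/1.0/warehouses/0000000000000000",
    access_token="<personal-access-token>",
)
cursor = connection.cursor()
cursor.execute("select * from schema.tokenization_input_1000")
rows = cursor.fetchall()
print(len(rows))  # the expected 1000 rows come back when run from my local machine
cursor.close()
connection.close()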
What you think should happen instead
Given that the data appears to be returned from Databricks correctly, I would expect the DatabricksSqlOperator/Hook to handle the output from databricks-sql-connector correctly and write the data to the temp file on the Airflow node.
How to reproduce
My code: execute a simple select statement against the Databricks SQL Warehouse and write the output to a CSV file in /tmp.
select_into_file = DatabricksSqlOperator(
    databricks_conn_id=tokenization_databricks,
    sql_endpoint_name=sql_endpoint_name,
    task_id='extract_tokens_to_file',
    sql="select * from schema.tokenization_input_1000",
    output_path="/tmp/extracted_tokens.csv",
    output_format="csv"
)
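For completeness, a minimal self-contained DAG wrapping that operator; the DAG id, dates, schedule and warehouse name are illustrative placeholders, not the real values.

# Minimal DAG around the operator above; dag_id, start_date, schedule and the
# SQL Warehouse name are illustrative placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

with DAG(
    dag_id="tokenization_extract",
    start_date=datetime(2022, 11, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    select_into_file = DatabricksSqlOperator(
        databricks_conn_id="tokenization_databricks",
        sql_endpoint_name="tokenization_warehouse",  # placeholder SQL Warehouse name
        task_id="extract_tokens_to_file",
        sql="select * from schema.tokenization_input_1000",
        output_path="/tmp/extracted_tokens.csv",
        output_format="csv",
    )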
Anything else
I converted to using the DatabricksHook directly and did some testing locally. I think the issue is the inclusion of the [0] at the end of line 164; a small standalone illustration of the failure follows the output below.
Current code, which throws the unpack error:
schema, results = cast(List[Tuple[Any, Any]], response)[0]
With the [0] removed, no error is thrown and the result contains the rows:
schema, results = cast(List[Tuple[Any, Any]], response)
output:
[Row(contributorID=1, datasetID=1, ...
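To make the failure mode concrete, here is a small standalone illustration of why indexing with [0] trips the unpack. The Row fields mirror the output above; the third field ("value") is made up, since the real rows have more than the two columns shown.

# Standalone illustration (no Airflow or Databricks needed) of the unpack failure.
from collections import namedtuple

Row = namedtuple("Row", ["contributorID", "datasetID", "value"])  # "value" is a made-up field
response = [Row(1, 1, "abc"), Row(2, 1, "def")]

# Mirrors line 164: schema, results = cast(List[Tuple[Any, Any]], response)[0]
try:
    schema, results = response[0]
except ValueError as err:
    print(err)  # -> too many values to unpack (expected 2)

# The element at [0] has more than two fields, so unpacking it into two names
# raises the same ValueError as the traceback above.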
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct