
DatabricksSQLOperator ValueError “too many values to unpack” #27963

@BillCM

Description


Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==6.0.0
apache-airflow-providers-apache-hive==4.0.1
apache-airflow-providers-apache-livy==3.1.0
apache-airflow-providers-celery==3.0.0
apache-airflow-providers-cncf-kubernetes==4.4.0
apache-airflow-providers-common-sql==1.2.0
apache-airflow-providers-databricks==3.3.0
apache-airflow-providers-dbt-cloud==2.2.0
apache-airflow-providers-elasticsearch==4.2.1
apache-airflow-providers-ftp==3.1.0
apache-airflow-providers-google==8.4.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-imap==3.0.0
apache-airflow-providers-microsoft-azure==4.3.0
apache-airflow-providers-postgres==5.2.2
apache-airflow-providers-redis==3.0.0
apache-airflow-providers-sftp==4.1.0
apache-airflow-providers-snowflake==3.3.0
apache-airflow-providers-sqlite==3.2.1
apache-airflow-providers-ssh==3.2.0

databricks-sql-connector==2.0.2

Apache Airflow version

2.4.3

Operating System

AKSUbuntu-1804gen2containerd-2022.10.03containerd://1.6.4+azure-4

Deployment

Astronomer

Deployment details

Astronomer-managed instances on Azure:

  • Standard_D4d_v5
  • Ubuntu Linux
  • Azure Database for PostgreSQL
  • Databricks accessed through VNet peering
  • Databricks SQL Warehouse version 2022.35

What happened

  • I’ve verified that my Databricks connection is defined correctly.
    • I created a separate Python script that uses databricks-sql-connector to confirm I can read data from Databricks to my local machine, and it works correctly (see the verification sketch after the traceback below).
  • I can see the queries arriving in Databricks from “PyDatabricksSqlConnector 2.0.2”.
  • I can see Databricks responding with the expected query results.
    • 1000 rows fetched by client in 636 ms
  • When the results reach the DatabricksSqlOperator.execute() method, I get a consistent error on line 164 of databricks_sql.py:
[2022-11-22, 17:07:32 UTC] {databricks_sql.py:161} INFO - Executing: select * from schema.tokenization_input_1000
[2022-11-22, 17:07:32 UTC] {base.py:71} INFO - Using connection ID 'tokenization_databricks' for task execution.
[2022-11-22, 17:07:32 UTC] {databricks_base.py:430} INFO - Using token auth.
[2022-11-22, 17:07:33 UTC] {databricks_base.py:430} INFO - Using token auth.
[2022-11-22, 17:07:34 UTC] {client.py:115} INFO - Successfully opened session b'\x01\xedj\x88(\xb7\x10\xd7\xa2\x16\xf2\t\x0e\xb4\xd9\xe3'
[2022-11-22, 17:07:34 UTC] {sql.py:315} INFO - Running statement: select * from schema.tokenization_input_1000, parameters: None
[2022-11-22, 17:07:37 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/operators/databricks_sql.py", line 164, in execute
    schema, results = cast(List[Tuple[Any, Any]], response)[0]
ValueError: too many values to unpack (expected 2)
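
For reference, the local verification script mentioned above was essentially the following, a minimal sketch that uses databricks-sql-connector 2.0.2 directly; the hostname, HTTP path, and token are placeholders, not values from this issue:

from databricks import sql

with sql.connect(
    server_hostname="adb-0000000000000000.0.azuredatabricks.net",  # placeholder
    http_path="/sql/1.0/endpoints/0123456789abcdef",               # placeholder
    access_token="dapi-REDACTED",                                   # placeholder
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("select * from schema.tokenization_input_1000")
        rows = cursor.fetchall()  # list of Row objects
        print(len(rows), rows[:2])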

What you think should happen instead

Given that the data appears to be returned from Databricks correctly, I would expect the DatabricksSqlOperator/DatabricksSqlHook to handle the output from databricks-sql-connector correctly and write the data to the temp file on the Airflow node.

How to reproduce


My code: execute a simple select statement against a Databricks SQL Warehouse and write the output to a CSV file in /tmp.

from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

# tokenization_databricks and sql_endpoint_name are defined elsewhere in the DAG file
select_into_file = DatabricksSqlOperator(
    databricks_conn_id=tokenization_databricks,
    sql_endpoint_name=sql_endpoint_name,
    task_id='extract_tokens_to_file',
    sql="select * from schema.tokenization_input_1000",
    output_path="/tmp/extracted_tokens.csv",
    output_format="csv",
)

Anything else

I converted the task to use the DatabricksSqlHook directly and did some testing locally (a sketch of that test follows below). I think the issue is the inclusion of the [0] at the end of line 164.

Current code, which throws the ValueError:
schema, results = cast(List[Tuple[Any, Any]], response)[0]

Without the [0], no error is thrown and the rows come back:
schema, results = cast(List[Tuple[Any, Any]], response)

output:
[Row(contributorID=1, datasetID=1, ...
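
For completeness, here is a minimal sketch of that local test. The connection ID comes from the logs above; the SQL endpoint name is a placeholder, and the sketch assumes the hook’s run() accepts a handler callable the way common-sql’s DbApiHook does:

from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

hook = DatabricksSqlHook(
    databricks_conn_id="tokenization_databricks",
    sql_endpoint_name="my_sql_endpoint",  # placeholder, not from this issue
)

# Fetch all rows through the hook; the response already contains the Row
# objects rather than a list of (schema, results) tuples.
response = hook.run(
    "select * from schema.tokenization_input_1000",
    handler=lambda cursor: cursor.fetchall(),
)

print(response[:2])  # e.g. [Row(contributorID=1, datasetID=1, ...), Row(...)]

# Line 164 of the operator effectively does the following, which fails with
# "ValueError: too many values to unpack (expected 2)" because response[0]
# is a single multi-column Row, not a (schema, results) pair:
# schema, results = response[0]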

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
