closing connection chunks in DbApiHook.get_pandas_df #22947

@bauerfranz

Description

Apache Airflow version

2.2.5 (latest released)

What happened

Hi all,
Please bear with me, this is my first bug report on GitHub :)

Affected function: DbApiHook.get_pandas_df

Short description: When I call DbApiHook.get_pandas_df with the "chunksize" parameter, the connection is lost.

Error description
I tried using DbApiHook.get_pandas_df instead of pandas.read_sql. Without the "chunksize" parameter, both functions behave the same. As soon as I pass chunksize to get_pandas_df, however, the connection is lost in the first iteration. This happens when querying both Oracle and MySQL (MariaDB) databases.

During my research I found a comment on a closed issue that describes the same problem: #8468

My Airflow version: 2.2.5

I think it has something to do with the "with closing" block, because when I remove it, the chunksize parameter works.

def get_pandas_df(self, sql, parameters=None, **kwargs):
    """
    Executes the sql and returns a pandas dataframe
    :param sql: the sql statement to be executed (str) or a list of
        sql statements to execute
    :param parameters: The parameters to render the SQL query with.
    :param kwargs: (optional) passed into pandas.io.sql.read_sql method
    """
    try:
        from pandas.io import sql as psql
    except ImportError:
        raise Exception("pandas library not installed, run: pip install 'apache-airflow[pandas]'.")

    # Not working when chunksize is passed
    with closing(self.get_conn()) as conn:
        return psql.read_sql(sql, con=conn, params=parameters, **kwargs)

    # Works when chunksize is passed (connection not wrapped in closing)
    # return psql.read_sql(sql, con=self.get_conn(), params=parameters, **kwargs)
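A likely explanation, sketched here with the standard library only: with chunksize, read_sql hands back a lazy iterator, and the "with closing" block exits at the return statement, so the connection is already closed when the caller asks for the first chunk. In the sketch below, make_conn, lazy_chunks and broken_get_chunks are hypothetical stand-ins (not Airflow or pandas code), and sqlite3 replaces the Oracle/MySQL connection.

```python
import sqlite3
from contextlib import closing


def make_conn():
    """Hypothetical stand-in for hook.get_conn(): a small in-memory database."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE example_table (id INTEGER)")
    conn.executemany("INSERT INTO example_table VALUES (?)", [(i,) for i in range(5)])
    return conn


def lazy_chunks(conn, sql, chunksize):
    """Lazy row iterator, standing in for read_sql(..., chunksize=...)."""
    cursor = conn.execute(sql)
    while True:
        rows = cursor.fetchmany(chunksize)
        if not rows:
            return
        yield rows


def broken_get_chunks(sql, chunksize):
    # Mirrors the reported pattern: the with block exits at `return`,
    # so the connection is closed before any chunk has been fetched.
    with closing(make_conn()) as conn:
        return lazy_chunks(conn, sql, chunksize)


def first_iteration_error(sql, chunksize):
    """Return the exception raised by the first iteration, or None."""
    try:
        next(broken_get_chunks(sql, chunksize))
    except sqlite3.ProgrammingError as exc:
        return exc
    return None


error = first_iteration_error("SELECT id FROM example_table", 2)
print(type(error).__name__)
```

The exact exception type and message depend on the database driver; the point is only that the failure appears on the first iteration, not at the call itself.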

What you think should happen instead

It should return an iterator that yields the result as DataFrame chunks.
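One way to get that behavior, as a minimal sketch rather than a proposed patch: make the wrapper itself a generator, so the connection stays open until the caller has consumed every chunk. The helper name iter_pandas_chunks and the sqlite3 demo connection are assumptions for illustration; with a real hook, the hook's get_conn method would be passed as get_conn.

```python
import sqlite3
from contextlib import closing

import pandas as pd


def iter_pandas_chunks(get_conn, sql, chunksize, parameters=None, **kwargs):
    """Yield DataFrame chunks while keeping the connection open.

    `get_conn` is any zero-argument callable returning a DB-API connection
    (hypothetical helper, not part of Airflow).
    """
    with closing(get_conn()) as conn:
        # Because this function is a generator, the with block is only left
        # after the last chunk has been yielded (or the generator is closed).
        yield from pd.read_sql(sql, con=conn, params=parameters, chunksize=chunksize, **kwargs)


def make_demo_conn():
    """Demo-only connection: an in-memory SQLite table with five rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE example_table (id INTEGER)")
    conn.executemany("INSERT INTO example_table VALUES (?)", [(i,) for i in range(5)])
    return conn


chunk_sizes = [
    len(chunk)
    for chunk in iter_pandas_chunks(make_demo_conn, "SELECT id FROM example_table", chunksize=2)
]
print(chunk_sizes)
```

The trade-off of this design is that the connection is released only once the iterator is exhausted or garbage-collected, so callers should consume or explicitly close it.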

How to reproduce

Not working:

from airflow.providers.oracle.hooks.oracle import OracleHook

src_hook = OracleHook(oracle_conn_id='oracle_source_conn_id')
query = "select * from example_table"
for chunk in src_hook.get_pandas_df(query, chunksize=2):
    print(chunk.head())

Works (without chunksize, a single DataFrame is returned):

df = src_hook.get_pandas_df(query)
print(df.head())

Works (calling pandas directly):

import pandas

for chunk in pandas.read_sql(query, src_hook.get_conn(), chunksize=2):
    print(chunk.head())

Operating System

macOS Monterey

Versions of Apache Airflow Providers

apache-airflow 2.2.5
apache-airflow-providers-ftp 2.1.2
apache-airflow-providers-http 2.1.2
apache-airflow-providers-imap 2.2.3
apache-airflow-providers-microsoft-mssql 2.1.3
apache-airflow-providers-mongo 2.3.3
apache-airflow-providers-mysql 2.2.3
apache-airflow-providers-oracle 2.2.3
apache-airflow-providers-salesforce 3.4.3
apache-airflow-providers-sftp 2.5.2
apache-airflow-providers-sqlite 2.1.3
apache-airflow-providers-ssh 2.4.3

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes, I am willing to submit a PR!
