BaseSQLToGCSOperator parquet export format not limiting file size bug #25313

Description

@dclandau

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==6.8.0

Apache Airflow version

2.3.2

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Docker-Compose

Deployment details

No response

What happened

When using PostgresToGCSOperator(..., export_format='parquet', approx_max_file_size_bytes=Y, ...), once a temporary file exceeds the size defined by Y, the current file is not yielded and no new chunk is created. As a result, only one chunk is uploaded, regardless of the size Y that was specified.

I believe the culprit is this line of code, which checks whether the temporary file has exceeded its size limit: the call to tmp_file_handle.tell() always returns 0 after a parquet_writer.write_table(tbl) call [here].

Therefore, even when the temporary file is already larger than the defined approximate limit Y, no new file is created and only a single chunk is uploaded.
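
The behaviour can be reproduced in isolation with the minimal sketch below (it assumes pyarrow is installed; the variable names are illustrative rather than the provider's code, and the pattern is consistent with the parquet writer being opened from the temporary file's name rather than through the handle itself):

import os
from tempfile import NamedTemporaryFile

import pyarrow as pa
import pyarrow.parquet as pq

tmp_file_handle = NamedTemporaryFile(delete=True)
tbl = pa.Table.from_pydict({"id": list(range(1_000_000))})

# The writer is constructed from the path, so pyarrow opens its own handle
# and never advances the position of tmp_file_handle.
parquet_writer = pq.ParquetWriter(tmp_file_handle.name, tbl.schema)
parquet_writer.write_table(tbl)

print(tmp_file_handle.tell())                 # 0, even though data was written
print(os.stat(tmp_file_handle.name).st_size)  # the actual size on disk, > 0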

What you think should happen instead

This behaviour is erroneous: when the temporary file exceeds the size defined by Y, the operator should upload the current temporary file to GCS and, once that upload succeeds, start a new temporary file for the remaining data.

A possible fix could be to use the os module to determine the size of the temporary file on disk with os.stat(tmp_file_handle.name).st_size, instead of relying on tmp_file_handle.tell(). A sketch of this approach is shown below.
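
A minimal sketch of that rotation, assuming a hypothetical upload_to_gcs callback and pyarrow as the writer (illustrative only, not the provider's actual code in sql_to_gcs.py):

import os
from tempfile import NamedTemporaryFile

import pyarrow.parquet as pq


def write_rotating_chunks(batches, schema, approx_max_file_size_bytes, upload_to_gcs):
    """Write pyarrow tables to temp files, rotating whenever the on-disk size exceeds the limit."""
    tmp_file_handle = NamedTemporaryFile(delete=True)
    parquet_writer = pq.ParquetWriter(tmp_file_handle.name, schema)
    for tbl in batches:
        parquet_writer.write_table(tbl)
        # Measure the file on disk instead of tmp_file_handle.tell(),
        # which stays at 0 because pyarrow writes through its own handle.
        if os.stat(tmp_file_handle.name).st_size >= approx_max_file_size_bytes:
            parquet_writer.close()
            upload_to_gcs(tmp_file_handle)                     # ship the full chunk
            tmp_file_handle = NamedTemporaryFile(delete=True)  # start a new chunk
            parquet_writer = pq.ParquetWriter(tmp_file_handle.name, schema)
    parquet_writer.close()
    upload_to_gcs(tmp_file_handle)                             # ship the final, partial chunk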

How to reproduce

  1. Create a postgres connection on airflow with id postgres_test_conn.
  2. Create a gcp connection on airflow with id gcp_test_conn.
  3. In the database referenced by postgres_test_conn, create a table large_table in the public schema, with enough data to exceed the 10 MB limit defined by the approx_max_file_size_bytes parameter (a snippet to populate such a table is sketched after the DAG excerpt below).
  4. Create a bucket named issue_BaseSQLToGCSOperator_bucket, in the gcp account referenced by the gcp_test_conn.
  5. Create the dag exemplified in the excerpt below, and manually trigger it to fetch all the data from large_table and load it into issue_BaseSQLToGCSOperator_bucket. Multiple chunks should be created, but due to this bug only one chunk is uploaded, containing all of the data from large_table.
import pendulum

from airflow import DAG
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator


with DAG(
    dag_id="issue_BaseSQLToGCSOperator",
    start_date=pendulum.parse("2022-01-01"),
) as dag:
    task = PostgresToGCSOperator(
        task_id='extract_task',
        filename='uploading-{}.parquet',
        bucket="issue_BaseSQLToGCSOperator_bucket",
        export_format='parquet',
        approx_max_file_size_bytes=10_485_760,
        sql="SELECT * FROM large_table",
        postgres_conn_id='postgres_test_conn',
        gcp_conn_id='gcp_test_conn',
    )
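
For step 3, a hypothetical snippet to populate large_table with enough data to exceed the 10 MB limit (it assumes psycopg2 is installed and that the placeholder connection parameters are replaced with the database behind postgres_test_conn):

import psycopg2

# Placeholder connection parameters; point them at the database
# that postgres_test_conn references.
conn = psycopg2.connect(host="localhost", dbname="test", user="postgres", password="postgres")
with conn, conn.cursor() as cur:
    cur.execute("CREATE TABLE IF NOT EXISTS public.large_table (id bigint, payload text)")
    # ~500k rows of ~200 bytes each is comfortably above 10 MB.
    cur.execute(
        "INSERT INTO public.large_table "
        "SELECT g, repeat('x', 200) FROM generate_series(1, 500000) AS g"
    )
conn.close()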

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
