Apache Airflow Provider(s)
google
Versions of Apache Airflow Providers
apache-airflow-providers-google==6.8.0
Apache Airflow version
2.3.2
Operating System
Debian GNU/Linux 11 (bullseye)
Deployment
Docker-Compose
Deployment details
No response
What happened
When using PostgresToGCSOperator(..., export_format='parquet', approx_max_file_size_bytes=Y, ...), a temporary file that exceeds the size defined by Y is never yielded and no new chunk is created. As a result, only one chunk is uploaded, regardless of the size Y specified.
I believe the culprit is this line of code, which is responsible for checking whether the temporary file has exceeded its size limit: the call to tmp_file_handle.tell() always returns 0 after a parquet_writer.write_table(tbl) call [here], presumably because the parquet writer writes through its own handle opened on the file's name rather than through tmp_file_handle.
Therefore, even when the temporary file is already larger than the approximate limit Y, no new file is created and only a single chunk is uploaded.
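For illustration, here is a minimal standalone sketch (not the provider code) of the behaviour I believe is happening, assuming the ParquetWriter is opened on the temporary file's name as the provider does; the table contents are made up:

import os
from tempfile import NamedTemporaryFile

import pyarrow as pa
import pyarrow.parquet as pq

# Made-up table; only its size matters for this demonstration.
tbl = pa.Table.from_pydict({"id": list(range(100_000)), "payload": ["x" * 100] * 100_000})

tmp_file_handle = NamedTemporaryFile(delete=True)
# The writer is opened on the file's *name*, so its writes bypass tmp_file_handle.
parquet_writer = pq.ParquetWriter(tmp_file_handle.name, tbl.schema)
parquet_writer.write_table(tbl)

print(tmp_file_handle.tell())                 # 0, so a tell()-based size check never triggers
print(os.stat(tmp_file_handle.name).st_size)  # grows as row groups are written to disk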
What you think should happen instead
This behaviour is erroneous: when the temporary file exceeds the size defined by Y, the operator should upload the current temporary file to GCS and, once that upload succeeds, create a new temporary file for the remaining data.
A possible fix could be to use the os module to determine the size of the temporary file on disk with os.stat(tmp_file_handle.name).st_size, instead of relying on tmp_file_handle.tell().
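A sketch of that suggestion (the helper name is made up; in the operator this comparison would replace the tmp_file_handle.tell() check referenced above):

import os


def tmp_file_exceeds_limit(tmp_file_handle, approx_max_file_size_bytes):
    """Sketch of the proposed check: stat the file on disk by name, because
    tmp_file_handle.tell() stays at 0 when pyarrow's ParquetWriter (opened on
    tmp_file_handle.name) performs the writes."""
    return os.stat(tmp_file_handle.name).st_size >= approx_max_file_size_bytes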
How to reproduce
- Create a postgres connection on Airflow with id postgres_test_conn.
- Create a GCP connection on Airflow with id gcp_test_conn.
- In the database referenced by postgres_test_conn, in the public schema, create a table large_table whose total amount of data is big enough to exceed the 10 MB limit defined in the approx_max_file_size_bytes parameter (a sketch for populating such a table follows this list).
- Create a bucket named issue_BaseSQLToGCSOperator_bucket in the GCP account referenced by gcp_test_conn.
- Create the DAG shown in the excerpt below and trigger it manually to fetch all the data from large_table and load it into issue_BaseSQLToGCSOperator_bucket. We would expect multiple chunks to be created, but due to this bug only one chunk is uploaded, containing all the data from large_table.
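For the large_table step, a hypothetical way to populate a sufficiently large table (the connection parameters, column names, and row count below are assumptions, not part of this report):

# Fill large_table with enough data to exceed the 10 MB approx_max_file_size_bytes limit.
import psycopg2

conn = psycopg2.connect(host="localhost", dbname="test_db", user="postgres", password="postgres")
with conn, conn.cursor() as cur:
    cur.execute("CREATE TABLE IF NOT EXISTS public.large_table (id bigint, payload text)")
    # repeat('x', 1000) is ~1 kB per row, so 50,000 rows is roughly 50 MB of raw data.
    cur.execute(
        "INSERT INTO public.large_table "
        "SELECT g, repeat('x', 1000) FROM generate_series(1, 50000) AS g"
    )
conn.close()

The DAG referenced in the last step: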
import pendulum
from airflow import DAG
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator

with DAG(
    dag_id="issue_BaseSQLToGCSOperator",
    start_date=pendulum.parse("2022-01-01"),
) as dag:
    task = PostgresToGCSOperator(
        task_id='extract_task',
        filename='uploading-{}.parquet',
        bucket="issue_BaseSQLToGCSOperator_bucket",
        export_format='parquet',
        approx_max_file_size_bytes=10_485_760,
        sql="SELECT * FROM large_table",
        postgres_conn_id='postgres_test_conn',
        gcp_conn_id='gcp_test_conn',
    )

Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct