PostgresToGCSOperator parquet format mapping inconsistencies converts boolean data type to string #25474

@dclandau

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==6.8.0

Apache Airflow version

2.3.2

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Docker-Compose

Deployment details

No response

What happened

When exporting postgres data in parquet format, this function is responsible for converting the types in two steps: postgres types -> bigquery types -> parquet types.

The map in the PostgresToGCSOperator indicates that the postgres boolean type maps to the bigquery BOOLEAN data type.

Then, when converting from bigquery to parquet data types here, the map does not have the BOOLEAN data type in its keys. Because the type defaults to string in the following line, the BOOLEAN data type is mapped to pa.string() instead of pa.bool_().

When the boolean values are then written as pa.string(), pyarrow raises an error.
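
The two-step lookup described above can be sketched in plain Python. This is a minimal illustration and not the provider's code: the dictionaries are trimmed-down stand-ins, the names POSTGRES_TO_BQ, BQ_TO_PARQUET and parquet_type_for are hypothetical, and the pyarrow types are written as strings so the snippet runs without pyarrow installed. The BOOL key in the second map is an assumption based on the fix suggested in this issue.

```python
# Hypothetical stand-in: postgres type OID -> bigquery type name.
# OID 16 is the postgres boolean type, 20 is int8, 701 is float8.
POSTGRES_TO_BQ = {
    16: "BOOLEAN",
    20: "INTEGER",
    701: "FLOAT",
}

# Hypothetical stand-in: bigquery type name -> parquet type.
# Strings stand in for the pyarrow type objects.
BQ_TO_PARQUET = {
    "INTEGER": "pa.int64()",
    "FLOAT": "pa.float64()",
    "BOOL": "pa.bool_()",  # assumed key: BOOL, not BOOLEAN
    "STRING": "pa.string()",
}


def parquet_type_for(oid: int) -> str:
    """Resolve a postgres OID to a parquet type via the bigquery name."""
    bq_type = POSTGRES_TO_BQ.get(oid, "STRING")
    # Unknown bigquery names silently fall back to string.
    return BQ_TO_PARQUET.get(bq_type, "pa.string()")


print(parquet_type_for(20))  # pa.int64()
print(parquet_type_for(16))  # pa.string()
```

The boolean column ends up with a string type because "BOOLEAN" is not a key of the second map, and the fallback hides the mismatch until the data is written.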

What you think should happen instead

I would expect the postgres boolean type to map to pa.bool_() data type.

Changing the map in the PostgresToGCSOperator to use BOOL instead of BOOLEAN, so that it matches a key of the bigquery to parquet map, would correctly map the postgres type to the final parquet type.
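
A small consistency check could catch this kind of mismatch between the two maps. The sketch below is only an illustration under assumptions: unmapped_types is a hypothetical helper, and the dictionaries are simplified stand-ins for the real maps, with strings in place of pyarrow types.

```python
def unmapped_types(source_to_bq: dict, bq_to_parquet: dict) -> set:
    """Return bigquery type names the first map emits but the second lacks."""
    return set(source_to_bq.values()) - set(bq_to_parquet)


# Hypothetical stand-ins for the postgres map before and after the change.
before = {16: "BOOLEAN", 20: "INTEGER"}
after = {16: "BOOL", 20: "INTEGER"}

# Hypothetical stand-in for the bigquery -> parquet map.
parquet_map = {"INTEGER": "pa.int64()", "BOOL": "pa.bool_()"}

print(unmapped_types(before, parquet_map))  # {'BOOLEAN'}
print(unmapped_types(after, parquet_map))   # set()
```

Any name reported by the check would fall through to the string default during the parquet conversion.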

How to reproduce

  1. Create a postgres connection on airflow with id postgres_test_conn.
  2. Create a gcp connection on airflow with id gcp_test_conn.
  3. In the database referenced by the postgres_test_conn, in the public schema create a table test_table that includes a boolean data type, and insert data into the table.
  4. Create a bucket named issue_PostgresToGCSOperator_bucket, in the gcp account referenced by the gcp_test_conn.
  5. Run the dag below that inserts the data from the postgres table into the cloud storage bucket.
import pendulum

from airflow import DAG
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator


with DAG(
    dag_id="issue_PostgresToGCSOperator",
    start_date=pendulum.parse("2022-01-01"),
) as dag:
    task = PostgresToGCSOperator(
        task_id='extract_task',
        filename='uploading-{}.parquet',
        bucket="issue_PostgresToGCSOperator_bucket",
        export_format='parquet',
        sql="SELECT * FROM test_table",
        postgres_conn_id='postgres_test_conn',
        gcp_conn_id='gcp_test_conn',
    )

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
