GCSObjectsWithPrefixExistenceSensor in deferrable mode does not return matched objects in the xcom in the first poke (before entering deferrable mode) #37561

@hejnal

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==10.14.0

Apache Airflow version

2.6.3

Operating System

macOS

Deployment

Other

Deployment details

Local deployment of Airflow (with a RabbitMQ queue), connecting to GCS through a custom connection that uses a service account (SA).

What happened

On the first execution, before it defers, the sensor (with deferrable=True) picks up the files on GCS and turns green, but it does not return the matched objects through XCom.

It works fine if the file arrives after the sensor has deferred and is picked up by the triggerer.

The line that might cause this issue is this one:

self.poke(context=context)
../site-packages/airflow/providers/google/cloud/sensors/gcs.py

        if not self.poke(context=context):
            self.defer(
                timeout=timedelta(seconds=self.timeout),
                trigger=GCSUploadSessionTrigger(
                    bucket=self.bucket,
                    prefix=self.prefix,
                    poke_interval=self.poke_interval,
                    google_cloud_conn_id=self.google_cloud_conn_id,
                    inactivity_period=self.inactivity_period,
                    min_objects=self.min_objects,
                    previous_objects=self.previous_objects,
                    allow_delete=self.allow_delete,
                    hook_params=hook_params,
                ),
                method_name="execute_complete",
            )
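The suspected control flow can be sketched without Airflow. This is only an illustration under stated assumptions: SketchSensor, Deferred, execute_buggy and execute_fixed are hypothetical names, not the provider's code. It relies on one known fact: Airflow pushes the return value of execute() to XCom, so an execute() that ends without returning anything after a successful first poke leaves XCom empty.

```python
# Minimal, Airflow-free sketch of the suspected control flow.
# All class and method names below are hypothetical stand-ins.


class Deferred(Exception):
    """Stand-in for the exception Airflow raises when a task defers."""


class SketchSensor:
    def __init__(self, available_objects, prefix):
        self.available_objects = available_objects
        self.prefix = prefix
        self._matches = []

    def poke(self):
        # Record the objects matching the prefix and report whether any exist.
        self._matches = [
            name for name in self.available_objects if name.startswith(self.prefix)
        ]
        return bool(self._matches)

    def execute_buggy(self):
        # Mirrors the reported behaviour: when the first poke succeeds,
        # the method falls through and implicitly returns None.
        if not self.poke():
            raise Deferred()

    def execute_fixed(self):
        # One possible fix: return the matches when the first poke succeeds,
        # so the value would reach XCom as the task's return value.
        if not self.poke():
            raise Deferred()
        return self._matches


sensor = SketchSensor(["data/a.csv", "data/b.csv", "other/c.csv"], prefix="data/")
buggy_result = sensor.execute_buggy()
fixed_result = sensor.execute_fixed()
print(buggy_result)
print(fixed_result)
```

In this sketch the buggy variant yields None even though the poke found two objects, while the fixed variant returns the list. Whether the real sensor should return the matches in an else branch or restructure execute() differently is a design choice for the actual PR.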

It might be related to:

#30939
f89d7b9#diff-03d898c7c92dfd56502de1a64ff072fb8e88b57b3eac7cec7fb5ee14eb4b47d2

What you think should happen instead

No response

How to reproduce

Put the file in the bucket first.
Run the DAG code. On the first execution the sensor picks up the file but does not return it in XCom.

from airflow.providers.google.cloud.sensors.gcs import GCSObjectsWithPrefixExistenceSensor

wait_for_data_op = GCSObjectsWithPrefixExistenceSensor(
    task_id="00.wait_for_data_files",
    google_cloud_conn_id=GOOGLE_CLOUD_LANDING_BUCKET_CONN_ID,
    bucket=landing_bucket,
    # Append the templated logical date only when date detection is enforced.
    prefix=f"{source_objects_prefix}{schema_version_suffix}"
    + ("_{{ ds_nodash }}" if source_config.enforce_logical_date_detection else ""),
    deferrable=True,
)

The downstream tasks then fail because the XCom value they expect is missing.

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
