SftpSensor w/ possibility to use RegEx or fnmatch #15332

@saveriogzz

Description

A SmartSftpSensor that can match file or folder names against a pattern (a regular expression or a Unix-style fnmatch wildcard).

Use case / motivation

I would like to be able to use wildcards and/or regular expressions to look for specific files when using an SFTPSensor.
So far I have tried something like this:

from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from typing import Any

import os
import fnmatch

class SmartSftpSensor(SFTPSensor):
    poke_context_fields = ('path', 'filepattern', 'sftp_conn_id', ) # <- Required fields
    template_fields = ['filepattern', 'path']

    @apply_defaults
    def __init__(
            self, 
            filepattern="",
            **kwargs: Any):

        super().__init__(**kwargs)
        self.filepath = self.path
        self.filepattern = filepattern

    def poke(self, context):
        full_path = self.filepath

        directory = os.listdir(full_path)

        for file in directory:
            if fnmatch.fnmatch(file, self.filepattern):
                context['task_instance'].xcom_push(key='file_name', value=file)
                return True
        return False

    def is_smart_sensor_compatible(self): # <- Required
        result = (
            not self.soft_fail
            and super().is_smart_sensor_compatible()
        )
        return result

class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    operators = [SmartSftpSensor]

I call it like this:

sense_file = SmartSftpSensor(
    task_id='sense_file',
    sftp_conn_id='my_sftp_connection',
    path=templ_remote_filepath,
    filepattern=filename,
    timeout=3
)

Here path is the folder containing the files, and filepattern is a templated filename with wildcards: filename = """{{ execution_date.strftime("%y%m%d_%H00??_P??_???") }}.LV1""", which renders to, for example, 210412_1600??_P??_???.LV1.
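
To check the pattern in isolation, the rendered value can be tested against a few names with the standard-library fnmatch module. This is only a sketch: the run time and the candidate filenames below are made up for illustration, and fnmatchcase is used instead of fnmatch so the comparison is case-sensitive on every platform.

```python
import fnmatch
import re
from datetime import datetime

# Hypothetical run time, chosen to reproduce the rendered value quoted above.
execution_date = datetime(2021, 4, 12, 16, 0)
pattern = execution_date.strftime("%y%m%d_%H00??_P??_???") + ".LV1"

# Made-up filenames, for illustration only.
candidates = [
    "210412_160012_P01_123.LV1",  # fills every '?' slot
    "210412_160012_P01_123.lv1",  # extension in the wrong case
    "210412_170012_P01_123.LV1",  # different hour
]

# Each '?' matches exactly one character; fnmatchcase never folds case.
matches = [name for name in candidates if fnmatch.fnmatchcase(name, pattern)]

# The same wildcard expressed as a regular expression.
regex = re.compile(fnmatch.translate(pattern))
regex_matches = [name for name in candidates if regex.match(name)]

print(pattern)
print(matches)
```

If a real filename from the server passes this check, the pattern itself is probably fine and the problem lies in how the directory is listed.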

With this setup I am still not getting the expected result: the sensor does not match any file.
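
One possible cause, offered as an assumption rather than a confirmed diagnosis: os.listdir reads the filesystem of the machine running the task, not the SFTP server, so remote files would never be seen. The sketch below separates the matching logic from the listing call so the listing can come from anywhere. The helper first_remote_match and the stand-in fake_list_directory are hypothetical names introduced here for illustration.

```python
import fnmatch
from typing import Callable, List, Optional


def first_remote_match(
    list_directory: Callable[[str], List[str]],
    path: str,
    filepattern: str,
) -> Optional[str]:
    """Return the first name under `path` that matches `filepattern`, or None."""
    # Sort so the result does not depend on the order the server returns names.
    for name in sorted(list_directory(path)):
        if fnmatch.fnmatchcase(name, filepattern):
            return name
    return None


def fake_list_directory(path: str) -> List[str]:
    """Stand-in for a remote listing; the directory contents are made up."""
    listing = {"/incoming": ["readme.txt", "210412_160012_P01_123.LV1"]}
    return listing.get(path, [])


found = first_remote_match(
    fake_list_directory, "/incoming", "210412_1600??_P??_???.LV1"
)
missing = first_remote_match(
    fake_list_directory, "/incoming", "210412_1700??_P??_???.LV1"
)
print(found, missing)
```

Inside poke, the callable could be a method of the SFTP provider's hook that lists a remote directory (SFTPHook has a list_directory method, though its exact signature should be checked against the installed provider version). The sensor would then push the returned name to XCom and return True when it is not None.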

Are you willing to submit a PR?
Yes!

Related Issues

I didn't find any
