Closed
Description
SmartSftpSensor with possibility to search for patterns (RegEx or UNIX fnmatch) in filenames or folders
Use case / motivation
I would like to be able to use wildcards and/or regular expressions to look for certain files when using an SFTPSensor.
So far I have tried something like this:
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from typing import Any
import os
import fnmatch


class SmartSftpSensor(SFTPSensor):
    poke_context_fields = ('path', 'filepattern', 'sftp_conn_id', )  # <- Required fields
    template_fields = ['filepattern', 'path']

    @apply_defaults
    def __init__(
            self,
            filepattern="",
            **kwargs: Any):
        super().__init__(**kwargs)
        self.filepath = self.path
        self.filepattern = filepattern

    def poke(self, context):
        full_path = self.filepath
        directory = os.listdir(full_path)
        for file in directory:
            if not fnmatch.fnmatch(file, self.filepattern):
                pass
            else:
                context['task_instance'].xcom_push(key='file_name', value=file)
                return True
        return False

    def is_smart_sensor_compatible(self):  # <- Required
        result = (
            not self.soft_fail
            and super().is_smart_sensor_compatible()
        )
        return result


class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    operators = [SmartSftpSensor]

And I call it by doing:
sense_file = SmartSftpSensor(
    task_id='sense_file',
    sftp_conn_id='my_sftp_connection',
    path=templ_remote_filepath,
    filepattern=filename,
    timeout=3
)

where path is the folder containing the files and filepattern is a rendered filename with wildcards:
filename = """{{ execution_date.strftime("%y%m%d_%H00??_P??_???") }}.LV1"""
which is rendered to e.g. 210412_1600??_P??_???.LV1
but I am still not getting the expected result, as it's not capturing anything.
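One possible reason for this: os.listdir() reads the local filesystem of the machine running the task, not the SFTP server, so the remote files are never seen. Below is a minimal sketch of the matching step on its own, with the directory listing passed in as a callable. The names find_first_match, list_dir, use_regex and fake_list_dir are hypothetical and only for illustration; in the sensor, the listing could probably come from the provider's SFTPHook (its list_directory method), but that part is an assumption and is not exercised here.

```python
import fnmatch
import re
from typing import Callable, Iterable, Optional


def find_first_match(
    list_dir: Callable[[str], Iterable[str]],
    path: str,
    pattern: str,
    use_regex: bool = False,
) -> Optional[str]:
    """Return the first entry of `path` whose name matches `pattern`, else None."""
    if use_regex:
        # The whole filename must match the regular expression.
        matches = re.compile(pattern).fullmatch
    else:
        # fnmatchcase skips the OS-dependent case normalization of fnmatch.fnmatch.
        matches = lambda name: fnmatch.fnmatchcase(name, pattern)
    # Sort so the result does not depend on the order the listing comes back in.
    for name in sorted(list_dir(path)):
        if matches(name):
            return name
    return None


# Stand-in for a remote listing (hypothetical). In a sensor this would be
# replaced by a call that lists the directory on the SFTP server.
def fake_list_dir(path: str) -> list:
    return ["readme.txt", "210412_160012_P01_abc.LV1"]


found = find_first_match(fake_list_dir, "/inbox", "210412_1600??_P??_???.LV1")
print(found)
```

Inside poke(), the returned name could then be pushed to XCom as in my attempt above, returning True only when the result is not None.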
Are you willing to submit a PR?
Yes!
Related Issues
I didn't find any