SFTP File Watcher
In this article, let's discuss a use case where an ingestion pipeline has to be triggered for data files that land on an SFTP server at random times. Because the landing time is unpredictable, conventional time-based schedulers such as cron or Autosys cannot be used.
I checked whether an existing solution such as Databricks Auto Loader or Airflow file sensors would fit. Both can solve the problem, but they come with additional overhead: deploying the infrastructure (Databricks or Airflow), maintaining it (in the case of Airflow, also keeping metadata about which files have already been ingested), and the associated cost.
The use case at hand covered 10k tables, with at most 1–2 files landing on the SFTP server per table on a daily, weekly, or monthly cadence. There was also no strict SLA, so ingestion did not need to be triggered the moment a file landed.
Architecture of the proposed solution
- A watchdog process runs on the SFTP server and continuously monitors for new or modified files.
- As soon as new files are detected, their details are pushed to a queue: Azure Storage Queue, AWS SQS, or an open-source implementation (Redis, RabbitMQ, Kafka).
- Another script checks the queue for new messages and triggers ingestion for the corresponding table. This script can run as an AWS Lambda function, a Google Cloud Function, an Azure Function, or a standalone script.
Watchdog Process
watchdog is an open-source, cross-platform Python library for monitoring file system events.
To track files that are created or modified in a given directory, we can use the watchdog module: it watches the directory and notifies us when a file is created or changed.
To install watchdog, run this command in the terminal:
pip install watchdog
We run the watchdog script on the SFTP server to monitor for new events (file additions/modifications) and configure it to push those events to the queue.
The watchdog script does the following:
- Starts the watchdog observer, which monitors for events
- Checks for file completeness: a message should be sent to the queue only once the file has been fully written and closed
- Writes the events/messages to the queue
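The three steps above can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the code from the repo: completeness is approximated by waiting until the file size stops changing (a common heuristic swapped in here, not something watchdog itself provides), the table name is assumed to be the name of the file's parent directory, and `send` stands for whatever function writes to the queue. The watchdog imports sit inside `run_watcher` so the helpers can be used and tested without watchdog installed.

```python
import json
import os
import time
from datetime import datetime, timezone
from typing import Callable


def wait_until_stable(path: str, interval: float = 2.0, checks: int = 3) -> bool:
    """Heuristic completeness check: True once the size is unchanged for `checks` polls in a row."""
    try:
        last_size = os.path.getsize(path)
        unchanged = 0
        while unchanged < checks:
            time.sleep(interval)
            size = os.path.getsize(path)
            if size == last_size:
                unchanged += 1
            else:
                # Still being written: reset the counter and keep waiting.
                unchanged, last_size = 0, size
        return True
    except FileNotFoundError:
        # The file was moved or deleted before it settled.
        return False


def build_message(path: str) -> str:
    """Serialize the file event as JSON. Assumption: a <root>/<table>/<file> directory layout."""
    return json.dumps({
        "table": os.path.basename(os.path.dirname(path)),  # hypothetical table convention
        "path": path,
        "file_name": os.path.basename(path),
        "size_bytes": os.path.getsize(path),
        "detected_at": datetime.now(timezone.utc).isoformat(),
    })


def handle_new_file(path: str, send: Callable[[str], None],
                    interval: float = 2.0, checks: int = 3) -> bool:
    """Push a message only after the file looks complete; return whether one was sent."""
    if not wait_until_stable(path, interval, checks):
        return False
    send(build_message(path))
    return True


def run_watcher(directory: str, send: Callable[[str], None]) -> None:
    # Imported here so the helpers above work without watchdog installed.
    from watchdog.events import FileSystemEventHandler
    from watchdog.observers import Observer

    class NewFileHandler(FileSystemEventHandler):
        def on_created(self, event):
            if not event.is_directory:
                handle_new_file(event.src_path, send)

    observer = Observer()
    observer.schedule(NewFileHandler(), directory, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
```

For example, `run_watcher("/sftp/upload", print)` would print one JSON message per completed upload. The sketch reacts only to `on_created`, because `on_modified` tends to fire repeatedly while a file is still being uploaded; re-uploads of an existing file would need extra handling. Since `handle_new_file` blocks watchdog's dispatch thread while it waits, files are processed one after another, which seems acceptable at the volumes described above. On Linux, watchdog's inotify backend can also report close events (`on_closed`), which may be a more direct completeness signal where available.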
Refer to the following links for more details on the watchdog module:
- https://pypi.org/project/watchdog/
- https://medium.com/analytics-vidhya/monitoring-your-file-system-using-watchdog-64f7ad3279f
Queue
In this use case, the queue stores messages/events about files that have been created or modified on the SFTP server. As soon as a file is created or modified, the watchdog script pushes a message to the queue. I have tested this against AWS SQS and Azure Storage Queue; both versions of the code are available in the GitHub repo.
For testing locally, I would recommend using LocalStack.
LocalStack is a cloud service emulator that runs in a single container on your laptop or in your CI environment. With LocalStack, you can run your AWS applications or Lambdas entirely on your local machine without connecting to a remote cloud provider.
Install LocalStack on your laptop/VM and create a queue for testing as shown below:
pip install localstack
pip install awscli-local
localstack start -d
awslocal sqs create-queue --queue-name demo-queue
The last command creates a queue named demo-queue.
Reference: https://github.com/localstack/localstack
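To keep the watcher independent of the queue technology, the send step can be hidden behind a small factory. The sketch below is an assumption of how that might look, not the repo's code: it uses boto3's SQS client (`get_queue_url`, `send_message`) and the `azure-storage-queue` package's `QueueClient`, each imported lazily so only the selected backend's SDK needs to be installed. For LocalStack, pass `endpoint_url="http://localhost:4566"` (LocalStack's default edge port) and set dummy AWS credentials such as `test`/`test` in the environment. The `memory` backend and the `make_sender` name are hypothetical, added for unit tests.

```python
from typing import Callable, List, Optional

Sender = Callable[[str], None]


def make_sqs_sender(queue_name: str, endpoint_url: Optional[str] = None,
                    region_name: str = "us-east-1") -> Sender:
    import boto3  # lazy import: only needed when SQS is the selected backend

    sqs = boto3.client("sqs", endpoint_url=endpoint_url, region_name=region_name)
    queue_url = sqs.get_queue_url(QueueName=queue_name)["QueueUrl"]

    def send(body: str) -> None:
        sqs.send_message(QueueUrl=queue_url, MessageBody=body)

    return send


def make_azure_sender(connection_string: str, queue_name: str) -> Sender:
    from azure.storage.queue import QueueClient  # lazy import, as above

    client = QueueClient.from_connection_string(connection_string, queue_name)

    def send(body: str) -> None:
        client.send_message(body)

    return send


def make_memory_sender(buffer: List[str]) -> Sender:
    """Hypothetical in-process backend: appends messages to a list (handy in tests)."""
    return buffer.append


def make_sender(backend: str, **kwargs) -> Sender:
    """Pick a queue backend by name, e.g. from a config file."""
    factories = {
        "sqs": make_sqs_sender,
        "azure": make_azure_sender,
        "memory": make_memory_sender,
    }
    if backend not in factories:
        raise ValueError(f"unknown queue backend: {backend!r}")
    return factories[backend](**kwargs)
```

Against the demo-queue created above, this could be called as `make_sender("sqs", queue_name="demo-queue", endpoint_url="http://localhost:4566")`. If the Azure consumer is an Azure Function with a queue trigger, it is worth checking which message encoding it expects; `QueueClient` accepts a `message_encode_policy` (for example `TextBase64EncodePolicy`) for that purpose.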
Reading Messages from Queue
Another script checks for new messages/events in the queue (SQS or Storage Queue) and triggers the ingestion pipeline for the corresponding table. It can run as an AWS Lambda function, a Google Cloud Function, an Azure Function, or a standalone script.
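A standalone version of this reader could look roughly like the sketch below, assuming SQS and that each message body is a JSON object describing one file (for example with a hypothetical `table` field). It uses long polling (`WaitTimeSeconds`, at most 20 seconds) to avoid busy-looping, and deletes a message only after `ingest` succeeds, so a failed file becomes visible again after the queue's visibility timeout. `ingest` is a hypothetical callable standing in for the real pipeline trigger; the client is passed in so it can be a real `boto3.client("sqs")` or a test double.

```python
import json
from typing import Any, Callable, Dict


def poll_once(sqs: Any, queue_url: str,
              ingest: Callable[[Dict[str, Any]], None],
              wait_seconds: int = 20) -> int:
    """Receive one batch, ingest each message, and return how many succeeded."""
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,        # SQS returns at most 10 messages per call
        WaitTimeSeconds=wait_seconds,  # long polling: wait for messages instead of returning empty
    )
    succeeded = 0
    for message in response.get("Messages", []):
        try:
            ingest(json.loads(message["Body"]))
        except Exception as exc:
            # Not deleted: the message becomes visible again after the visibility timeout.
            print(f"ingestion failed, will retry later: {exc}")
            continue
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])
        succeeded += 1
    return succeeded


def run_forever(sqs: Any, queue_url: str, ingest: Callable[[Dict[str, Any]], None]) -> None:
    """Standalone-script mode: keep polling until the process is stopped."""
    while True:
        poll_once(sqs, queue_url, ingest)
```

When the reader runs as a Lambda function with an SQS trigger instead, Lambda does the polling itself and passes the messages to the handler in `event["Records"]` (each with a `body` field), so only the `ingest` part would be needed there.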
The ingestion pipeline can then:
- Connect to the SFTP server
- Copy/process the file referenced in the queue message and ingest it into the target data lake
The ingestion pipeline itself can be a simple Bash/Python script, any ETL pipeline, or an Airflow pipeline.
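The connect-and-copy steps could be sketched with paramiko, a widely used Python SSH/SFTP library. The article does not say which client the repo uses, so treat this as an assumption. The staging layout (`<staging_root>/<table>/<file_name>`) and the message fields `table`, `path`, and `file_name` are hypothetical; paramiko is imported inside the function so the path helper can be used on its own. Host keys are loaded from the system's known_hosts file rather than auto-accepted.

```python
import os
from typing import Any, Dict, Optional


def staging_path(event: Dict[str, Any], staging_root: str) -> str:
    """Hypothetical local layout: <staging_root>/<table>/<file_name>."""
    target_dir = os.path.join(staging_root, event["table"])
    os.makedirs(target_dir, exist_ok=True)
    return os.path.join(target_dir, event["file_name"])


def fetch_from_sftp(event: Dict[str, Any], host: str, username: str, staging_root: str,
                    key_filename: Optional[str] = None, port: int = 22) -> str:
    """Copy the file named in the queue message to local staging and return its path."""
    import paramiko  # lazy import: only needed when actually talking to the server

    local_path = staging_path(event, staging_root)
    with paramiko.SSHClient() as ssh:
        ssh.load_system_host_keys()  # default policy rejects hosts not in known_hosts
        ssh.connect(host, port=port, username=username, key_filename=key_filename)
        with ssh.open_sftp() as sftp:
            sftp.get(event["path"], local_path)
    return local_path
```

Note that the path the watcher sees on the server may differ from the path exposed over SFTP (for example under a chroot), in which case it would need to be mapped before calling `sftp.get`. The returned local file can then be handed to whatever loads it into the data lake.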
The entire codebase can be found here https://github.com/abhr1994/SFTPFileWatcher
This article reflects my personal view/solution for this use case, and I would love to hear feedback, comments, improvements, or any other easier/more interesting way to solve this problem :-)