Introduction
Automating file transfers is a crucial aspect of data engineering, ensuring seamless ETL workflows and secure data movement. Apache Airflow simplifies this process by orchestrating file transfers and processing tasks efficiently.
In this guide, we will walk through the process of setting up an automated file transfer pipeline using Airflow and SFTP. By the end of this tutorial, you’ll learn how to:
- Detect the availability of a file on an SFTP server.
- Download the file to a local directory.
- Process the file by adding a new column.
- Upload the transformed file to a different SFTP location.
This solution is highly valuable for organizations handling automated ETL workflows, ensuring data integrity and efficient processing.
GitHub code: sftp_source_to_target.py in the AccentFuture-dev/Airflow repository.
Key Technologies Used
- Apache Airflow: A powerful open-source workflow orchestration tool for scheduling, managing, and monitoring data pipelines.
- SFTP (Secure File Transfer Protocol): A secure method for transferring files between systems over an encrypted connection.
- Python: The primary language for writing Airflow DAGs and processing data efficiently.
- CSV Processing: Used to manipulate structured data in workflows that involve file-based operations.
Pipeline Architecture:
The automated file processing pipeline consists of the following components:
- SFTP Server – Acts as both the source (where files arrive) and target (where processed files are uploaded).
- Airflow DAG (Directed Acyclic Graph) – Defines the workflow and task dependencies.
- Operators & Sensors – Used to detect file availability, transfer files, and process data.
- Logging & Monitoring – Ensures reliable execution with built-in logging and error-handling mechanisms.
Core Concepts in Airflow:
- DAG (Directed Acyclic Graph): Defines the sequence of tasks in a workflow.
- Tasks: Individual operations in the DAG, such as downloading, processing, or uploading files.
- Operators: Predefined Airflow components that execute specific tasks (e.g., PythonOperator, BashOperator).
- Sensors: Special waiting operators that monitor external triggers, like the availability of a file.
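To make these concepts concrete, here is a minimal sketch that ties them together: one DAG, one sensor, one operator, and a dependency between them. The DAG id, path, and print callable are illustrative only and are not part of the pipeline built later in this guide.

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.utils.dates import days_ago

# A DAG groups tasks and their dependencies under one workflow name.
with DAG("concept_demo", start_date=days_ago(1), schedule_interval=None) as dag:

    # Sensor: waits for an external condition (here, a file on an SFTP server).
    wait_for_file = SFTPSensor(
        task_id="wait_for_file",
        sftp_conn_id="sftp_default",
        path="/data/incoming/example.csv",
    )

    # Operator: runs a concrete piece of work, here a Python callable.
    say_hello = PythonOperator(
        task_id="say_hello",
        python_callable=lambda: print("file has arrived"),
    )

    # Task dependency: the operator runs only after the sensor succeeds.
    wait_for_file >> say_hello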
Hands-On: Implementing an Airflow-SFTP Pipeline
Let’s implement a real-world use case where we:
- Monitor an SFTP server for a new CSV file.
- Download the detected file to a local directory.
- Process the file by adding a new column.
- Upload the modified file back to the SFTP server.
Pipeline Overview:
At a high level, the DAG orchestrates four tasks in sequence: detect the file on the SFTP server, download it to a local directory, process it, and upload the result to the target location.
Step-by-Step Implementation:
- Step 1: SFTP Sensor: Use an SFTP sensor to detect when a new file arrives on the server.
- Step 2: File Download: Once detected, use an SFTP operator to transfer the file to a local directory.
- Step 3: Data Processing: Modify the CSV file by adding an additional column.
- Step 4: Upload the Processed File: Finally, upload the transformed file back to the SFTP destination.
Code Implementation:
Below is the complete Airflow DAG implementation:
from datetime import timedelta, datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.utils.dates import days_ago
import csv


def def_process_data(**kwargs):
    """Read the downloaded CSV, append a 'processed' value to every row, and write the result."""
    templates_dict = kwargs.get("templates_dict")
    input_file = templates_dict.get("input_file")
    output_file = templates_dict.get("output_file")

    output_rows = []
    with open(input_file, newline='') as csv_file:
        for row in csv.reader(csv_file):
            row.append("processed")
            output_rows.append(row)

    with open(output_file, "w", newline='') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerows(output_rows)


default_args = {
    'owner': 'venkat',
    'start_date': days_ago(0),
    'email': ['test123@gmail.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG('sftp_source_to_target', default_args=default_args, schedule_interval=None) as dag:

    # Wait for the input file to appear on the SFTP server.
    source_file_check = SFTPSensor(
        task_id='source_file_check',
        sftp_conn_id='sftp_default',
        path='/Users/venkata/Desktop/desktop/Usecases/Airflow-docker/data/Source/input.csv',
        poke_interval=43200,   # check every 12 hours
        timeout=259200,        # give up after 3 days without a file
    )

    # Download the detected file into a run-specific local directory.
    get_data = SFTPOperator(
        task_id='get_data',
        ssh_conn_id='sftp_default',
        remote_filepath="/Users/venkata/Desktop/desktop/Usecases/Airflow-docker/data/Source/input.csv",
        local_filepath="/opt/airflow/{{run_id}}/input.csv",
        operation="get",
        create_intermediate_dirs=True,
    )

    # Add the extra column and write output.csv next to the downloaded input.
    process_data = PythonOperator(
        task_id="process_data",
        templates_dict={
            "input_file": "/opt/airflow/{{run_id}}/input.csv",
            "output_file": "/opt/airflow/{{run_id}}/output.csv",
        },
        python_callable=def_process_data,
    )

    # Upload the processed file to the target directory on the SFTP server.
    put_file = SFTPOperator(
        task_id="put_file",
        ssh_conn_id='sftp_default',
        remote_filepath="/Users/venkata/Desktop/desktop/Usecases/Airflow-docker/data/Target/output.csv",
        local_filepath="/opt/airflow/{{run_id}}/output.csv",
        operation="put",
    )

    source_file_check >> get_data >> process_data >> put_file
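Before wiring everything into Airflow, it can help to sanity-check the processing step on its own. The snippet below is a small standalone sketch, not part of the DAG: it assumes the DAG file sftp_source_to_target.py is importable locally, and the sample rows and temporary paths are made up for illustration. It calls def_process_data the same way the PythonOperator would, by passing templates_dict as a keyword argument.

import csv
import os
import tempfile

from sftp_source_to_target import def_process_data  # assumes the DAG file is on the Python path

# Create a throwaway input CSV with a couple of sample rows.
tmp_dir = tempfile.mkdtemp()
input_path = os.path.join(tmp_dir, "input.csv")
output_path = os.path.join(tmp_dir, "output.csv")

with open(input_path, "w", newline='') as f:
    csv.writer(f).writerows([["id", "name"], ["1", "alice"]])

# Call the callable exactly as the PythonOperator does: templates_dict via kwargs.
def_process_data(templates_dict={"input_file": input_path, "output_file": output_path})

# Each row should now carry the extra "processed" column.
with open(output_path, newline='') as f:
    print(list(csv.reader(f)))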
Best Practices for Optimization:
- Error Handling – Implement proper logging and exception handling to debug failures efficiently.
- Optimized Polling – Reduce unnecessary checks by tuning sensor intervals and sensor mode (see the sketch after this list).
- Security Measures – Use encrypted SFTP connections and secure credential storage.
- Scalability – Design workflows to handle multiple files dynamically.
- Monitoring – Leverage Airflow logs and alert mechanisms to track execution.
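For the polling point above, one common adjustment is to run the sensor in reschedule mode so the worker slot is released between checks instead of being held for the entire timeout. The sketch below shows how the source_file_check task from the DAG could be tuned; the demo DAG id and the interval values are illustrative, not recommendations for every workload.

from airflow.models import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.utils.dates import days_ago

with DAG("sftp_sensor_tuning_demo", start_date=days_ago(0), schedule_interval=None) as dag:
    source_file_check = SFTPSensor(
        task_id="source_file_check",
        sftp_conn_id="sftp_default",
        path="/Users/venkata/Desktop/desktop/Usecases/Airflow-docker/data/Source/input.csv",
        poke_interval=600,        # poll every 10 minutes; tune to how often files actually arrive
        timeout=24 * 60 * 60,     # fail the task if nothing shows up within a day
        mode="reschedule",        # release the worker slot between pokes instead of blocking it
    )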
Q&A:
This session provides an opportunity to:
- Discuss real-world implementation challenges.
- Explore alternative approaches to file automation.
- Gain insights into best practices for using Apache Airflow effectively.
Conclusion:
In this tutorial, we built an end-to-end automated pipeline using Apache Airflow and SFTP.
By leveraging workflow automation, organizations can simplify ETL workflows, ensure secure file transfers, and enhance efficiency in data processing.
With these fundamental skills, you can extend the pipeline further by integrating:
- Cloud storage solutions (AWS S3, Google Cloud Storage, Azure Blob Storage).
- Databases for real-time data ingestion.
- Advanced data transformations using Pandas or PySpark.
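As one example of the last point, the csv-based processing step could be swapped for Pandas once the transformations grow beyond appending a column. This is a minimal sketch, assuming Pandas is installed in the Airflow environment and reusing the same templates_dict contract as def_process_data; the new column name is illustrative.

import pandas as pd

def process_data_with_pandas(**kwargs):
    templates_dict = kwargs.get("templates_dict")
    input_file = templates_dict.get("input_file")
    output_file = templates_dict.get("output_file")

    # Load the CSV, derive a new column, and write the result back out.
    # Note: read_csv treats the first row as a header, unlike the csv.reader version above.
    df = pd.read_csv(input_file)
    df["status"] = "processed"
    df.to_csv(output_file, index=False)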