Apache Airflow FileSensor: A Comprehensive Guide
Apache Airflow is a leading open-source platform for orchestrating workflows, and the FileSensor is a sensor designed to wait for files or directories to appear in the file system within your Directed Acyclic Graphs (DAGs). Whether you’re waiting for data files to arrive, synchronizing workflows, or integrating with operators like BashOperator, PythonOperator, or systems such as Airflow with Apache Spark, this sensor provides a seamless way to manage file-based dependencies. This guide explores the FileSensor—its purpose, setup process, key features, and best practices for effective use in your workflows. We’ll provide step-by-step instructions where processes are involved and include practical examples to illustrate each concept clearly. If you’re new to Airflow, begin with Airflow Fundamentals, and pair this with Defining DAGs in Python for context.
Understanding the FileSensor in Apache Airflow
The FileSensor is an Airflow sensor that waits for the presence of files or directories in the file system as a task within your DAGs—those Python scripts that define your workflows (Introduction to DAGs in Airflow). Located in airflow.sensors.filesystem, it periodically checks a specified path—e.g., /tmp/data.csv—using a connection defined via fs_conn_id, and waits until the file exists before allowing downstream tasks to proceed. You configure it with parameters like filepath, poke_interval, and timeout. Airflow’s Scheduler manages its execution timing (Airflow Architecture (Scheduler, Webserver, Executor)), and the Executor performs the file checks (Airflow Executors (Sequential, Local, Celery)), logging each attempt (Task Logging and Monitoring). It serves as a file system watchdog, integrating Airflow with file-based triggers for workflow synchronization.
Key Parameters of the FileSensor
The FileSensor relies on several critical parameters to configure file monitoring effectively. Here’s an overview of the most important ones:
- filepath: Specifies the absolute path to monitor—e.g., filepath="/tmp/data.csv"—defining the target file or directory to check, supporting Jinja templating—e.g., "/tmp/data_{{ ds }}.csv".
- fs_conn_id: Identifies the file system connection—e.g., fs_conn_id="fs_default"—linking to a file system connection that can supply a base path for relative filepaths (default: fs_default for local access).
- poke_interval: Sets the polling interval in seconds—e.g., poke_interval=60—determining how often the sensor checks the file’s presence (default: 60), balancing responsiveness and resource use.
- timeout: Defines the maximum wait time in seconds—e.g., timeout=3600 (1 hour)—after which the task fails if the condition isn’t met, preventing indefinite waits (default: 7 days).
- mode: Controls polling behavior—e.g., mode="poke" (default) or mode="reschedule"—where poke holds the worker slot for the entire wait, and reschedule frees it between checks, optimizing resource usage for long waits.
- recursive: A boolean—e.g., recursive=True—enabling recursive directory checks if filepath is a directory (default: False), useful for nested file structures.
- retries: Sets the number of retry attempts—e.g., retries=3—for failed checks, enhancing resilience against transient file system issues.
- retry_delay: Defines the delay between retries—e.g., retry_delay=timedelta(minutes=5)—controlling the timing of retry attempts.
These parameters enable the FileSensor to monitor file system events with precision, integrating file-based dependencies into your Airflow workflows efficiently.
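Taken together, a typical configuration looks like the minimal sketch below; the dag_id, path, and timing values are illustrative assumptions, not prescriptions.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="file_sensor_params_demo",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_for_report = FileSensor(
        task_id="wait_for_report",
        filepath="/tmp/reports/daily_report.csv",  # absolute path to watch (illustrative)
        fs_conn_id="fs_default",                   # default local file system connection
        poke_interval=120,                         # check every 2 minutes
        timeout=6 * 60 * 60,                       # give up after 6 hours
        mode="reschedule",                         # free the worker slot between checks
        retries=3,                                 # retry transient failures
        retry_delay=timedelta(minutes=5),
    )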
How the FileSensor Functions in Airflow
The FileSensor operates by embedding a file-monitoring task in your DAG script, saved in ~/airflow/dags (DAG File Structure Best Practices). You define it with parameters like filepath="/tmp/data.csv", poke_interval=60, and timeout=3600. The Scheduler scans this script and queues the task according to its schedule_interval, such as daily or hourly runs (DAG Scheduling (Cron, Timetables)), while respecting any upstream dependencies—e.g., waiting for a prior task to complete. When executed, the Executor uses the file system hook (FSHook) to periodically check the filepath on the worker host (via fs_conn_id, defaulting to local), polling every poke_interval seconds until the file exists or the timeout is reached. Each check is written to the task logs, and the task’s state is tracked in Airflow’s metadata database (DAG Serialization in Airflow). Success occurs when the file is found; failure—due to timeout or persistent absence—triggers retries or UI alerts (Airflow Graph View Explained). This process integrates file system monitoring into Airflow’s orchestrated environment, automating file-based triggers with precision.
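To make the poke cycle concrete, the sketch below mimics the core idea with a custom sensor built on BaseSensorOperator: the poke method is called once per poke_interval and the task succeeds as soon as it returns True. This is a simplified illustration, not the FileSensor's actual implementation (the real sensor also resolves a base path from the fs_conn_id connection and supports glob-style matching).
import os

from airflow.sensors.base import BaseSensorOperator


class SimpleFileSensor(BaseSensorOperator):
    """Simplified stand-in for FileSensor: succeeds once the given path exists."""

    def __init__(self, *, filepath: str, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath

    def poke(self, context) -> bool:
        # Invoked once per poke_interval: returning False keeps the sensor waiting,
        # returning True marks the task as successful.
        self.log.info("Checking for %s", self.filepath)
        return os.path.exists(self.filepath)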
Setting Up the FileSensor in Apache Airflow
To utilize the FileSensor, you need to configure Airflow and define it in a DAG. Here’s a step-by-step guide using a local file system setup for demonstration purposes.
Step 1: Configure Your Airflow Environment
- Install Apache Airflow: Open your terminal, type cd ~, press Enter, then python -m venv airflow_env to create a virtual environment—isolating dependencies. Activate it with source airflow_env/bin/activate (Mac/Linux) or airflow_env\Scripts\activate (Windows), then press Enter—your prompt will show (airflow_env). Install Airflow by typing pip install apache-airflow—this includes the core package with FileSensor built-in.
- Initialize Airflow: Type airflow db init and press Enter—this creates ~/airflow/airflow.db and the dags folder, setting up the metadata database.
- Start Airflow Services: In one terminal, activate, type airflow webserver -p 8080, and press Enter—starts the UI at localhost:8080. In another, activate, type airflow scheduler, and press Enter—runs the Scheduler. The default SequentialExecutor is sufficient here (or set executor = LocalExecutor in airflow.cfg if you use a non-SQLite metadata database)—no additional connections are needed for local file system checks.
Step 2: Create a DAG with FileSensor
- Open a Text Editor: Use Notepad, Visual Studio Code, or any editor that saves .py files—ensuring compatibility with Airflow’s Python environment.
- Write the DAG: Define a DAG that uses the FileSensor to wait for a file:
- Paste the following code:
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="file_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/tmp/data.csv",
        poke_interval=60,  # Check every 60 seconds
        timeout=3600,      # Fail after 1 hour
    )
    process_file = BashOperator(
        task_id="process_file",
        bash_command="echo 'File found!'",
    )
    wait_for_file >> process_file
- Save this as file_sensor_dag.py in ~/airflow/dags—e.g., /home/username/airflow/dags/file_sensor_dag.py on Linux/macOS or C:/Users/YourUsername/airflow/dags/file_sensor_dag.py on Windows (adjust filepath to C:/Temp/data.csv for Windows). This DAG waits for /tmp/data.csv and echoes a message when found.
Step 3: Test and Execute the DAG
- Test with CLI: Activate your environment, type airflow dags test file_sensor_dag 2025-04-07, and press Enter—this runs a dry test for April 7, 2025. Without /tmp/data.csv, the FileSensor polls every 60 seconds, timing out after 1 hour—create the file (touch /tmp/data.csv on Linux/macOS or echo. > C:/Temp/data.csv on Windows) to succeed, logging “File found!”—verify in logs (DAG Testing with Python).
- Run Live: Type airflow dags trigger -e 2025-04-07 file_sensor_dag, press Enter—initiates live execution. Open your browser to localhost:8080, where “wait_for_file” stays yellow until the file appears, then turns green, followed by “process_file”—check logs (Airflow Web UI Overview).
This setup demonstrates how the FileSensor monitors a local file, setting the stage for more complex file-based workflows.
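As an alternative to the platform-specific touch and echo commands used in Step 3, you can create the test file from a short Python snippet. This is a convenience sketch; the path matches the example DAG and should be adjusted on Windows.
from pathlib import Path

# Create the file the example DAG is waiting for; use C:/Temp/data.csv on Windows
# to match the adjusted filepath in the sensor.
target = Path("/tmp/data.csv")
target.parent.mkdir(parents=True, exist_ok=True)
target.touch()
print(f"Created {target}; the sensor should succeed on its next poke.")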
Key Features of the FileSensor
The FileSensor offers several features that enhance its utility in Airflow workflows, each providing specific control over file monitoring.
Flexible File Path Monitoring
The filepath parameter defines the file or directory to monitor—e.g., filepath="/tmp/data.csv" for a single file or filepath="/tmp/data/" for a directory (with recursive=True). It supports Jinja templating—e.g., "/tmp/data_{{ ds }}.csv"—allowing dynamic paths based on runtime variables like the execution date, making it adaptable to workflows with variable file names or locations.
Example: Dynamic File Path
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

with DAG(
    dag_id="dynamic_file_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_dynamic = FileSensor(
        task_id="wait_dynamic",
        filepath="/tmp/data_{{ ds }}.csv",
        poke_interval=60,
    )
This example waits for a file named with the execution date—e.g., /tmp/data_2025-04-07.csv.
Configurable Polling Interval
The poke_interval parameter sets the frequency of file checks in seconds—e.g., poke_interval=30 for every 30 seconds (default: 60). This allows you to balance responsiveness—shorter intervals for quick detection—and resource efficiency—longer intervals to reduce worker load—tailoring the sensor to your workflow’s timing needs.
Example: Fast Polling
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

with DAG(
    dag_id="fast_poke_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_fast = FileSensor(
        task_id="wait_fast",
        filepath="/tmp/data.csv",
        poke_interval=10,  # Check every 10 seconds
    )
This example checks every 10 seconds, prioritizing quick detection.
Timeout Control
The timeout parameter defines the maximum wait time in seconds—e.g., timeout=7200 (2 hours)—after which the sensor fails if the file isn’t found (default: 7 days). This prevents indefinite waits—e.g., for missing files—ensuring workflows fail gracefully and alert operators, customizable to your expected file arrival window.
Example: Short Timeout
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

with DAG(
    dag_id="timeout_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_timeout = FileSensor(
        task_id="wait_timeout",
        filepath="/tmp/data.csv",
        poke_interval=60,
        timeout=300,  # 5 minutes
    )
This example fails after 5 minutes if the file isn’t found.
Mode Selection for Resource Management
The mode parameter—e.g., mode="poke" (default) or mode="reschedule"—controls polling behavior. poke keeps the worker slot occupied, checking continuously—ideal for short waits—while reschedule frees the slot between checks, rescheduling itself—better for long waits with larger poke_interval—optimizing resource usage based on monitoring duration.
Example: Reschedule Mode
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

with DAG(
    dag_id="reschedule_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_reschedule = FileSensor(
        task_id="wait_reschedule",
        filepath="/tmp/data.csv",
        poke_interval=300,  # 5 minutes
        mode="reschedule",
    )
This example reschedules checks every 5 minutes, freeing resources.
Best Practices for Using the FileSensor
- Use Absolute Paths: Set filepath with absolute paths—e.g., filepath="/tmp/data.csv"—to avoid ambiguity across workers (Airflow Configuration Options).
- Optimize Poke Interval: Adjust poke_interval—e.g., poke_interval=60—to balance responsiveness and resource use (Airflow Performance Tuning).
- Set Reasonable Timeouts: Define timeout—e.g., timeout=3600—to match expected file arrival times (Task Timeouts and SLAs).
- Choose Mode Wisely: Use mode="poke" for short waits—e.g., minutes—or mode="reschedule" for longer waits—e.g., hours (Airflow Executors (Sequential, Local, Celery)).
- Test File Checks: Verify file paths—e.g., ls /tmp/data.csv—then test with airflow dags test (DAG Testing with Python).
- Implement Retries: Configure retries—e.g., retries=3—to handle transient file system issues (Task Retries and Retry Delays); a combined sketch follows this list.
- Monitor Logs: Check ~/airflow/logs to confirm each poke attempt and track progress (Task Logging and Monitoring).
- Organize Sensor Tasks: Structure sensor DAGs in a dedicated directory—e.g., ~/airflow/dags/sensors/—for clarity (DAG File Structure Best Practices).
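The sketch below pulls several of these recommendations together (an absolute templated path, reschedule mode with a longer poke_interval, a bounded timeout, and retries); the dag_id and path are illustrative assumptions.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="file_sensor_best_practices",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_for_export = FileSensor(
        task_id="wait_for_export",
        filepath="/tmp/exports/export_{{ ds }}.csv",  # absolute, templated path
        poke_interval=300,                 # longer interval suits reschedule mode
        timeout=4 * 60 * 60,               # fail after 4 hours rather than the 7-day default
        mode="reschedule",                 # free the worker slot between checks
        retries=3,                         # absorb transient file system issues
        retry_delay=timedelta(minutes=5),
    )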
Frequently Asked Questions About the FileSensor
Here are common questions about the FileSensor, with detailed, concise answers from online discussions.
1. Why does my FileSensor fail to detect the file?
The filepath—e.g., /tmp/data.csv—might be inaccessible. Check permissions—e.g., ls -l /tmp—and ensure the path is correct—test with airflow dags test (Task Logging and Monitoring).
2. How do I adjust the polling frequency?
Set poke_interval—e.g., poke_interval=30—to check every 30 seconds; adjust based on needs—test with airflow dags test (DAG Parameters and Defaults).
3. Can I monitor multiple files in one task?
Each FileSensor watches a single filepath—e.g., filepath="/tmp/data.csv"—though the path may include glob-style wildcards. To wait on several distinct files, use multiple FileSensor tasks and combine them with trigger rules (Airflow Trigger Rules), as in the sketch below.
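For example, here is a minimal sketch with two sensors gating one downstream task; the paths and task IDs are assumptions:
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="multi_file_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_orders = FileSensor(task_id="wait_orders", filepath="/tmp/orders.csv", poke_interval=60)
    wait_customers = FileSensor(task_id="wait_customers", filepath="/tmp/customers.csv", poke_interval=60)
    process = BashOperator(task_id="process", bash_command="echo 'Both files found!'")

    # With the default all_success trigger rule, process runs only after both sensors succeed.
    [wait_orders, wait_customers] >> process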
4. Why does my FileSensor timeout unexpectedly?
The timeout—e.g., timeout=300—might be too short. Increase it—e.g., timeout=3600—and test with airflow dags test (Task Timeouts and SLAs).
5. How can I debug a failed FileSensor task?
Run airflow tasks test my_dag task_id 2025-04-07 to execute the task and log each poke attempt (DAG Testing with Python). Then check ~/airflow/logs for details such as permission errors or a timeout (Task Logging and Monitoring).
6. Is it possible to use the FileSensor in dynamic DAGs?
Yes, use it in a loop—e.g., FileSensor(task_id=f"file_{i}", filepath=f"/tmp/data_{i}.csv", ...)—each monitoring a unique file (Dynamic DAG Generation).
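A minimal sketch of that pattern (the file count and paths are assumptions):
from datetime import datetime

from airflow import DAG
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="looped_file_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # One sensor per expected file, generated in a loop.
    for i in range(3):
        FileSensor(
            task_id=f"wait_file_{i}",
            filepath=f"/tmp/data_{i}.csv",
            poke_interval=60,
        )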
7. How do I retry a failed FileSensor task?
Set retries and retry_delay—e.g., retries=3, retry_delay=timedelta(minutes=5)—retries 3 times, waiting 5 minutes if it fails—e.g., file system glitch (Task Retries and Retry Delays).
Conclusion
The FileSensor enhances your Apache Airflow workflows with seamless file system monitoring—build your DAGs with Defining DAGs in Python, install Airflow via Installing Airflow (Local, Docker, Cloud), and optimize performance with Airflow Performance Tuning. Monitor task execution in Monitoring Task Status in UI and deepen your understanding with Airflow Concepts: DAGs, Tasks, and Workflows!