Apache Airflow SqlSensor: A Comprehensive Guide

Apache Airflow is a leading open-source platform for orchestrating workflows, and the SqlSensor is a specialized operator designed to monitor SQL-based conditions within your Directed Acyclic Graphs (DAGs). Whether you’re waiting for database updates, checking data readiness, or integrating with operators like BashOperator, PythonOperator, or systems such as Airflow with Apache Spark, this sensor provides a seamless way to manage database-driven dependencies. This comprehensive guide explores the SqlSensor—its purpose, setup process, key features, and best practices for effective use in your workflows. We’ll provide step-by-step instructions where processes are involved and include practical examples to illustrate each concept clearly. If you’re new to Airflow, begin with Airflow Fundamentals, and pair this with Defining DAGs in Python for context.


Understanding the SqlSensor in Apache Airflow

The SqlSensor is an Airflow operator designed to monitor database conditions as tasks within your DAGs—those Python scripts that define your workflows (Introduction to DAGs in Airflow). Located in airflow.sensors.sql, it periodically executes a SQL query against a database—such as PostgreSQL, MySQL, or SQLite—using a connection specified via conn_id, and waits until a defined condition (e.g., rows returned) is met before allowing downstream tasks to proceed. You configure it with parameters like sql, conn_id, success, and poke_interval. Airflow’s Scheduler manages its execution timing (Airflow Architecture (Scheduler, Webserver, Executor)), and the Executor performs the SQL checks using the appropriate SQL Hook (Airflow Executors (Sequential, Local, Celery)), logging each attempt (Task Logging and Monitoring). It serves as a database condition monitor, integrating Airflow with SQL-based triggers for workflow synchronization.


Key Parameters of the SqlSensor

The SqlSensor relies on several critical parameters to configure and monitor SQL conditions effectively. Here’s an overview of the most important ones:

  • sql: Specifies the SQL query to execute—e.g., sql="SELECT COUNT(*) FROM employees WHERE status = 'ready'"—defining the condition to check, supporting Jinja templating—e.g., "SELECT * FROM data WHERE date = '{{ ds }}'".
  • conn_id: Identifies the database connection—e.g., conn_id="postgres_default"—linking to credentials and connection details in Airflow’s connection store (e.g., PostgreSQL, MySQL).
  • success: A Python callable—e.g., success=lambda cell: cell > 0—that receives the first cell of the first returned row and returns True to succeed or False to keep polling (by default, the sensor succeeds once the query returns at least one row with a truthy first cell), offering custom success criteria.
  • parameters: A dictionary of parameters—e.g., parameters={"status": "ready"}—used to parameterize the SQL query—e.g., sql="SELECT COUNT(*) FROM employees WHERE status = %(status)s", enhancing security and flexibility.
  • poke_interval: Sets the polling interval in seconds—e.g., poke_interval=60—determining how often the sensor queries the database (default: 60), balancing responsiveness and database load.
  • timeout: Defines the maximum wait time in seconds—e.g., timeout=3600 (1 hour)—after which the task fails if the condition isn’t met (default: 7 days), preventing indefinite waits.
  • mode: Controls polling behavior—e.g., mode="poke" (default) or mode="reschedule"—where poke keeps a worker slot occupied for the entire wait, while reschedule frees the slot between checks, suiting longer waits.
  • retries: Sets the number of retry attempts—e.g., retries=3—for failed checks, improving resilience against transient database issues.
  • retry_delay: Defines the delay between retries—e.g., retry_delay=timedelta(minutes=5)—controlling the timing of retry attempts.

These parameters enable the SqlSensor to monitor database conditions with precision, integrating SQL-based triggers into your Airflow workflows efficiently.
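
To show how these parameters combine in practice, here is a minimal sketch of a parameterized sensor. It assumes the postgres_test connection and employees table set up later in this guide; the DAG id and task id are illustrative. Values are bound through parameters rather than string formatting, and the success callable receives the first cell of the first returned row.

from airflow import DAG
from airflow.sensors.sql import SqlSensor
from datetime import datetime, timedelta

with DAG(
    dag_id="parameterized_sql_sensor_dag",  # illustrative DAG id
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_ready = SqlSensor(
        task_id="wait_ready",
        conn_id="postgres_test",
        sql="SELECT COUNT(*) FROM employees WHERE status = %(status)s;",
        parameters={"status": "ready"},  # bound by the database driver, not string-formatted
        success=lambda cell: cell > 0,   # cell is the first column of the first returned row
        poke_interval=60,                # query once per minute
        timeout=3600,                    # give up after 1 hour
        mode="reschedule",               # free the worker slot between checks
        retries=2,
        retry_delay=timedelta(minutes=5),
    )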


How the SqlSensor Functions in Airflow

The SqlSensor operates by embedding a SQL monitoring task in your DAG script, saved in ~/airflow/dags (DAG File Structure Best Practices). You define it with parameters like sql="SELECT COUNT(*) FROM employees WHERE status = 'ready'", conn_id="postgres_default", poke_interval=60, and success=lambda cell: cell > 0. The Scheduler scans this script and queues the task according to its schedule_interval, such as daily or hourly runs (DAG Scheduling (Cron, Timetables)), while respecting any upstream dependencies—e.g., waiting for a prior task to complete. When executed, the Executor uses the appropriate SQL Hook (e.g., PostgresHook) to connect to the database via conn_id, runs the sql query with parameters every poke_interval seconds, and passes the first cell of the first returned row to success, repeating until success returns True or timeout is reached, logging each attempt in Airflow’s metadata database for tracking (DAG Serialization in Airflow). Success occurs when the condition is met; failure—due to timeout or a persistently unmet condition—triggers retries or UI alerts (Airflow Graph View Explained). This process integrates SQL condition monitoring into Airflow’s orchestrated environment, automating database-driven triggers.
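
To make the polling logic concrete, the sketch below mimics, in plain Python rather than actual Airflow source, what a single poke roughly does with the records returned by the hook: no rows means keep waiting; otherwise the first cell of the first row is handed to the success callable if one is set, or checked for truthiness by default.

# Illustrative only: a simplified stand-in for the SqlSensor's per-poke decision.
def poke_once(records, success=None):
    """Return True if the sensed condition is met for this poll."""
    if not records:             # query returned no rows: keep polling
        return False
    first_cell = records[0][0]
    if success is not None:     # custom criterion applied to the first cell
        return success(first_cell)
    return bool(first_cell)     # default: succeed on a truthy first cell

# Example: a COUNT(*) query returning [(0,)] and later [(3,)]
print(poke_once([(0,)], success=lambda cell: cell > 0))  # False: sensor keeps waiting
print(poke_once([(3,)], success=lambda cell: cell > 0))  # True: downstream tasks may run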


Setting Up the SqlSensor in Apache Airflow

To utilize the SqlSensor, you need to configure Airflow with a database connection and define it in a DAG. Here’s a step-by-step guide using a local PostgreSQL setup for demonstration purposes.

Step 1: Configure Airflow and PostgreSQL Connection

  1. Install Apache Airflow with PostgreSQL Support: Open your terminal, type cd ~, press Enter, then python -m venv airflow_env to create a virtual environment—isolating dependencies. Activate it with source airflow_env/bin/activate (Mac/Linux) or airflow_env\Scripts\activate (Windows), then press Enter—your prompt will show (airflow_env). Install Airflow and the PostgreSQL provider by typing pip install "apache-airflow[postgres]" (the quotes keep shells like zsh from expanding the brackets)—this includes psycopg2-binary for PostgreSQL connectivity.
  2. Set Up PostgreSQL: Install PostgreSQL—e.g., sudo apt install postgresql postgresql-contrib (Linux), brew install postgresql (macOS), or download from postgresql.org (Windows). Start it—sudo service postgresql start (Linux), brew services start postgresql (macOS), or auto-start on Windows. Create a test database—type psql -U postgres, enter your password (or blank if unset), then CREATE DATABASE airflow_test; \c airflow_test; CREATE TABLE employees (id SERIAL PRIMARY KEY, name VARCHAR(100), status VARCHAR(50)); INSERT INTO employees (name, status) VALUES ('Alice', 'pending');.
  3. Initialize Airflow: Type airflow db init and press Enter—this initializes the metadata database at ~/airflow/airflow.db (create the ~/airflow/dags folder if it doesn’t already exist).
  4. Start Airflow Services: In one terminal, activate the environment, type airflow webserver -p 8080, and press Enter—this starts the UI at localhost:8080. In another terminal, activate the environment, type airflow scheduler, and press Enter—this runs the Scheduler.
  5. Add PostgreSQL Connection: Go to localhost:8080, log in (admin/admin), click “Admin” > “Connections,” then “+”:
  • Conn Id: postgres_test—unique identifier.
  • Conn Type: Postgres—select from dropdown.
  • Host: localhost—PostgreSQL server address.
  • Schema: airflow_test—database name.
  • Login: postgres—default user (adjust if different).
  • Password: Your PostgreSQL password (or blank if unset).
  • Port: 5432—default PostgreSQL port.
  • Click “Save” (Airflow Configuration Options). A scripted alternative is shown below.
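
If you prefer scripting over the UI form, the sketch below creates the same postgres_test connection directly in Airflow’s metadata database. It assumes airflow db init has already run; replace the password placeholder with your own, and note that re-running it fails because conn_id must be unique.

# Scripted alternative to the UI connection form (run inside the activated environment).
from airflow.models import Connection
from airflow.settings import Session

conn = Connection(
    conn_id="postgres_test",
    conn_type="postgres",
    host="localhost",
    schema="airflow_test",
    login="postgres",
    password="your_password",  # placeholder: use your PostgreSQL password (or empty if unset)
    port=5432,
)

session = Session()
session.add(conn)
session.commit()   # conn_id is unique, so running this twice raises an error
session.close()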

Step 2: Create a DAG with SqlSensor

  1. Open a Text Editor: Use Notepad, Visual Studio Code, or any editor that saves .py files—ensuring compatibility with Airflow’s Python environment.
  2. Write the DAG: Define a DAG that uses the SqlSensor to wait for a database condition:
  • Paste the following code:
from airflow import DAG
from airflow.sensors.sql import SqlSensor
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="sql_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_for_data = SqlSensor(
        task_id="wait_for_data",
        conn_id="postgres_test",
        sql="SELECT COUNT(*) FROM employees WHERE status = 'ready';",
        success=lambda cell: cell > 0,  # the callable receives the first cell of the first row
        poke_interval=60,  # Check every 60 seconds
        timeout=3600,      # Fail after 1 hour
    )
    process = BashOperator(
        task_id="process",
        bash_command="echo 'Data is ready!'",
    )
    wait_for_data >> process
  • Save this as sql_sensor_dag.py in ~/airflow/dags—e.g., /home/username/airflow/dags/sql_sensor_dag.py on Linux/macOS or C:/Users/YourUsername/airflow/dags/sql_sensor_dag.py on Windows. This DAG waits until at least one employee has a status of 'ready'.

Step 3: Test and Execute the DAG

  1. Test with CLI: Activate your environment, type airflow dags test sql_sensor_dag 2025-04-07, and press Enter—this runs a dry test for April 7, 2025. Initially, with status='pending', the SqlSensor polls every 60 seconds, timing out after 1 hour—update the database (psql -U postgres -d airflow_test -c "UPDATE employees SET status = 'ready' WHERE name = 'Alice';") to succeed, logging “Data is ready!”—verify in logs (DAG Testing with Python).
  2. Run Live: Type airflow dags trigger -e 2025-04-07 sql_sensor_dag, press Enter—initiates live execution. Open your browser to localhost:8080, where “wait_for_data” stays yellow until the condition is met, then turns green, followed by “process”—check logs (Airflow Web UI Overview).

This setup demonstrates how the SqlSensor monitors a database condition, setting the stage for more complex SQL-based workflows.


Key Features of the SqlSensor

The SqlSensor offers several features that enhance its utility in Airflow workflows, each providing specific control over database condition monitoring.

Flexible SQL Condition Monitoring

The sql parameter defines the SQL query to monitor—e.g., sql="SELECT COUNT(*) FROM employees WHERE status = 'ready'"—allowing you to check any database condition, such as row counts, specific values, or table states. It supports Jinja templating—e.g., sql="SELECT * FROM data WHERE date = '{{ ds }}'"—enabling dynamic queries based on runtime variables, making it adaptable to workflows with variable data conditions.

Example: Dynamic SQL Condition

from airflow import DAG
from airflow.sensors.sql import SqlSensor
from datetime import datetime

with DAG(
    dag_id="dynamic_sql_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_dynamic = SqlSensor(
        task_id="wait_dynamic",
        conn_id="postgres_test",
        sql="SELECT COUNT(*) FROM employees WHERE date = '{ { ds } }';",
        success=lambda result: result[0][0] > 0,
        poke_interval=60,
    )

This example waits for rows matching the current execution date (it assumes the employees table has a date column, unlike the test table created in Step 1).

Custom Success Criteria

The success parameter—e.g., success=lambda cell: cell > 0—defines a Python callable that receives the first cell of the first returned row, returning True to succeed or False to continue polling. It allows custom conditions—e.g., cell == 5 for an exact count, or cell >= 10 for a minimum threshold—offering precise control over when the sensor considers the condition met, beyond the default truthy-first-cell check.

Example: Custom Success Check

from airflow import DAG
from airflow.sensors.sql import SqlSensor
from datetime import datetime

def check_count(count):
    return count >= 2  # count is the first cell of the first row: the COUNT(*) value

with DAG(
    dag_id="custom_sql_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_count = SqlSensor(
        task_id="wait_count",
        conn_id="postgres_test",
        sql="SELECT COUNT(*) FROM employees WHERE status = 'ready';",
        success=check_count,
        poke_interval=60,
    )

This example waits until at least 2 employees are 'ready'.

Configurable Polling Interval

The poke_interval parameter sets the frequency of SQL checks in seconds—e.g., poke_interval=10 for every 10 seconds (default: 60). This allows you to balance responsiveness—shorter intervals for quick detection—and database efficiency—longer intervals to reduce load—tailoring the sensor to your condition’s expected timing.

Example: Fast Polling Interval

from airflow import DAG
from airflow.sensors.sql import SqlSensor
from datetime import datetime

with DAG(
    dag_id="fast_poke_sql_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_fast = SqlSensor(
        task_id="wait_fast",
        conn_id="postgres_test",
        sql="SELECT COUNT(*) FROM employees WHERE status = 'ready';",
        success=lambda cell: cell > 0,
        poke_interval=10,  # Check every 10 seconds
    )

This example checks every 10 seconds for readiness.

Timeout and Mode Control

The timeout and mode parameters manage wait duration and resource usage—e.g., timeout=7200 (2 hours) sets the maximum wait, and mode="reschedule" frees the worker slot between checks (the default mode is poke). timeout prevents indefinite waits—e.g., if data never arrives—while reschedule suits long waits (e.g., hours) because the task is rescheduled between polls; poke keeps the task running and is better for short waits, ensuring efficient monitoring tailored to your needs.

Example: Reschedule Mode with Timeout

from airflow import DAG
from airflow.sensors.sql import SqlSensor
from datetime import datetime

with DAG(
    dag_id="reschedule_sql_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_reschedule = SqlSensor(
        task_id="wait_reschedule",
        conn_id="postgres_test",
        sql="SELECT COUNT(*) FROM employees WHERE status = 'ready';",
        success=lambda cell: cell > 0,
        poke_interval=300,  # 5 minutes
        timeout=7200,       # 2 hours
        mode="reschedule",
    )

This example reschedules checks every 5 minutes, failing after 2 hours.


Best Practices for Using the SqlSensor

Drawing on the parameters and examples above, a few practices keep SqlSensor tasks reliable and light on the database:

  • Keep the sql query cheap (e.g., SELECT COUNT(*) with a selective WHERE clause) so repeated polling does not strain the database.
  • Pass runtime values through parameters instead of string formatting, improving security and readability.
  • Match poke_interval to how quickly the condition is expected to change, and set an explicit timeout so the task fails instead of waiting indefinitely.
  • Prefer mode="reschedule" for waits measured in hours so worker slots are freed between checks; keep the default poke mode for conditions expected within minutes.
  • Add retries and retry_delay to absorb transient database outages (Task Retries and Retry Delays).
  • Verify the query and success callable with airflow dags test and review the task logs before relying on the sensor in production (Task Logging and Monitoring).


Frequently Asked Questions About the SqlSensor

Here are common questions about the SqlSensor, with detailed, concise answers from online discussions.

1. Why does my SqlSensor fail with a connection error?

The conn_id—e.g., postgres_test—might be misconfigured. Check it under “Admin” > “Connections” in the UI—verify host, schema, and login—ensure the database is running, and test with airflow dags test (Task Logging and Monitoring).

2. How do I define a custom success condition?

Set success—e.g., success=lambda cell: cell > 0—to check the first cell returned by the query—e.g., a row count greater than 0 (DAG Parameters and Defaults).

3. Can I monitor multiple conditions in one task?

No; each sensor executes a single sql query. Use multiple SqlSensor tasks and combine them with trigger rules (Airflow Trigger Rules), as in the sketch below.
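
A minimal sketch of that pattern, assuming the postgres_test connection and two hypothetical tables (orders and payments): each condition gets its own sensor, and the downstream task runs only after both succeed under the default all_success trigger rule.

from airflow import DAG
from airflow.sensors.sql import SqlSensor
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="multi_condition_sensor_dag",  # illustrative DAG id
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # One sensor per condition; each runs its own query.
    wait_orders = SqlSensor(
        task_id="wait_orders",
        conn_id="postgres_test",
        sql="SELECT COUNT(*) FROM orders WHERE status = 'loaded';",    # hypothetical table
        success=lambda cell: cell > 0,
        poke_interval=60,
    )
    wait_payments = SqlSensor(
        task_id="wait_payments",
        conn_id="postgres_test",
        sql="SELECT COUNT(*) FROM payments WHERE status = 'loaded';",  # hypothetical table
        success=lambda cell: cell > 0,
        poke_interval=60,
    )
    process = BashOperator(
        task_id="process",
        bash_command="echo 'Both conditions met'",
        # default trigger_rule="all_success": runs only after both sensors succeed
    )
    [wait_orders, wait_payments] >> process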

4. Why does my SqlSensor timeout unexpectedly?

The timeout—e.g., timeout=300—might be too short. Increase it—e.g., timeout=3600—and test with airflow dags test (Task Timeouts and SLAs).

5. How can I debug a failed SqlSensor task?

Run airflow tasks test sql_sensor_dag wait_for_data 2025-04-07—the logs show each poke and its result—e.g., a zero count (DAG Testing with Python). Check ~/airflow/logs for details like “Query failed” (Task Logging and Monitoring).

6. Is it possible to use the SqlSensor in dynamic DAGs?

Yes, use it in a loop—e.g., SqlSensor(task_id=f"sql_{i}", sql=f"SELECT * FROM table_{i}", ...)—each monitoring a unique condition (Dynamic DAG Generation); see the sketch below.
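
A minimal sketch of that pattern, assuming the postgres_test connection and hypothetical tables table_0 through table_2:

from airflow import DAG
from airflow.sensors.sql import SqlSensor
from datetime import datetime

with DAG(
    dag_id="dynamic_sql_sensors_dag",  # illustrative DAG id
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Generate one sensor per table; each monitors its own condition.
    for i in range(3):
        SqlSensor(
            task_id=f"wait_table_{i}",
            conn_id="postgres_test",
            sql=f"SELECT COUNT(*) FROM table_{i} WHERE status = 'ready';",  # hypothetical tables
            success=lambda cell: cell > 0,
            poke_interval=60,
        )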

7. How do I retry a failed SqlSensor task?

Set retries and retry_delay—e.g., retries=3, retry_delay=timedelta(minutes=5)—so a failed check (e.g., the database is temporarily down) is retried 3 times with 5 minutes between attempts (Task Retries and Retry Delays); a short sketch follows below.
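
A short sketch applying those settings to the sensor from this guide, assuming the postgres_test connection; the DAG id is illustrative.

from airflow import DAG
from airflow.sensors.sql import SqlSensor
from datetime import datetime, timedelta

with DAG(
    dag_id="retry_sql_sensor_dag",  # illustrative DAG id
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_with_retries = SqlSensor(
        task_id="wait_with_retries",
        conn_id="postgres_test",
        sql="SELECT COUNT(*) FROM employees WHERE status = 'ready';",
        success=lambda cell: cell > 0,
        poke_interval=60,
        timeout=3600,
        retries=3,                         # retry the task up to 3 times on failure
        retry_delay=timedelta(minutes=5),  # wait 5 minutes between attempts
    )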


Conclusion

The SqlSensor enhances your Apache Airflow workflows with seamless SQL condition monitoring—build your DAGs with Defining DAGs in Python, install Airflow via Installing Airflow (Local, Docker, Cloud), and optimize performance with Airflow Performance Tuning. Monitor task execution in Monitoring Task Status in UI and deepen your understanding with Airflow Concepts: DAGs, Tasks, and Workflows!