Apache Airflow HttpSensor: A Comprehensive Guide

Apache Airflow is a leading open-source platform for orchestrating workflows, and the HttpSensor is a specialized operator designed to monitor HTTP endpoints within your Directed Acyclic Graphs (DAGs). Whether you’re waiting for API responses, checking service availability, or coordinating with operators like BashOperator and PythonOperator, or with external systems such as Apache Spark (Airflow with Apache Spark), this sensor provides a seamless way to manage HTTP-based dependencies. This comprehensive guide explores the HttpSensor: its purpose, setup process, key features, and best practices for effective use in your workflows. We’ll provide step-by-step instructions where processes are involved and include practical examples to illustrate each concept clearly. If you’re new to Airflow, begin with Airflow Fundamentals, and pair this with Defining DAGs in Python for context.


Understanding the HttpSensor in Apache Airflow

The HttpSensor is an Airflow operator designed to monitor HTTP endpoints as tasks within your DAGs—those Python scripts that define your workflows (Introduction to DAGs in Airflow). Located in airflow.providers.http.sensors.http (part of the HTTP provider bundled with Airflow), it periodically sends HTTP requests—typically GET—to a specified endpoint, using a connection defined via http_conn_id, and waits until a defined condition (e.g., status code 200) is met before allowing downstream tasks to proceed. You configure it with parameters like endpoint, http_conn_id, method, and response_check. Airflow’s Scheduler manages its execution timing (Airflow Architecture (Scheduler, Webserver, Executor)), and the Executor performs the HTTP checks using the HTTP Hook (Airflow Executors (Sequential, Local, Celery)), logging each attempt (Task Logging and Monitoring). It serves as an HTTP watchdog, integrating Airflow with external web services for conditional workflow progression.


Key Parameters of the HttpSensor

The HttpSensor relies on several critical parameters to configure and monitor HTTP endpoints effectively. Here’s an overview of the most important ones:

  • endpoint: Specifies the URL path to monitor—e.g., endpoint="/status"—appended to the base URL from the connection, defining the target resource (supports Jinja templating—e.g., "/status/{{ ds }}").
  • http_conn_id: Identifies the HTTP connection—e.g., http_conn_id="http_default"—linking to the base URL and optional credentials in Airflow’s connection store (default: http_default).
  • method: Defines the HTTP method—e.g., method="GET"—indicating the request type (typically GET, but supports POST, etc.), controlling the action performed on the endpoint (default: GET).
  • response_check: A Python callable—e.g., response_check=lambda response: response.status_code == 200—validates the response, returning True for success or False to continue polling, defining custom success criteria.
  • poke_interval: Sets the polling interval in seconds—e.g., poke_interval=60—determining how often the sensor checks the endpoint (default: 60), balancing responsiveness and resource use.
  • timeout: Defines the maximum wait time in seconds—e.g., timeout=3600 (1 hour)—after which the task fails if the condition isn’t met (default: 7 days), preventing indefinite waits.
  • headers: Sets HTTP headers—e.g., headers={"Authorization": "Bearer token"}—specifying metadata like authentication tokens or content types, supporting templating—e.g., {"X-Date": "{{ ds }}"}.
  • request_params: A dictionary of query parameters—e.g., request_params={"key": "value"}—appended to the URL (e.g., ?key=value), supporting templating for dynamic queries.
  • mode: Controls polling behavior—e.g., mode="poke" (default) or mode="reschedule"—where poke keeps the worker slot occupied between checks, and reschedule frees it until the next check.

These parameters enable the HttpSensor to monitor HTTP endpoints with precision, integrating web-based triggers into your Airflow workflows efficiently.
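
To see how these parameters fit together, here is a minimal sketch of a single sensor that combines a templated endpoint, custom headers, query parameters, and reschedule mode. The dag_id, path, and bearer token are illustrative placeholders rather than values from this guide’s setup, though the http_default connection matches the one configured below.

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime

with DAG(
    dag_id="full_param_http_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_for_report = HttpSensor(
        task_id="wait_for_report",
        http_conn_id="http_default",
        endpoint="/anything/reports/{{ ds }}",         # hypothetical path, templated with the run date
        method="GET",
        headers={"Authorization": "Bearer my-token"},  # placeholder token
        request_params={"format": "json"},             # appended as ?format=json
        response_check=lambda response: response.status_code == 200,
        poke_interval=60,
        timeout=3600,
        mode="reschedule",
    )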


How the HttpSensor Functions in Airflow

The HttpSensor operates by embedding an HTTP monitoring task in your DAG script, saved in ~/airflow/dags (DAG File Structure Best Practices). You define it with parameters like endpoint="/status", http_conn_id="http_default", poke_interval=60, and response_check=lambda response: response.status_code == 200. The Scheduler scans this script and queues the task according to its schedule_interval, such as daily or hourly runs (DAG Scheduling (Cron, Timetables)), while respecting any upstream dependencies—e.g., waiting for a prior task to complete. When executed, the Executor uses the HTTP Hook to connect to the base URL from http_conn_id (e.g., https://api.example.com), constructs the full URL by appending endpoint, and sends the request with method, headers, and request_params. It polls every poke_interval seconds, evaluating the response with response_check until True is returned or timeout is reached, logging each attempt in the task logs while task state is recorded in Airflow’s metadata database (DAG Serialization in Airflow). Success occurs when response_check passes; failure—due to timeout or persistent condition failure—triggers retries or UI alerts (Airflow Graph View Explained). This process integrates HTTP monitoring into Airflow’s orchestrated environment, automating web service condition checks.
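
Conceptually, each poke boils down to one HTTP request plus your response_check. The sketch below approximates that behavior with the requests library; it is an illustration of the logic, not the provider’s actual source code, and assumes the httpbin.org base URL used later in this guide.

import requests

def simplified_poke(base_url, endpoint, headers=None, request_params=None,
                    response_check=lambda response: True):
    # One poke: send the request, then let response_check decide success (True) or keep waiting (False).
    response = requests.get(base_url + endpoint, headers=headers, params=request_params)
    return bool(response_check(response))

# A single check against the test endpoint from this guide's setup.
ready = simplified_poke(
    "https://httpbin.org",
    "/status/200",
    response_check=lambda response: response.status_code == 200,
)
print(ready)  # True lets downstream tasks run; False means the sensor would poll again after poke_interval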


Setting Up the HttpSensor in Apache Airflow

To utilize the HttpSensor, you need to configure Airflow with an HTTP connection and define it in a DAG. Here’s a step-by-step guide using a local setup with a public test API for demonstration purposes.

Step 1: Configure Airflow and HTTP Connection

  1. Install Apache Airflow: Open your terminal, type cd ~, press Enter, then python -m venv airflow_env to create a virtual environment—isolating dependencies. Activate it with source airflow_env/bin/activate (Mac/Linux) or airflow_env\Scripts\activate (Windows), then press Enter—your prompt will show (airflow_env). Install Airflow by typing pip install apache-airflow—this installs the core package along with the bundled HTTP provider (apache-airflow-providers-http) that supplies the HttpSensor.
  2. Initialize Airflow: Type airflow db init and press Enter—this creates ~/airflow/airflow.db and the dags folder, setting up the metadata database.
  3. Start Airflow Services: In one terminal, activate, type airflow webserver -p 8080, and press Enter—starts the UI at localhost:8080. In another, activate, type airflow scheduler, and press Enter—runs the Scheduler.
  4. Add HTTP Connection: Go to localhost:8080, log in with your admin credentials (create an account first with airflow users create if you haven’t), click “Admin” > “Connections,” then “+”:
  • Conn Id: http_default—unique identifier (default used if not overridden).
  • Conn Type: HTTP—select from dropdown.
  • Host: https://httpbin.org—base URL for a public test API (replace with your API’s base URL in practice).
  • Login: Leave blank—no auth for this example (or your API username if required).
  • Password: Leave blank—no auth (or your API key/token if required).
  • Port: Leave blank—defaults to 443 for HTTPS.
  • Click “Save” (Airflow Configuration Options).

Step 2: Create a DAG with HttpSensor

  1. Open a Text Editor: Use Notepad, Visual Studio Code, or any editor that saves .py files—ensuring compatibility with Airflow’s Python environment.
  2. Write the DAG: Define a DAG that uses the HttpSensor to monitor an HTTP endpoint:
  • Paste the following code:
from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="http_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    check_api = HttpSensor(
        task_id="check_api",
        http_conn_id="http_default",
        endpoint="/status/200",
        poke_interval=60,  # Check every 60 seconds
        timeout=3600,      # Fail after 1 hour
        response_check=lambda response: response.status_code == 200,
    )
    process = BashOperator(
        task_id="process",
        bash_command="echo 'API is ready!'",
    )
    check_api >> process
  • Save this as http_sensor_dag.py in ~/airflow/dags—e.g., /home/username/airflow/dags/http_sensor_dag.py on Linux/macOS or C:/Users/YourUsername/airflow/dags/http_sensor_dag.py on Windows. This DAG monitors https://httpbin.org/status/200 until a 200 status is received.

Step 3: Test and Execute the DAG

  1. Test with CLI: Activate your environment, type airflow dags test http_sensor_dag 2025-04-07, and press Enter—this runs a dry test for April 7, 2025. The HttpSensor polls https://httpbin.org/status/200 (every 60 seconds until the check passes), succeeds immediately on the 200 OK response, and logs it; the downstream process task then echoes “API is ready!”—verify in logs (DAG Testing with Python).
  2. Run Live: Type airflow dags trigger -e 2025-04-07 http_sensor_dag, press Enter—initiates live execution. Open your browser to localhost:8080, where “check_api” turns green once the condition is met, followed by “process”—check logs for confirmation (Airflow Web UI Overview).

This setup demonstrates how the HttpSensor monitors a reliable HTTP endpoint, setting the stage for more complex API monitoring scenarios.


Key Features of the HttpSensor

The HttpSensor offers several features that enhance its utility in Airflow workflows, each providing specific control over HTTP endpoint monitoring.

Flexible Endpoint Monitoring

The endpoint parameter defines the URL path to monitor—e.g., endpoint="/status"—appended to the base URL from http_conn_id. It supports Jinja templating—e.g., endpoint="/status/{{ ds }}"—allowing dynamic paths based on runtime variables like the execution date, making it adaptable to workflows monitoring variable endpoints or time-specific resources.

Example: Dynamic Endpoint Monitoring

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime

with DAG(
    dag_id="dynamic_http_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    check_dynamic = HttpSensor(
        task_id="check_dynamic",
        http_conn_id="http_default",
        endpoint="/delay/{ { ds } }",
        poke_interval=60,
    )

This example monitors a dynamic endpoint—e.g., /anything/2025-04-07 for the April 7 run.

Custom Response Validation

The response_check parameter—e.g., response_check=lambda response: response.status_code == 200—defines a Python callable to validate the HTTP response. It receives the response object and returns True to succeed or False to keep polling—e.g., checking status codes, JSON content (response.json()["key"] == "value"), or headers—offering precise control over success conditions beyond default status checks.

Example: JSON Response Check

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime

def check_json(response):
    return response.json().get("status") == "ok"

with DAG(
    dag_id="json_http_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    check_json_response = HttpSensor(
        task_id="check_json_response",
        http_conn_id="http_default",
        endpoint="/json",
        response_check=check_json,
        poke_interval=60,
    )

This example waits until the JSON response contains "status": "ok". Note that httpbin.org’s /json endpoint returns a sample document without a status field, so in practice point this at your own API’s health or status endpoint.

Configurable Polling Interval

The poke_interval parameter sets the frequency of HTTP checks in seconds—e.g., poke_interval=30 for every 30 seconds (default: 60). This allows you to balance responsiveness—shorter intervals for quick detection—and resource efficiency—longer intervals to reduce load—tailoring the sensor to your endpoint’s expected response timing.

Example: Fast Polling Interval

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime

with DAG(
    dag_id="fast_poke_http_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    check_fast = HttpSensor(
        task_id="check_fast",
        http_conn_id="http_default",
        endpoint="/status/200",
        poke_interval=10,  # Check every 10 seconds
    )

This example checks the endpoint every 10 seconds.

Timeout and Mode Control

The timeout and mode parameters manage wait duration and resource usage—e.g., timeout=7200 (2 hours) sets the maximum wait, and mode="reschedule" (default: poke) frees the worker between checks. timeout prevents indefinite waits—e.g., if an API never responds—while mode="reschedule" optimizes long waits (e.g., hours) by rescheduling, contrasting with poke for short waits, ensuring efficient monitoring tailored to your needs.

Example: Reschedule Mode with Timeout

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime

with DAG(
    dag_id="reschedule_http_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    check_reschedule = HttpSensor(
        task_id="check_reschedule",
        http_conn_id="http_default",
        endpoint="/status/200",
        poke_interval=300,  # 5 minutes
        timeout=7200,       # 2 hours
        mode="reschedule",
    )

This example reschedules checks every 5 minutes, failing after 2 hours.


Best Practices for Using the HttpSensor

Drawing on the setup and features above, a few practices keep HttpSensor tasks reliable and efficient:

  • Verify the http_conn_id and endpoint first: test the connection in the UI and run airflow dags test before relying on the sensor in production.
  • Use response_check to encode your real success criteria (status codes, JSON fields, headers) rather than treating any response as success.
  • Match poke_interval and timeout to how quickly the endpoint is expected to become ready, so the sensor neither hammers the API nor waits indefinitely.
  • Prefer mode="reschedule" for long waits to free worker slots between checks, and keep mode="poke" for short, frequent checks.
  • Keep credentials in the Airflow connection or templated headers instead of hardcoding tokens in DAG files (Airflow Configuration Options).
  • Combine the sensor with retries and retry_delay so transient network failures do not fail the run outright (Task Retries and Retry Delays).


Frequently Asked Questions About the HttpSensor

Here are common questions about the HttpSensor, with detailed, concise answers from online discussions.

1. Why does my HttpSensor fail with a connection error?

The http_conn_id (e.g., http_default) might be misconfigured. Check it under “Admin” > “Connections” in the UI, verify the host, and ensure the endpoint is reachable; test with airflow dags test (Task Logging and Monitoring).
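
One way to narrow down a connection problem is to exercise the same HTTP Hook the sensor uses from a Python shell in your Airflow environment; a minimal sketch, assuming the http_default connection from this guide:

from airflow.providers.http.hooks.http import HttpHook

# Reuses the base URL and any credentials stored in the http_default connection.
hook = HttpHook(method="GET", http_conn_id="http_default")
response = hook.run(endpoint="/status/200")
print(response.status_code)  # 200 confirms the connection and endpoint are reachable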

2. How do I validate a specific response?

Set response_check—e.g., response_check=lambda r: r.json()["status"] == "ok"—to define custom conditions (DAG Parameters and Defaults).

3. Can I monitor multiple endpoints in one task?

No, each sensor monitors a single endpoint (e.g., endpoint="/status"). Use one HttpSensor task per endpoint and combine them with trigger rules (Airflow Trigger Rules), as sketched below.
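
A minimal sketch of that pattern, with two sensors feeding one downstream task; both paths are illustrative httpbin routes behind the http_default connection:

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="multi_endpoint_http_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    check_status = HttpSensor(
        task_id="check_status",
        http_conn_id="http_default",
        endpoint="/status/200",
        poke_interval=60,
    )
    check_anything = HttpSensor(
        task_id="check_anything",
        http_conn_id="http_default",
        endpoint="/anything/health",
        poke_interval=60,
    )
    process = BashOperator(
        task_id="process",
        bash_command="echo 'Both endpoints are ready!'",
        # The default trigger_rule="all_success" waits for both sensors to succeed.
    )
    [check_status, check_anything] >> process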

4. Why does my HttpSensor timeout unexpectedly?

The timeout—e.g., timeout=300—might be too short. Increase it—e.g., timeout=3600—and test with airflow dags test (Task Timeouts and SLAs).

5. How can I debug a failed HttpSensor task?

Run airflow tasks test http_sensor_dag check_api 2025-04-07 to execute the sensor once and review its logged attempts (e.g., “Response: 404”) (DAG Testing with Python). Check ~/airflow/logs for details like “Timeout” (Task Logging and Monitoring).

6. Is it possible to use the HttpSensor in dynamic DAGs?

Yes, use it in a loop—e.g., HttpSensor(task_id=f"http_{i}", endpoint=f"/data/{i}", ...)—each monitoring a unique endpoint (Dynamic DAG Generation).
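
A short sketch of that loop; the /anything/{i} paths are stand-ins for your real per-item endpoints such as /data/{i}:

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime

with DAG(
    dag_id="looped_http_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    for i in range(3):
        HttpSensor(
            task_id=f"http_{i}",
            http_conn_id="http_default",
            endpoint=f"/anything/{i}",  # stand-in for a per-item endpoint like /data/{i}
            poke_interval=60,
        )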

7. How do I retry a failed HttpSensor task?

Set retries and retry_delay—e.g., retries=3, retry_delay=timedelta(minutes=5)—retries 3 times, waiting 5 minutes if it fails—e.g., network issue (Task Retries and Retry Delays).
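
A brief sketch of those settings applied to the check_api task from the setup example; the values shown are illustrative:

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime, timedelta

with DAG(
    dag_id="retry_http_sensor_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    check_api = HttpSensor(
        task_id="check_api",
        http_conn_id="http_default",
        endpoint="/status/200",
        poke_interval=60,
        timeout=3600,
        retries=3,                         # retry if a poke fails with an error such as a network issue
        retry_delay=timedelta(minutes=5),  # wait 5 minutes between retries
    )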


Conclusion

The HttpSensor enhances your Apache Airflow workflows with seamless HTTP endpoint monitoring—build your DAGs with Defining DAGs in Python, install Airflow via Installing Airflow (Local, Docker, Cloud), and optimize performance with Airflow Performance Tuning. Monitor task execution in Monitoring Task Status in UI and deepen your understanding with Airflow Concepts: DAGs, Tasks, and Workflows!