Airflow with Kubernetes Executor

Apache Airflow is a powerful platform for orchestrating complex workflows, and its integration with the Kubernetes Executor leverages the scalability and flexibility of Kubernetes to execute tasks efficiently in a cloud-native environment. Whether you’re running tasks with PythonOperator, sending notifications via EmailOperator, or integrating with systems such as Apache Spark (Airflow with Apache Spark), the Kubernetes Executor enhances Airflow’s ability to handle dynamic workloads. This comprehensive guide, hosted on SparkCodeHub, explores Airflow with the Kubernetes Executor—how it works, how to set it up, and best practices for effective use. We’ll provide detailed step-by-step instructions, practical examples, and a thorough FAQ section. For foundational knowledge, start with Airflow Web UI Overview and pair this with Defining DAGs in Python.


What is Airflow with Kubernetes Executor?

Airflow with the Kubernetes Executor refers to the use of Kubernetes as an execution engine for Airflow tasks, replacing traditional executors like LocalExecutor or CeleryExecutor (Airflow Executors (Sequential, Local, Celery)). In this setup, the Airflow Scheduler—part of Airflow’s core architecture (Airflow Architecture (Scheduler, Webserver, Executor))—parses DAGs from the ~/airflow/dags directory (DAG File Structure Best Practices) and delegates each task to a separate Kubernetes Pod. These Pods are dynamically created, run the task, and are terminated upon completion, leveraging Kubernetes’ container orchestration capabilities. Configured via airflow.cfg and Kubernetes-specific settings (e.g., kube_config), the Executor uses the Kubernetes API to manage Pods, with task states updated in the metadata database (airflow.db). The Web UI displays execution progress (Monitoring Task Status in UI), while logs are fetched from Pods or persistent storage (Task Logging and Monitoring). This integration combines Airflow’s workflow orchestration with Kubernetes’ scalability, isolation, and resource management, making it ideal for modern, distributed systems.

Core Components

  • Kubernetes Executor: Runs tasks as Kubernetes Pods, replacing traditional executors.
  • Pods: Dynamically spawned containers executing individual tasks.
  • Scheduler: Orchestrates task scheduling and Pod creation via Kubernetes API.
  • Configuration: Customizes Pod behavior (e.g., image, resources) via airflow.cfg or DAG parameters.

Why Airflow with Kubernetes Executor Matters

The Kubernetes Executor matters because it brings the scalability, resilience, and resource efficiency of Kubernetes to Airflow, addressing the limitations of traditional executors in dynamic, large-scale environments. Unlike LocalExecutor, which runs tasks on a single machine, or CeleryExecutor, which requires a fixed worker pool, the Kubernetes Executor dynamically allocates resources per task, isolating them in Pods to prevent interference—crucial for workflows with diverse dependencies (Airflow XComs: Task Communication). It supports scheduling flexibility (Schedule Interval Configuration), backfills (Catchup and Backfill Scheduling), and retries (Task Retries and Retry Delays), scaling seamlessly with Kubernetes’ cluster capacity. For example, a data pipeline with hundreds of tasks—some requiring heavy compute, others lightweight—can run concurrently without overloading a single host, with Kubernetes auto-scaling resources as needed (Airflow Performance Tuning). This integration reduces operational overhead, enhances fault tolerance, and optimizes resource use, making it a game-changer for cloud-native and distributed workflows.

Practical Benefits

  • Scalability: Dynamically scales tasks with Kubernetes’ cluster resources.
  • Isolation: Runs each task in a separate Pod, preventing conflicts.
  • Resource Efficiency: Allocates only what’s needed per task, minimizing waste.
  • Cloud-Native: Leverages Kubernetes’ orchestration for modern deployments.

How Airflow with Kubernetes Executor Works

The Kubernetes Executor works by delegating task execution to Kubernetes Pods, orchestrated by Airflow’s Scheduler. When a DAG is triggered—manually (Triggering DAGs via UI) or via schedule_interval—the Scheduler parses it from the dags folder, identifies tasks, and sends requests to the Kubernetes API to create Pods. Each Pod runs a container with the Airflow worker image (e.g., apache/airflow:2.9.0), executing a single task using the specified command (e.g., airflow tasks run). The Pod’s configuration—image, CPU/memory limits, environment variables—is defined in airflow.cfg under [kubernetes_executor] or overridden in the DAG via executor_config. The Executor monitors Pod status via the Kubernetes API, updating the metadata database with states (e.g., “running,” “success”) as tasks complete (DAG Serialization in Airflow). Pods terminate post-execution, with logs fetched from Kubernetes or persistent volumes. The Webserver renders this in Graph View (Airflow Graph View Explained), integrating Kubernetes’ dynamic execution into Airflow’s workflow management, ensuring scalability and isolation.
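For intuition, here is a rough sketch of the kind of worker Pod the Executor submits for a single task instance. It is illustrative only: the real spec is built from pod_template_file plus any executor_config overrides, and the name, labels, and exact command arguments shown are simplified assumptions.

```python
from kubernetes import client


def sketch_worker_pod(dag_id: str, task_id: str, run_id: str) -> client.V1Pod:
    """Approximate shape of a worker Pod submitted by the Kubernetes Executor (illustrative)."""
    return client.V1Pod(
        metadata=client.V1ObjectMeta(
            name=f"{dag_id}-{task_id}-example",            # real names include a unique suffix
            labels={"dag_id": dag_id, "task_id": task_id},  # simplified labels
        ),
        spec=client.V1PodSpec(
            restart_policy="Never",  # failed tasks are retried by Airflow, not by Kubernetes
            containers=[
                client.V1Container(
                    name="base",
                    image="apache/airflow:2.9.0",
                    # One Pod runs exactly one task instance, then exits.
                    args=["airflow", "tasks", "run", dag_id, task_id, run_id, "--local"],
                )
            ],
        ),
    )
```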

Using Airflow with Kubernetes Executor

Let’s set up Airflow with the Kubernetes Executor and run a DAG, with detailed steps.

Step 1: Set Up Your Airflow and Kubernetes Environment

  1. Install Docker and Minikube: Install Docker and Minikube (a local Kubernetes cluster) on your machine—e.g., on macOS: brew install docker minikube. Start Minikube: minikube start.
  2. Install Airflow with Kubernetes Support: Open your terminal, navigate to your home directory (cd ~), and create a virtual environment (python -m venv airflow_env). Activate it—source airflow_env/bin/activate on Mac/Linux or airflow_env\Scripts\activate on Windows—then install Airflow with the Kubernetes provider (pip install "apache-airflow[cncf.kubernetes]").
  3. Initialize the Database: Run airflow db init to create the metadata database at ~/airflow/airflow.db.
  4. Configure Kubernetes Executor: Edit ~/airflow/airflow.cfg:
```ini
[core]
executor = KubernetesExecutor

[kubernetes_executor]
pod_template_file = ~/airflow/pod_template.yaml
namespace = default
delete_worker_pods = True
worker_container_repository = apache/airflow
worker_container_tag = 2.9.0
```
Then create ~/airflow/pod_template.yaml:
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: airflow-worker
spec:
  containers:
    - name: base
      image: apache/airflow:2.9.0
      imagePullPolicy: IfNotPresent
      resources:
        requests:
          cpu: "100m"
          memory: "256Mi"
        limits:
          cpu: "200m"
          memory: "512Mi"
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: "KubernetesExecutor"
  restartPolicy: Never
```
  5. Set Up Kubernetes Config: Ensure kubectl uses Minikube’s context (kubectl config use-context minikube) and that Airflow can access it via the default ~/.kube/config.
  6. Start Airflow Services: In one terminal, run airflow webserver -p 8080. In another, run airflow scheduler (Installing Airflow (Local, Docker, Cloud)).
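Once the services are up, a quick sanity check confirms that the pod template parses and that the executor setting took effect. A minimal sketch, assuming the default paths above (PyYAML is installed as an Airflow dependency):

```python
import subprocess
from pathlib import Path

import yaml  # PyYAML, pulled in with Airflow

# Confirm the pod template is valid YAML and names its base container "base".
template = yaml.safe_load((Path.home() / "airflow" / "pod_template.yaml").read_text())
assert template["kind"] == "Pod"
assert template["spec"]["containers"][0]["name"] == "base"

# Confirm Airflow resolved the executor setting (should print "KubernetesExecutor").
result = subprocess.run(
    ["airflow", "config", "get-value", "core", "executor"],
    capture_output=True, text=True, check=True,
)
print(result.stdout.strip())
```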

Step 2: Create a Sample DAG

  1. Open a Text Editor: Use Visual Studio Code or any plain-text editor, making sure the file is saved with a .py extension.
  2. Write the DAG Script: Define a DAG with Kubernetes-executed tasks:
  • Copy this code:
from datetime import datetime
import time

from airflow import DAG
from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s

def k8s_cpu(request_cpu, limit_cpu):
    """Build a pod_override that sets CPU requests/limits for a task's Pod."""
    return {
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",  # must match the base container in pod_template.yaml
                        image="apache/airflow:2.9.0",
                        resources=k8s.V1ResourceRequirements(
                            requests={"cpu": request_cpu},
                            limits={"cpu": limit_cpu},
                        ),
                    )
                ]
            )
        )
    }

def extract():
    time.sleep(2)  # Simulate work
    print("Extracting data")

def transform():
    time.sleep(3)  # Simulate heavier work
    print("Transforming data")

def load():
    raise ValueError("Load failed intentionally")

with DAG(
    dag_id="k8s_executor_demo",
    start_date=datetime(2025, 1, 1),
    schedule_interval=None,  # Manual triggers only
    catchup=False,
) as dag:
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
        executor_config=k8s_cpu("100m", "200m"),
    )
    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
        executor_config=k8s_cpu("150m", "300m"),
    )
    load_task = PythonOperator(
        task_id="load",
        python_callable=load,
        executor_config=k8s_cpu("100m", "200m"),
    )
    extract_task >> transform_task >> load_task
  • Save as k8s_executor_demo.py in ~/airflow/dags.
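Before triggering anything, it can help to confirm the file imports cleanly from the dags folder. A small sketch using Airflow’s DagBag, run inside the same virtual environment:

```python
import os
from airflow.models import DagBag

# Parse the dags folder and surface any import errors before the Scheduler does.
bag = DagBag(dag_folder=os.path.expanduser("~/airflow/dags"), include_examples=False)
assert not bag.import_errors, bag.import_errors
assert "k8s_executor_demo" in bag.dags
print([t.task_id for t in bag.dags["k8s_executor_demo"].tasks])  # ['extract', 'transform', 'load']
```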

Step 3: Execute and Monitor the DAG with Kubernetes Executor

  1. Mount DAGs Folder: Make the dags folder visible inside Minikube: minikube mount ~/airflow/dags:/airflow/dags (keep this terminal open; the mount runs in the foreground). Worker Pods also need access to the DAG files, e.g., via a hostPath volume in pod_template.yaml or by baking the DAGs into the worker image.
  2. Trigger the DAG: At localhost:8080, toggle “k8s_executor_demo” to “On,” click “Trigger DAG” for April 7, 2025. In Graph View, monitor:
  • extract: Runs in a Pod, turns green (success).
  • transform: Runs in a separate Pod, turns green.
  • load: Fails (red) due to ValueError.

  3. Check Pods: Run kubectl get pods—see Pods like k8s-executor-demo-extract-xxx created and terminated post-run.
  4. View Logs: In Graph View, click load > “Log”—see “ValueError: Load failed intentionally” from the Pod (Triggering DAGs via UI).
  5. Retry Task: Click load > “Clear,” confirm—it spawns a new Pod, updating the status if the code is fixed.
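If a Pod’s log is hard to reach from the UI, it can be pulled straight from Kubernetes while the Pod still exists. A short sketch using the official kubernetes Python client (the commented Pod name is a placeholder; list real names with kubectl get pods first):

```python
from kubernetes import client, config

# Use the same credentials as kubectl (~/.kube/config, Minikube context in this setup).
config.load_kube_config()
v1 = client.CoreV1Api()

# List worker Pods in the namespace configured for the executor.
for pod in v1.list_namespaced_pod(namespace="default").items:
    print(pod.metadata.name, pod.status.phase)

# Read one Pod's container log while the Pod still exists (placeholder name).
# print(v1.read_namespaced_pod_log(name="k8s-executor-demo-load-xxx", namespace="default"))
```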

This setup demonstrates Kubernetes Executor running tasks in isolated Pods, monitored via Airflow’s UI.

Key Features of Airflow with Kubernetes Executor

Airflow’s Kubernetes Executor offers a robust set of features, detailed below.

Dynamic Pod Creation and Termination

The Kubernetes Executor dynamically creates a Pod for each task, running it with a specified container image (e.g., apache/airflow:2.9.0) and terminating it upon completion (delete_worker_pods=True). This ensures tasks run in isolation, with resources allocated only during execution, leveraging Kubernetes’ orchestration for scalability and cleanup.

Example: Task Isolation

In the DAG, extract, transform, and load each run in separate Pods—kubectl get pods shows them spawn and disappear, isolating dependencies (Airflow Executors (Sequential, Local, Celery)).
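To watch this lifecycle as it happens, the kubernetes client’s watch API can stream Pod events; a brief sketch, assuming the default namespace used above:

```python
from kubernetes import client, config, watch

config.load_kube_config()
v1 = client.CoreV1Api()

# Stream Pod events for ~60 seconds: ADDED when a task Pod spawns, DELETED when it is cleaned up.
for event in watch.Watch().stream(v1.list_namespaced_pod, namespace="default", timeout_seconds=60):
    pod = event["object"]
    print(event["type"], pod.metadata.name, pod.status.phase)
```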

Customizable Pod Configuration

Tasks can override Pod settings via executor_config—in Airflow 2.x, by passing a pod_override V1Pod object—covering custom images, CPU/memory requests and limits, environment variables, or volumes, as sketched below. Defined in the DAG or in pod_template_file, this flexibility tailors resources to task needs—e.g., a compute-heavy transform gets more CPU—optimizing performance and cost.
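For example, a task could receive an extra environment variable and a mounted volume through pod_override; a minimal sketch (the volume and claim names are hypothetical):

```python
from kubernetes.client import models as k8s

custom_config = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",  # the override merges onto the base container
                    env=[k8s.V1EnvVar(name="STAGE", value="dev")],
                    volume_mounts=[
                        k8s.V1VolumeMount(name="data", mount_path="/opt/airflow/data")
                    ],
                )
            ],
            volumes=[
                k8s.V1Volume(
                    name="data",
                    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
                        claim_name="airflow-data-pvc"  # hypothetical PVC
                    ),
                )
            ],
        )
    )
}
```

Pass this dictionary as executor_config on any operator to apply it to that task’s Pod.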

Example: Resource Tuning

transform_task requests 150m CPU with a 300m limit via its pod_override—ensuring sufficient resources for its 3-second sleep, visible in kubectl describe pod.

Scalable Task Execution

The Executor scales with the Kubernetes cluster—running hundreds of tasks concurrently across nodes, limited only by cluster capacity. Kubernetes’ auto-scaling adjusts resources dynamically, supporting large-scale workflows without fixed worker pools, enhancing throughput for dynamic DAGs (Dynamic DAG Generation).

Example: Concurrent Runs

Trigger k8s_executor_demo multiple times—Pods run in parallel, scaling with Minikube’s nodes, visible in kubectl get pods -w.
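To reproduce this locally, several runs can be queued back to back with the airflow dags trigger CLI; a quick sketch (the run IDs are arbitrary):

```python
import subprocess

# Queue three manual runs; each task instance gets its own short-lived Pod.
for i in range(3):
    subprocess.run(
        ["airflow", "dags", "trigger", "k8s_executor_demo", "--run-id", f"demo_run_{i}"],
        check=True,
    )
```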

Real-Time Monitoring in UI

Graph View tracks Pod statuses—green for success, red for failure—updated from the database, with logs fetched from Kubernetes via the Executor. This integrates Kubernetes execution into Airflow’s monitoring framework, providing immediate visibility into task progress and failures (Airflow Metrics and Monitoring Tools).

Example: Failure Visibility

In Graph View, load turns red—logs show “ValueError” from its Pod, guiding quick debugging (Airflow Graph View Explained).

Seamless Kubernetes Integration

The Executor uses the Kubernetes API for Pod management, authenticated via kube_config or in-cluster credentials, ensuring compatibility with Kubernetes clusters (e.g., EKS, GKE). It supports namespaces, RBAC, and persistent volumes, aligning Airflow with cloud-native practices for robust deployment (Airflow Performance Tuning).

Example: Namespace Control

Pods run in the default namespace—configurable to a dedicated airflow namespace for isolation, verified with kubectl get pods -n default.

Best Practices for Airflow with Kubernetes Executor

Optimize this integration with these detailed guidelines:

  • Use a Robust Cluster: Deploy Airflow on a production Kubernetes cluster (e.g., EKS) instead of Minikube—ensures scalability and reliability (Installing Airflow (Local, Docker, Cloud)).
  • Test Pod Configs: Validate executor_config—e.g., CPU/memory limits—with kubectl apply -f test-pod.yaml before DAG runs (DAG Testing with Python).
  • Optimize Resource Requests: Set minimal requests and reasonable limits—e.g., 100m/200m CPU—to balance efficiency and performance (Airflow Performance Tuning).
  • Enable Pod Deletion: Keep delete_worker_pods=True—prevents resource leaks; monitor with kubectl get pods post-run.
  • Monitor Post-Trigger: Check Graph View and Pod logs—e.g., a red load signals a failure—for quick resolution (Airflow Graph View Explained).
  • Use Persistent Volumes: Mount volumes for logs (e.g., a PVC in pod_template.yaml)—ensures log availability after Pod deletion (Task Logging and Monitoring).
  • Document Configs: Track pod_template.yaml and executor_config settings—e.g., in a README—for team clarity (DAG File Structure Best Practices).
  • Handle Time Zones: Align execution_date with your time zone—e.g., adjust for UTC in logs (Time Zones in Airflow Scheduling).

These practices ensure a scalable, efficient Kubernetes Executor setup.

FAQ: Common Questions About Airflow with Kubernetes Executor

Here’s an expanded set of answers to frequent questions from Airflow users.

1. Why don’t Pods start for my tasks?

The Kubernetes cluster may be down—check kubectl cluster-info—or the kube_config may be invalid. Verify with kubectl get nodes (Airflow Configuration Basics).

2. How do I debug Pod failures?

Check the load task’s logs in Graph View—e.g., “ValueError”—then run kubectl logs <pod-name> for container errors (Task Logging and Monitoring).

3. Why are Pods not terminating?

delete_worker_pods is likely set to False—set it to True in airflow.cfg and restart the Scheduler (Airflow Performance Tuning).

4. How do I scale task execution?

Increase cluster nodes—e.g., minikube node add—or use auto-scaling in production clusters (Airflow Executors (Sequential, Local, Celery)).

5. Can I use custom images per task?

Yes—override the container image through executor_config’s pod_override—e.g., my-python-image:1.0 for transform—tailored to task needs, as sketched below (Airflow XComs: Task Communication).
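A minimal sketch of that override (my-python-image:1.0 is the hypothetical image from the question, and transform is the callable from the demo DAG):

```python
from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s

transform_task = PythonOperator(
    task_id="transform",
    python_callable=transform,  # callable defined in the demo DAG above
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[k8s.V1Container(name="base", image="my-python-image:1.0")]
            )
        )
    },
)
```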

6. Why are logs missing after Pod deletion?

No persistent storage for logs—mount a PVC for the logs directory in pod_template.yaml, or enable Airflow remote logging, so logs survive Pod deletion (DAG Views and Task Logs).

7. How do I monitor Pod resource usage?

Use kubectl top pods or integrate Prometheus/Grafana—e.g., track container_cpu_usage_seconds_total from cAdvisor (Airflow Metrics and Monitoring Tools).

8. Can Kubernetes Executor handle backfills?

Yes—set catchup=True and trigger; Pods scale with backfill tasks (Catchup and Backfill Scheduling).


Conclusion

Airflow with Kubernetes Executor powers scalable, isolated workflows—set it up with Installing Airflow (Local, Docker, Cloud), craft DAGs via Defining DAGs in Python, and monitor with Airflow Graph View Explained. Explore more with Airflow Concepts: DAGs, Tasks, and Workflows and Customizing Airflow Web UI!