Mastering Airflow with Databricks: A Comprehensive Guide
Apache Airflow is a versatile platform for orchestrating workflows, and its integration with Databricks supercharges its capabilities by combining Airflow’s scheduling prowess with Databricks’ optimized Spark engine for big data processing and analytics. Whether you’re running tasks with PythonOperator, sending notifications via EmailOperator, or connecting to other systems as covered in Airflow with Apache Spark, Databricks provides a unified data platform within Airflow pipelines. This comprehensive guide, hosted on SparkCodeHub, explores Airflow with Databricks—how it works, how to set it up, and best practices for optimal use. We’ll provide detailed step-by-step instructions, practical examples with code, and an extensive FAQ section. For foundational knowledge, start with Airflow Web UI Overview and pair this with Defining DAGs in Python.
What is Airflow with Databricks?
Airflow with Databricks refers to the integration of Apache Airflow’s workflow orchestration capabilities with Databricks’ unified data analytics platform, built on Apache Spark. Managed by Airflow’s Scheduler and Executor components (Airflow Architecture (Scheduler, Webserver, Executor)), this integration allows Airflow to trigger, manage, and monitor Databricks jobs—such as Spark workloads, notebooks, or SQL queries—as part of Directed Acyclic Graphs (DAGs) defined in the ~/airflow/dags directory (DAG File Structure Best Practices). Using the apache-airflow-providers-databricks package, operators like DatabricksSubmitRunOperator, DatabricksRunNowOperator, and DatabricksSqlOperator, along with the DatabricksHook, enable Airflow to interact with Databricks workspaces. Task states are tracked in the metadata database (airflow.db), with execution monitored via the Web UI (Monitoring Task Status in UI) and logs retrieved from Databricks (Task Logging and Monitoring). This integration blends Airflow’s orchestration strengths with Databricks’ scalable compute power, making it ideal for data engineering, machine learning, and analytics workflows.
Core Components in Detail
Airflow’s integration with Databricks relies on several core components, each with specific roles and configurable parameters. Below, we explore these components in depth, including their functionality, parameters, and practical code examples.
1. DatabricksSubmitRunOperator: Submits a New Databricks Job Run
The DatabricksSubmitRunOperator submits a new job run to Databricks using the /api/2.1/jobs/runs/submit API endpoint, allowing Airflow to define and execute a Spark job, notebook, or JAR without requiring a pre-existing Databricks job.
- Key Functionality: Submits a one-time Databricks run with a custom job specification, polling for completion and returning the run status.
- Parameters:
- task_id (str): Unique identifier for the task (e.g., "submit_databricks_run").
- databricks_conn_id (str): Airflow Connection ID for Databricks credentials (default: "databricks_default").
- json (dict): Full job specification passed to the API (e.g., cluster config, notebook path)—named parameters such as notebook_task are merged into it and take precedence on conflicts.
- new_cluster (dict): New cluster configuration (e.g., {"spark_version": "13.3.x-scala2.12", "node_type_id": "i3.xlarge", "num_workers": 2}).
- existing_cluster_id (str): ID of an existing cluster (e.g., "1234-5678-abcde").
- notebook_task (dict): Notebook task details (e.g., {"notebook_path": "/Users/user/notebook"}).
- spark_jar_task (dict): Spark JAR task details (e.g., {"main_class_name": "com.example.Main"}).
- spark_python_task (dict): Python script task details (e.g., {"python_file": "dbfs:/path/to/script.py"}).
- polling_period_seconds (int): Polling interval for job status (default: 30).
- timeout_seconds (int): Total timeout for job completion (default: None, no timeout).
- do_xcom_push (bool): Push run ID to XCom (default: True).
- Code Example:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from datetime import datetime

with DAG(
    dag_id="databricks_submit_example",
    start_date=datetime(2025, 4, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    submit_task = DatabricksSubmitRunOperator(
        task_id="submit_databricks_run",
        databricks_conn_id="databricks_default",
        new_cluster={
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 2,
        },
        notebook_task={
            "notebook_path": "/Users/user/example_notebook",
        },
        polling_period_seconds=30,
        timeout_seconds=3600,
        do_xcom_push=True,
    )
This example submits a notebook task to a new Databricks cluster, polling every 30 seconds for up to 1 hour.
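The same operator can run a standalone Python script instead of a notebook via spark_python_task. A minimal sketch, assuming the script has already been uploaded to a hypothetical DBFS path:

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

# Sketch: run a Python script on a new cluster; the DBFS path is a placeholder.
submit_script = DatabricksSubmitRunOperator(
    task_id="submit_python_script",
    databricks_conn_id="databricks_default",
    new_cluster={
        "spark_version": "13.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 2,
    },
    spark_python_task={"python_file": "dbfs:/scripts/etl_job.py"},  # hypothetical path
    polling_period_seconds=30,
    timeout_seconds=3600,
)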
2. DatabricksRunNowOperator: Triggers an Existing Databricks Job
The DatabricksRunNowOperator triggers an existing Databricks job using the /api/2.1/jobs/run-now API endpoint, ideal for pre-defined jobs in the Databricks Jobs UI.
- Key Functionality: Runs a pre-existing Databricks job by job ID, polling for completion and returning the run status.
- Parameters:
- task_id (str): Unique identifier for the task (e.g., "run_databricks_job").
- databricks_conn_id (str): Airflow Connection ID (default: "databricks_default").
- job_id (str): Databricks job ID (e.g., "123").
- notebook_params (dict): Parameters for notebook tasks (e.g., {"param1": "value1"}).
- jar_params (list): Parameters for JAR tasks (e.g., ["arg1", "arg2"]).
- python_params (list): Parameters for Python tasks (e.g., ["param1"]).
- polling_period_seconds (int): Polling interval (default: 30).
- timeout_seconds (int): Total timeout (default: None).
- do_xcom_push (bool): Push run ID to XCom (default: True).
- Code Example:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from datetime import datetime

with DAG(
    dag_id="databricks_run_now_example",
    start_date=datetime(2025, 4, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    run_now_task = DatabricksRunNowOperator(
        task_id="run_databricks_job",
        databricks_conn_id="databricks_default",
        job_id="123",  # Replace with your Databricks job ID
        notebook_params={"input_file": "s3://my-bucket/data.csv"},
        polling_period_seconds=30,
        timeout_seconds=3600,
        do_xcom_push=True,
    )
This triggers an existing job (ID 123) with a notebook parameter, polling every 30 seconds.
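The run ID pushed to XCom can be consumed downstream. In recent versions of the Databricks provider the operator pushes it under the run_id key (and the run page URL under run_page_url); a minimal sketch of a follow-up task under that assumption:

from airflow.operators.python import PythonOperator

def report_run(ti):
    # Pull the identifiers pushed by DatabricksRunNowOperator when do_xcom_push=True.
    run_id = ti.xcom_pull(task_ids="run_databricks_job", key="run_id")
    run_url = ti.xcom_pull(task_ids="run_databricks_job", key="run_page_url")
    print(f"Databricks run {run_id}: {run_url}")

report_task = PythonOperator(
    task_id="report_run",
    python_callable=report_run,
)

run_now_task >> report_task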
3. DatabricksSqlOperator: Executes SQL Queries on Databricks
The DatabricksSqlOperator executes SQL queries on Databricks SQL endpoints, integrating analytics into Airflow workflows.
- Key Functionality: Runs SQL queries via Databricks SQL endpoints, returning results for downstream tasks.
- Parameters:
- task_id (str): Unique identifier (e.g., "run_sql_query").
- databricks_conn_id (str): Connection ID (default: "databricks_default").
- sql (str or list[str]): SQL query or list of queries (e.g., "SELECT * FROM my_table").
- sql_endpoint_name (str): Databricks SQL endpoint name (e.g., "my_endpoint").
- parameters (dict): Query parameters (e.g., {"param": "value"}).
- do_xcom_push (bool): Push query results to XCom (default: False).
- Code Example:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSqlOperator
from datetime import datetime

with DAG(
    dag_id="databricks_sql_example",
    start_date=datetime(2025, 4, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    sql_task = DatabricksSqlOperator(
        task_id="run_sql_query",
        databricks_conn_id="databricks_default",
        sql="SELECT COUNT(*) AS eng_count FROM employees WHERE department = 'Engineering'",
        sql_endpoint_name="my_endpoint",
        do_xcom_push=True,
    )
This executes a SQL query on a Databricks SQL endpoint, pushing the result to XCom.
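A downstream task can pull that result from XCom. A minimal sketch, noting that the exact shape of the returned rows varies by provider version:

from airflow.operators.python import PythonOperator

def process_count(ti):
    # Pull whatever run_sql_query pushed to XCom; the structure (list of rows,
    # tuples, etc.) depends on the Databricks provider version in use.
    result = ti.xcom_pull(task_ids="run_sql_query")
    print(f"Query returned: {result}")

process_task = PythonOperator(
    task_id="process_count",
    python_callable=process_count,
)

sql_task >> process_task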
4. DatabricksHook: Programmatic Databricks Access
The DatabricksHook provides programmatic access to Databricks APIs, enabling custom task logic beyond operators.
- Key Functionality: Submits, triggers, and monitors runs via the Databricks REST API, offering flexibility for advanced workflows.
- Parameters:
- databricks_conn_id (str): Connection ID (default: "databricks_default").
- Methods: submit_run(), run_now(), get_run_state(), and cancel_run() submit, trigger, monitor, and cancel runs.
- Code Example:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from datetime import datetime

def custom_databricks_run():
    hook = DatabricksHook(databricks_conn_id="databricks_default")
    # submit_run expects a single dict mirroring the /api/2.1/jobs/runs/submit payload
    run_id = hook.submit_run({
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 2,
        },
        "notebook_task": {"notebook_path": "/Users/user/example_notebook"},
    })
    print(f"Submitted run ID: {run_id}")

with DAG(
    dag_id="databricks_hook_example",
    start_date=datetime(2025, 4, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    custom_task = PythonOperator(
        task_id="custom_databricks_task",
        python_callable=custom_databricks_run,
    )
This submits a custom Databricks run via DatabricksHook.
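Because the hook only submits the run, the callable returns before the job finishes. One hedged way to wait for completion is to poll get_run_state(); a sketch under that assumption:

import time

def wait_for_run(hook, run_id, poll_seconds=30):
    # Poll Databricks until the run reaches a terminal state.
    while True:
        state = hook.get_run_state(run_id)
        if state.is_terminal:
            print(f"Run {run_id} finished with result state: {state.result_state}")
            return state
        time.sleep(poll_seconds)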
5. Connections: Airflow Connection IDs (e.g., databricks_default)
Airflow Connections configure Databricks access, centralizing credentials and settings.
- Key Functionality: Stores Databricks workspace details—e.g., host, token—for secure, reusable access.
- Parameters:
- conn_id (str): Unique identifier (e.g., databricks_default).
- conn_type (str): databricks—specifies Databricks connection.
- host (str): Databricks workspace URL (e.g., https://<region>.azuredatabricks.net).
- login (str): Optional username (if using basic auth).
- password (str): Personal Access Token (PAT) or password.
- extra (dict): JSON config (e.g., {"token": "dapi123..."} for PAT).
- Code Example (UI Setup):
- In Airflow UI: Admin > Connections > +
- Conn Id: databricks_default
- Conn Type: Databricks
- Host: https://<region>.azuredatabricks.net
- Password: <your-databricks-pat>
- Save
This connection authenticates Airflow to Databricks securely.
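For deployments managed as code, the same connection can instead be supplied through an environment variable that Airflow resolves at runtime. A sketch of the JSON form (supported in Airflow 2.3+), with placeholder host and token values; in practice you would export this in the scheduler and worker environment or use a secrets backend:

import json
import os

# Airflow resolves AIRFLOW_CONN_<CONN_ID> environment variables as connections.
# Shown in Python for illustration; normally exported in the service environment.
os.environ["AIRFLOW_CONN_DATABRICKS_DEFAULT"] = json.dumps({
    "conn_type": "databricks",
    "host": "https://<region>.azuredatabricks.net",  # your workspace URL
    "password": "dapi1234567890abcdef",              # placeholder PAT; never commit real tokens
})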
Key Parameters for Airflow with Databricks
Additional parameters in airflow.cfg and operator configurations fine-tune the integration:
- databricks_conn_id: Connection ID for Databricks (default: "databricks_default")—used across operators and hooks.
- job_id: Databricks job ID for DatabricksRunNowOperator (e.g., "123").
- new_cluster: Cluster config for DatabricksSubmitRunOperator (e.g., {"spark_version": "13.3.x-scala2.12", "num_workers": 2}).
- notebook_task: Notebook details (e.g., {"notebook_path": "/path"}).
- spark_jar_task: JAR details (e.g., {"main_class_name": "com.example.Main"}).
- spark_python_task: Python script details (e.g., {"python_file": "dbfs:/path/to/script.py"}).
- sql: SQL query for DatabricksSqlOperator (e.g., "SELECT * FROM table").
- polling_period_seconds: Status polling interval (e.g., 30).
- timeout_seconds: Job timeout (e.g., 3600).
These parameters ensure precise control over Databricks job execution within Airflow.
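To illustrate a few of these parameters together, here is a hedged sketch of a JAR run on an existing cluster; the cluster ID, library path, and main class are hypothetical:

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

# Hypothetical IDs and paths; substitute your own cluster, JAR, and class.
jar_run = DatabricksSubmitRunOperator(
    task_id="submit_jar_run",
    databricks_conn_id="databricks_default",
    existing_cluster_id="1234-5678-abcde",
    spark_jar_task={"main_class_name": "com.example.Main"},
    libraries=[{"jar": "dbfs:/jars/my_job.jar"}],  # attach the JAR to the run
    polling_period_seconds=30,
)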
Setting Up Airflow with Databricks: Step-by-Step Guide
Let’s configure Airflow with Databricks in local mode and run a sample DAG with a notebook task.
Step 1: Set Up Your Airflow and Databricks Environment
- Install Docker: Install Docker Desktop—e.g., on macOS: brew install --cask docker. Start Docker and verify: docker --version.
- Install Airflow with Databricks Support: Open your terminal, navigate to your home directory (cd ~), and create a virtual environment (python -m venv airflow_env). Activate it—source airflow_env/bin/activate on Mac/Linux or airflow_env\Scripts\activate on Windows—then install Airflow with Databricks support (pip install "apache-airflow[databricks]").
- Set Up Databricks Workspace: Create a Databricks workspace (e.g., on AWS, Azure, or GCP). Generate a Personal Access Token (PAT) under User Settings > Access Tokens—e.g., dapi1234567890abcdef.
- Create a Databricks Notebook: In your Databricks workspace, create a notebook (e.g., /Users/<your-email>/example_notebook) with:
# example_notebook.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AirflowDemo").getOrCreate()
data = [("Alice", "Engineering"), ("Bob", "Sales")]
df = spark.createDataFrame(data, ["name", "department"])
df_filtered = df.filter(df.department == "Engineering")
df_filtered.write.mode("overwrite").csv("/tmp/spark_output")
spark.stop()
- Initialize the Database: Run airflow db init to create the metadata database at ~/airflow/airflow.db.
- Configure Databricks Connection: In the Airflow UI (localhost:8080, once the services from the next step are running), go to Admin > Connections, click “+”, and set:
- Conn Id: databricks_default
- Conn Type: Databricks
- Host: https://<region>.azuredatabricks.net (replace with your workspace URL)
- Password: dapi1234567890abcdef (your PAT)
- Save
- Start Airflow Services: In one terminal, run airflow webserver -p 8080. In another, run airflow scheduler.
Step 2: Create a Sample DAG
- Open a Text Editor: Use Visual Studio Code or any plain-text editor—ensure .py output.
- Write the DAG Script: Define a DAG to submit a Databricks notebook task:
- Copy this code:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from datetime import datetime

with DAG(
    dag_id="databricks_integration_demo",
    start_date=datetime(2025, 4, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    databricks_task = DatabricksSubmitRunOperator(
        task_id="run_databricks_notebook",
        databricks_conn_id="databricks_default",
        new_cluster={
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 2,
        },
        notebook_task={
            "notebook_path": "/Users/<your-email>/example_notebook",  # Replace with your notebook path
        },
        polling_period_seconds=30,
        timeout_seconds=3600,
        do_xcom_push=True,
    )
- Save as databricks_integration_demo.py in ~/airflow/dags. Replace <your-email> with your Databricks email.
Step 3: Execute and Monitor the DAG with Databricks
- Verify Databricks Setup: Ensure your workspace, PAT, and notebook are accessible—test manually in Databricks if needed.
- Trigger the DAG: At localhost:8080, toggle “databricks_integration_demo” to “On,” click “Trigger DAG” for April 7, 2025. In Graph View, monitor:
- run_databricks_notebook: Submits the notebook task, turns green on success.
- Check Databricks Output: In your Databricks workspace, verify the notebook ran and /tmp/spark_output contains CSV files with Alice, Engineering.
- View Logs: In Graph View, click run_databricks_notebook > “Log”—see Databricks job output, including run URL and completion status.
- Retry Task: If the task fails (e.g., due to a misconfigured path), fix it, click “Clear,” and retry—updates status on success.
This setup demonstrates Airflow submitting and managing a Databricks notebook task, monitored via the UI.
Key Features of Airflow with Databricks
Airflow’s integration with Databricks offers powerful features, detailed below.
Seamless Job Submission
The DatabricksSubmitRunOperator submits custom Spark jobs to Databricks, using new_cluster (e.g., {"num_workers": 2}) and task definitions (e.g., notebook_task). Configurable with polling_period_seconds (e.g., 30) and timeout_seconds (e.g., 3600), it ensures reliable job execution and monitoring within Airflow workflows.
Example: Custom Job
run_databricks_notebook submits a notebook task—processes data and writes to /tmp/spark_output, tracked in Graph View.
Pre-Defined Job Triggering
The DatabricksRunNowOperator triggers existing Databricks jobs by job_id (e.g., "123"), passing parameters like notebook_params (e.g., {"input_file": "data.csv"}). It integrates pre-configured workflows, leveraging Databricks’ Jobs UI for management.
Example: Job Reuse
run_databricks_job triggers a pre-existing job—reuses Databricks configuration, monitored seamlessly.
SQL Query Execution
The DatabricksSqlOperator executes SQL queries on Databricks SQL endpoints, using sql (e.g., "SELECT COUNT(*) FROM table") and sql_endpoint_name (e.g., "my_endpoint"). It supports analytics workflows, pushing results to XCom for downstream tasks.
Example: Analytics Integration
run_sql_query counts engineers—results available in XCom, enhancing data-driven workflows.
Real-Time Monitoring in UI
Graph View tracks Databricks task statuses—green for success, red for failure—updated from the metadata database, with logs from Databricks accessible via the UI. This provides immediate visibility into job execution and outcomes (Airflow Metrics and Monitoring Tools).
Example: Execution Oversight
run_databricks_notebook turns green—logs show job completion, tracked in Graph View (Airflow Graph View Explained).
Flexible Configuration and Authentication
Connections (e.g., databricks_default) with host (e.g., https://<region>.azuredatabricks.net) and password (PAT) configure Databricks access securely. Parameters like new_cluster and cluster-level spark_conf fine-tune job execution, adapting to diverse use cases.
Example: Secure Setup
databricks_default uses a PAT—ensures secure, reusable authentication for all tasks.
Best Practices for Airflow with Databricks
Optimize this integration with these detailed guidelines:
- Use Managed Databricks Workspaces: Deploy Databricks on AWS, Azure, or GCP—ensures scalability and reliability over local setups Installing Airflow (Local, Docker, Cloud).
- Test Jobs Locally: Validate Databricks notebooks or scripts in the workspace—e.g., run manually—before DAG execution DAG Testing with Python.
- Tune Job Resources: Set num_workers (e.g., 2), node_type_id (e.g., i3.xlarge), and spark_conf (see the cluster sketch after this list)—monitor with Databricks UI or logs Airflow Performance Tuning.
- Secure Credentials: Store PATs in Airflow Connections—e.g., databricks_default—avoiding exposure in code or logs.
- Monitor Post-Trigger: Check Graph View and Databricks logs—e.g., red run_databricks_notebook signals a failure—for quick resolution Airflow Graph View Explained.
- Persist Logs: Enable Databricks logging—retrieve via Airflow logs or Databricks UI Task Logging and Monitoring.
- Document Configurations: Track databricks_conn_id, job_id, and cluster specs—e.g., in a README—for team clarity DAG File Structure Best Practices.
- Handle Time Zones: Align execution_date with your time zone—e.g., adjust for PDT in Databricks logs Time Zones in Airflow Scheduling.
These practices ensure a robust, efficient Databricks integration.
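As referenced in the resource-tuning practice above, a sketch of a tuned new_cluster spec for DatabricksSubmitRunOperator; the node type, autoscale bounds, and Spark settings are illustrative assumptions to adapt to your workload:

# Illustrative cluster spec; tune values against your own workload and Databricks UI metrics.
tuned_cluster = {
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "autoscale": {"min_workers": 2, "max_workers": 8},
    "spark_conf": {
        "spark.sql.shuffle.partitions": "200",
    },
}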
FAQ: Common Questions About Airflow with Databricks
Here’s an expanded set of answers to frequent questions from Airflow users.
1. Why does DatabricksSubmitRunOperator fail to submit jobs?
databricks_conn_id may be misconfigured—e.g., invalid PAT—test with Databricks UI manual run (Airflow Configuration Basics).
2. How do I debug Databricks job failures?
Check run_databricks_notebook logs in Graph View—e.g., “File not found”—then Databricks UI run details (Task Logging and Monitoring).
3. Why are Databricks jobs slow?
Insufficient resources—adjust num_workers (e.g., 4) or node_type_id—monitor with Databricks UI (Airflow Performance Tuning).
4. How do I retrieve Databricks job results?
Use do_xcom_push=True—e.g., run_sql_query pushes query results to XCom (Airflow XComs: Task Communication).
5. Can I run multiple Databricks jobs in one DAG?
Yes—chain operators—e.g., submit_task >> run_now_task >> sql_task—for sequential or parallel execution (Airflow Executors (Sequential, Local, Celery)).
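For instance, the three operators from the earlier examples could be wired in one DAG; a minimal sketch assuming those task objects are defined together:

# Sequential chain: one-off submit, then the saved job, then the SQL query.
submit_task >> run_now_task >> sql_task

# Or fan out in parallel after the submit step:
# submit_task >> [run_now_task, sql_task]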
6. Why are Databricks logs missing?
Log retrieval may fail—ensure PAT has permissions and check Databricks UI (DAG Views and Task Logs).
7. How do I monitor Databricks performance?
Use Databricks UI or integrate Prometheus—e.g., databricks_job_duration—with Airflow metrics (Airflow Metrics and Monitoring Tools).
8. Can Databricks trigger an Airflow DAG?
Yes—call the Airflow REST API from Databricks (e.g., POST /api/v1/dags/{dag_id}/dagRuns) via a job or webhook—requires Airflow API authentication (Triggering DAGs via UI).
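A hedged sketch of such a call from a Databricks notebook using Python requests; the Airflow host, credentials, and auth backend are assumptions (the stable REST API commonly uses basic auth):

import requests

# Placeholder host and credentials; your Airflow deployment and auth backend will differ.
AIRFLOW_URL = "https://my-airflow.example.com/api/v1/dags/databricks_integration_demo/dagRuns"

response = requests.post(
    AIRFLOW_URL,
    auth=("airflow_user", "airflow_password"),  # basic auth; swap for your auth backend
    json={"conf": {"triggered_by": "databricks"}},
    timeout=30,
)
response.raise_for_status()
print(response.json())  # includes the new dag_run_id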
Conclusion
Mastering Airflow with Databricks enables scalable, data-driven workflows—set it up with Installing Airflow (Local, Docker, Cloud), craft DAGs via Defining DAGs in Python, and monitor with Airflow Graph View Explained. Explore more with Airflow Concepts: DAGs, Tasks, and Workflows and Customizing Airflow Web UI!