DatabricksSubmitRunOperator in Apache Airflow: A Comprehensive Guide
Apache Airflow is a widely celebrated open-source platform renowned for orchestrating complex workflows, and within its extensive suite of tools, the DatabricksSubmitRunOperator stands as a powerful component for executing jobs on Databricks, a unified analytics platform built on Apache Spark. Located in the airflow.providers.databricks.operators.databricks module, this operator is meticulously designed to submit and manage Databricks job runs as part of Directed Acyclic Graphs (DAGs)—Python scripts that define the sequence and dependencies of tasks in your workflow. Whether you’re processing large datasets in ETL Pipelines with Airflow, automating machine learning workflows in CI/CD Pipelines with Airflow, or managing data transformations in Cloud-Native Workflows with Airflow, the DatabricksSubmitRunOperator provides a robust solution for leveraging Databricks’ scalable compute capabilities within Airflow. Hosted on SparkCodeHub, this guide offers an exhaustive exploration of the DatabricksSubmitRunOperator in Apache Airflow—covering its purpose, operational mechanics, configuration process, key features, and best practices for effective utilization. We’ll dive deep into every parameter with detailed explanations, guide you through processes with comprehensive step-by-step instructions, and illustrate concepts with practical examples enriched with additional context. For those new to Airflow, I recommend starting with Airflow Fundamentals and Defining DAGs in Python to establish a solid foundation, and you can explore its specifics further at DatabricksSubmitRunOperator or the official DatabricksSubmitRunOperator Docs.
Understanding DatabricksSubmitRunOperator in Apache Airflow
The DatabricksSubmitRunOperator is an operator in Apache Airflow that enables the submission and management of Databricks job runs within your DAGs (Introduction to DAGs in Airflow). It connects to Databricks using a connection ID (e.g., databricks_default), submits a job run with a specified configuration—such as a notebook, Spark JAR, or Python script, along with cluster settings and parameters—and waits for completion, integrating Databricks’ distributed computing capabilities into your workflow. This operator leverages the DatabricksHook to interact with the Databricks REST API, providing a seamless way to execute Spark-based workloads or data processing tasks on a managed cloud platform. It’s particularly valuable for workflows requiring scalable, distributed processing—such as transforming large datasets, training machine learning models, or running complex analytics—offering the power of Databricks within Airflow’s orchestration framework. The Airflow Scheduler triggers the task based on the schedule_interval you define (DAG Scheduling (Cron, Timetables)), while the Executor—typically the LocalExecutor—manages its execution (Airflow Architecture (Scheduler, Webserver, Executor)). Throughout this process, Airflow tracks the task’s state (e.g., running, succeeded) (Task Instances and States), logs job submission and completion details (Task Logging and Monitoring), and updates the web interface to reflect its progress (Airflow Graph View Explained). For detailed technical specifics, refer to the DatabricksSubmitRunOperator Docs.
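Before diving into the parameters, here is a minimal sketch of what a task definition looks like inside a DAG; the cluster settings and notebook path are placeholder assumptions, not values tied to any real workspace.
```python
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

# Minimal sketch (placed inside a DAG definition): submit a notebook run on a new job cluster.
# The spark_version, node_type_id, and notebook_path values are placeholder assumptions.
submit_databricks_run = DatabricksSubmitRunOperator(
    task_id="submit_databricks_run",
    databricks_conn_id="databricks_default",
    json={
        "new_cluster": {
            "spark_version": "10.4.x-scala2.12",
            "node_type_id": "Standard_D3_v2",
            "num_workers": 2,
        },
        "notebook_task": {"notebook_path": "/Users/myuser/my_notebook"},
    },
)
```
A complete DAG using this operator is built step by step later in this guide.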
Key Parameters Explained in Depth
- task_id: This is a string that uniquely identifies the task within your DAG, such as "submit_databricks_run". It’s a required parameter because it allows Airflow to distinguish this task from others when tracking its status, displaying it in the UI, or setting up dependencies. It’s the label you’ll encounter throughout your workflow management, ensuring clarity and organization across your pipeline.
- json: This is a dictionary (e.g., {"new_cluster": {...}, "notebook_task": {...} }) defining the Databricks job run configuration, including cluster settings (e.g., node type, number of workers) and task details (e.g., notebook path, parameters). It’s required and templated, allowing dynamic content via Jinja (e.g., "base_parameters": {"date": "{{ ds }}"}), driving the core functionality of the operator as outlined in the DatabricksSubmitRunOperator Docs (see the sketch after this parameter list).
- databricks_conn_id: An optional string (default: "databricks_default") specifying the Airflow connection ID for Databricks credentials. Configured in the UI or CLI, it includes details like the Databricks host and token, enabling secure access to the Databricks workspace. If unset, it assumes a default connection exists.
- timeout_seconds: An optional integer specifying the maximum time (in seconds) the Databricks run is allowed to take; if unset, the run is given no timeout. It’s not templated, and setting it keeps long-running jobs from running indefinitely.
- do_xcom_push: An optional boolean (default: True, inherited from BaseOperator) determining whether to push the Databricks run ID to XCom upon completion. If True, the run ID is available to downstream tasks; if False, no XCom output is generated, offering control over task communication.
- polling_period_seconds: An optional integer (default: 30) defining how often (in seconds) the operator polls Databricks for the job’s status. It balances responsiveness and resource usage during the wait period.
- run_name: An optional string (e.g., "my-run-{{ ds }}") specifying a human-readable name for the Databricks run. It’s templated and enhances visibility in the Databricks UI, defaulting to a generated name if omitted.
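To make these parameter descriptions concrete, here is a sketch of a templated json configuration; the notebook path and cluster values are illustrative assumptions.
```python
# Sketch of a templated run configuration (paths and cluster sizes are assumptions).
databricks_config = {
    "new_cluster": {
        "spark_version": "10.4.x-scala2.12",  # Databricks runtime version
        "node_type_id": "Standard_D3_v2",     # cloud-specific node type
        "num_workers": 2,
    },
    "notebook_task": {
        "notebook_path": "/Users/myuser/transform_notebook",
        # json is a templated field, so Jinja macros such as {{ ds }} are rendered at runtime
        "base_parameters": {"date": "{{ ds }}"},
    },
}
```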
Purpose of DatabricksSubmitRunOperator
The DatabricksSubmitRunOperator’s primary purpose is to submit and manage Databricks job runs within Airflow workflows, enabling scalable, distributed data processing and analytics using Databricks’ Spark-based platform. It launches a job with a detailed configuration, waits for completion (by default), and integrates Databricks’ capabilities into your DAG, making it a key tool for data-intensive tasks. This is essential for workflows requiring robust computation—such as transforming large datasets in ETL Pipelines with Airflow, running validation or ML jobs in CI/CD Pipelines with Airflow, or aggregating data in Cloud-Native Workflows with Airflow. The Scheduler ensures timely execution (DAG Scheduling (Cron, Timetables)), retries handle transient Databricks issues (Task Retries and Retry Delays), and dependencies integrate it into broader pipelines (Task Dependencies).
Why It’s Valuable
- Scalable Processing: Harnesses Databricks’ distributed compute power for large-scale tasks.
- Flexible Configuration: Supports dynamic job specs with templated JSON.
- Databricks Integration: Ties Airflow to Databricks, a leading analytics platform.
How DatabricksSubmitRunOperator Works in Airflow
The DatabricksSubmitRunOperator works by connecting to Databricks via the DatabricksHook, authenticating with databricks_conn_id, and submitting a job run with the specified json configuration. When the Scheduler triggers the task—either manually or based on the schedule_interval—the operator sends the run request to Databricks’ REST API, polls the job status every polling_period_seconds until it completes or times out after timeout_seconds, and finishes once Databricks confirms success or failure. The job runs on a Databricks cluster, executing the specified task (e.g., notebook, JAR, or script), and completes server-side without local processing beyond API calls. The Scheduler queues the task within the DAG’s execution plan (DAG Serialization in Airflow), and the Executor (e.g., LocalExecutor) manages its execution (Airflow Executors (Sequential, Local, Celery)). Logs capture job submission, polling updates, and completion details, including the run ID (Task Logging and Monitoring). If do_xcom_push=True, it pushes the run ID to XCom for downstream use; otherwise, outputs are typically stored in Databricks or external storage like S3 (Airflow XComs: Task Communication). The Airflow UI updates to reflect the task’s status—yellow while polling, green upon success—offering a visual indicator of its progress (Airflow Graph View Explained). For more details, see the DatabricksSubmitRunOperator Docs.
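To make that flow concrete, the sketch below uses the requests library to illustrate roughly what happens under the hood—a POST to the runs/submit endpoint followed by polling runs/get. It is purely illustrative (the operator does this for you through DatabricksHook), and the host, token, and run configuration values are placeholder assumptions.
```python
import time

import requests

# Illustrative sketch of the operator's behavior; HOST, TOKEN, and run_config are placeholders.
HOST = "https://adb-1234567890123456.7.azuredatabricks.net"
TOKEN = "dapi123..."
headers = {"Authorization": f"Bearer {TOKEN}"}

run_config = {
    "run_name": "illustrative-run",
    "new_cluster": {"spark_version": "10.4.x-scala2.12", "node_type_id": "Standard_D3_v2", "num_workers": 2},
    "notebook_task": {"notebook_path": "/Users/myuser/transform_notebook"},
}

# 1. Submit the run: the operator sends its `json` argument as this request body.
response = requests.post(f"{HOST}/api/2.1/jobs/runs/submit", headers=headers, json=run_config)
run_id = response.json()["run_id"]

# 2. Poll until the run reaches a terminal lifecycle state (analogous to polling_period_seconds).
while True:
    state = requests.get(
        f"{HOST}/api/2.1/jobs/runs/get", headers=headers, params={"run_id": run_id}
    ).json()["state"]
    if state["life_cycle_state"] in ("TERMINATED", "SKIPPED", "INTERNAL_ERROR"):
        break
    time.sleep(30)

print(state.get("result_state"))  # e.g., SUCCESS or FAILED once the run terminates
```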
Detailed Workflow
- Task Triggering: The Scheduler initiates the task when upstream dependencies are met.
- Databricks Connection: The operator connects using databricks_conn_id and DatabricksHook.
- Job Submission: It submits the run with the json configuration to Databricks.
- Polling: It polls Databricks every polling_period_seconds until completion or timeout_seconds elapses.
- Completion: Logs confirm success, optionally push the run ID to XCom, and the UI updates.
Additional Parameters
- timeout_seconds: Caps wait time for job completion.
- polling_period_seconds: Controls polling frequency.
Configuring DatabricksSubmitRunOperator in Apache Airflow
Configuring the DatabricksSubmitRunOperator requires setting up Airflow, establishing a Databricks connection, and creating a DAG with a job configuration. Below is a detailed guide with expanded instructions.
Step 1: Set Up Your Airflow Environment with Databricks Support
- Install Apache Airflow with Databricks Provider:
- Command: Open a terminal and execute python -m venv airflow_env && source airflow_env/bin/activate && pip install apache-airflow[databricks].
- Details: Creates a virtual environment named airflow_env, activates it (prompt shows (airflow_env)), and installs Airflow with the Databricks provider package via the [databricks] extra, including DatabricksSubmitRunOperator and DatabricksHook.
- Outcome: Airflow is ready to interact with Databricks.
2. Initialize Airflow:
- Command: Run airflow db init.
- Details: Sets up Airflow’s metadata database at ~/airflow/airflow.db and creates the dags folder.
3. Configure Databricks Connection:
- Via UI: Start the webserver (below), go to localhost:8080 > “Admin” > “Connections” > “+”:
- Conn ID: databricks_default.
- Conn Type: Databricks.
- Host: Your Databricks workspace URL (e.g., https://adb-1234567890123456.7.azuredatabricks.net).
- Token: Your Databricks personal access token (e.g., dapi123...).
- Save: Stores the connection securely.
- Via CLI: airflow connections add 'databricks_default' --conn-type 'databricks' --conn-host 'https://adb-1234567890123456.7.azuredatabricks.net' --conn-login 'token' --conn-password 'dapi123...'.
4. Start Airflow Services:
- Webserver: airflow webserver -p 8080.
- Scheduler: airflow scheduler.
Step 2: Create a DAG with DatabricksSubmitRunOperator
- Open Editor: Use a tool like VS Code.
- Write the DAG:
- Code:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(seconds=10),
}

with DAG(
    dag_id="databricks_submit_run_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args=default_args,
) as dag:
    databricks_config = {
        "new_cluster": {
            "spark_version": "10.4.x-scala2.12",
            "node_type_id": "Standard_D3_v2",
            "num_workers": 2,
        },
        "notebook_task": {
            "notebook_path": "/Users/myuser/transform_notebook",
            "base_parameters": {"date": "{{ ds }}"},
        },
    }
    databricks_task = DatabricksSubmitRunOperator(
        task_id="databricks_task",
        json=databricks_config,
        databricks_conn_id="databricks_default",
        timeout_seconds=1800,
        do_xcom_push=True,
        polling_period_seconds=60,
        run_name="transform-run-{{ ds }}",
    )
- Details:
- dag_id: Unique DAG identifier.
- start_date: Activation date.
- schedule_interval: Daily execution.
- catchup: Prevents backfills.
- task_id: Identifies the task as "databricks_task".
- json: Configures a new cluster and notebook task with dynamic date parameter.
- databricks_conn_id: Uses Databricks credentials.
- timeout_seconds: Limits wait to 30 minutes (1800 seconds).
- do_xcom_push: Pushes run ID to XCom.
- polling_period_seconds: Polls every 60 seconds.
- run_name: Names the run dynamically (e.g., "transform-run-2025-04-09").
- Save: Save as ~/airflow/dags/databricks_submit_run_dag.py.
Step 3: Test and Observe DatabricksSubmitRunOperator
- Trigger DAG: Run airflow dags trigger -e 2025-04-09 databricks_submit_run_dag.
- Details: Initiates the DAG for April 9, 2025.
2. Monitor UI: Open localhost:8080, click “databricks_submit_run_dag” > “Graph View”.
- Details: databricks_task turns yellow while polling, then green upon success.
3. Check Logs: Click databricks_task > “Log”.
- Details: Shows job submission (e.g., “Submitting run to Databricks”), polling (e.g., “Run state: RUNNING”), and success with run ID.
4. Verify Databricks: Use Databricks UI (Runs tab) or API (curl -H "Authorization: Bearer <token>" https://<databricks-host>/api/2.1/jobs/runs/list) to confirm the run completed.
- Details: Ensures the notebook ran and outputs are stored (e.g., in DBFS or external storage).
5. CLI Check: Run airflow tasks states-for-dag-run databricks_submit_run_dag 2025-04-09.
- Details: Shows success for databricks_task.
Key Features of DatabricksSubmitRunOperator
The DatabricksSubmitRunOperator offers robust features for Databricks job execution, detailed below with examples.
Databricks Job Run Submission
- Explanation: This core feature submits a Databricks job run with a specified json configuration, managing execution on a cluster, as detailed in the DatabricksSubmitRunOperator Docs.
- Parameters:
- json: Run configuration.
- Example:
- Scenario: Processing ETL data ETL Pipelines with Airflow.
- Code:
```python
process_etl = DatabricksSubmitRunOperator(
    task_id="process_etl",
    json={
        "new_cluster": {"spark_version": "10.4.x-scala2.12", "node_type_id": "Standard_D3_v2", "num_workers": 2},
        "spark_python_task": {"python_file": "dbfs:/FileStore/scripts/transform.py", "parameters": ["--date", "{{ ds }}"]},
    },
    databricks_conn_id="databricks_default",
)
```
- Context: Runs a Python script to transform daily data.
Databricks Connection Management
- Explanation: The operator manages Databricks connectivity via databricks_conn_id, using DatabricksHook to authenticate securely, centralizing credential configuration.
- Parameters:
- databricks_conn_id: Databricks connection ID.
- Example:
- Scenario: Validating CI/CD data CI/CD Pipelines with Airflow.
- Code:
```python
validate_ci = DatabricksSubmitRunOperator(
    task_id="validate_ci",
    json={
        "new_cluster": {"spark_version": "10.4.x-scala2.12", "node_type_id": "Standard_D3_v2", "num_workers": 1},
        "notebook_task": {"notebook_path": "/Users/myuser/validate"},
    },
    databricks_conn_id="databricks_default",
)
```
- Context: Uses secure credentials to validate data with a notebook.
Polling and Timeout Control
- Explanation: The timeout_seconds and polling_period_seconds parameters control how the operator monitors job completion, balancing responsiveness and safety.
- Parameters:
- timeout_seconds: Max wait time.
- polling_period_seconds: Polling interval.
- Example:
- Scenario: Controlled run in a cloud-native workflow Cloud-Native Workflows with Airflow.
- Code:
```python
process_cloud = DatabricksSubmitRunOperator(
    task_id="process_cloud",
    json={
        "new_cluster": {"spark_version": "10.4.x-scala2.12", "node_type_id": "Standard_D3_v2", "num_workers": 2},
        "notebook_task": {"notebook_path": "/Users/myuser/process"},
    },
    databricks_conn_id="databricks_default",
    timeout_seconds=3600,
    polling_period_seconds=120,
)
```
- Context: Polls every 2 minutes, timing out after 1 hour if incomplete.
XCom Integration
- Explanation: The do_xcom_push parameter enables pushing the run ID to XCom, facilitating downstream task coordination.
- Parameters:
- do_xcom_push: XCom push flag.
- Example:
- Scenario: Tracking run in an ETL job.
- Code:
```python
track_run = DatabricksSubmitRunOperator(
    task_id="track_run",
    json={
        "new_cluster": {"spark_version": "10.4.x-scala2.12", "node_type_id": "Standard_D3_v2", "num_workers": 2},
        "notebook_task": {"notebook_path": "/Users/myuser/etl"},
    },
    databricks_conn_id="databricks_default",
    do_xcom_push=True,
)
```
- Context: Pushes the run ID for downstream use.
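As a follow-up, a downstream task can read that run ID from XCom. The sketch below assumes a recent provider version, where the run ID is pushed under the run_id key (check your provider’s docs if the key differs); the reporting task itself is a hypothetical example.
```python
from airflow.operators.python import PythonOperator

# Hedged sketch: read the run ID pushed by track_run (the "run_id" key assumes a recent provider version).
def report_run_id(ti, **_):
    run_id = ti.xcom_pull(task_ids="track_run", key="run_id")
    print(f"Databricks run ID: {run_id}")

report_task = PythonOperator(
    task_id="report_run_id",
    python_callable=report_run_id,
)

track_run >> report_task  # run the report only after the Databricks run completes
```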
Best Practices for Using DatabricksSubmitRunOperator
- Test Config Locally: Validate json in Databricks UI before DAG use DAG Testing with Python (a DagBag parse check is sketched after this list).
- Secure Credentials: Store Databricks token in databricks_conn_id securely Airflow Performance Tuning.
- Set Timeouts: Use timeout_seconds to cap wait time Task Execution Timeout Handling.
- Monitor Runs: Check logs and Databricks UI for completion Airflow Graph View Explained.
- Optimize Clusters: Tune json cluster settings for cost/performance Airflow Performance Tuning.
- Organize DAGs: Use clear names in ~/airflow/dags DAG File Structure Best Practices.
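For the local testing practice above, a quick way to confirm the DAG file parses is to load it through DagBag; a minimal sketch, assuming the DAG file from Step 2 sits in your configured dags folder:
```python
from airflow.models import DagBag

# Minimal sketch: confirm the DAG files parse and load without import errors.
dag_bag = DagBag()  # reads DAG files from the configured dags folder (e.g., ~/airflow/dags)
print(dag_bag.import_errors)  # an empty dict means every file parsed cleanly
print("databricks_submit_run_dag" in dag_bag.dags)  # True if the example DAG was loaded
```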
Frequently Asked Questions About DatabricksSubmitRunOperator
1. Why Isn’t My Job Starting?
Verify databricks_conn_id, json, and permissions—logs may show errors (Task Logging and Monitoring).
2. Can It Run Asynchronously?
It waits for completion by default; recent provider versions expose a wait_for_termination parameter that lets the task return immediately after submission (sketched below)—see the DatabricksSubmitRunOperator Docs (DatabricksSubmitRunOperator).
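A hedged sketch, assuming your installed provider version supports wait_for_termination; the cluster and notebook values are placeholders:
```python
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

# Hedged sketch: submit without waiting for completion (assumes wait_for_termination is available).
fire_and_forget = DatabricksSubmitRunOperator(
    task_id="fire_and_forget",
    databricks_conn_id="databricks_default",
    json={
        "new_cluster": {"spark_version": "10.4.x-scala2.12", "node_type_id": "Standard_D3_v2", "num_workers": 2},
        "notebook_task": {"notebook_path": "/Users/myuser/async_job"},
    },
    wait_for_termination=False,  # the task succeeds once the run is submitted; it does not poll for the result
)
```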
3. How Do I Retry Failures?
Set retries and retry_delay in default_args (Task Retries and Retry Delays).
4. Why Did It Timeout?
Check timeout_seconds—job may exceed limit; logs show polling (Task Failure Handling).
5. How Do I Debug?
Run airflow tasks test and check logs/Databricks UI (DAG Testing with Python).
6. Can It Span Multiple DAGs?
Yes, with TriggerDagRunOperator and XCom (Task Dependencies Across DAGs).
7. How Do I Optimize Costs?
Adjust json cluster size and runtime (Airflow Performance Tuning).
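One concrete lever is the cluster spec inside json—for example, using Databricks cluster autoscaling instead of a fixed worker count. The values below are illustrative assumptions:
```python
# Illustrative cost-conscious cluster spec: autoscale between 1 and 4 workers instead of fixing num_workers.
cost_aware_config = {
    "new_cluster": {
        "spark_version": "10.4.x-scala2.12",
        "node_type_id": "Standard_D3_v2",
        "autoscale": {"min_workers": 1, "max_workers": 4},  # Databricks scales the cluster with load
    },
    "notebook_task": {"notebook_path": "/Users/myuser/etl"},
}
```
Autoscaling lets Databricks shrink the cluster when the workload is light, which typically lowers cost compared to a fixed num_workers sized for peak load.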
Conclusion
The DatabricksSubmitRunOperator empowers Airflow workflows with Databricks processing—build DAGs with Defining DAGs in Python, install via Installing Airflow (Local, Docker, Cloud), and optimize with Airflow Performance Tuning. Monitor via Monitoring Task Status in UI and explore more at Airflow Concepts: DAGs, Tasks, and Workflows!