DolphinDBOperator in Apache Airflow: A Comprehensive Guide

Apache Airflow is a leading open-source platform for orchestrating workflows, enabling users to define, schedule, and monitor tasks through Python scripts known as Directed Acyclic Graphs (DAGs). Within its versatile ecosystem, the DolphinDBOperator emerges as a specialized tool designed to integrate Airflow with DolphinDB, a high-performance distributed time-series database optimized for real-time analytics and data processing. This operator facilitates seamless execution of DolphinDB scripts and operations, enhancing Airflow’s capability to manage data workflows that require rapid querying and analytics on large-scale time-series data. Whether you’re processing real-time financial data in ETL Pipelines with Airflow, validating data consistency in CI/CD Pipelines with Airflow, or managing real-time analytics in Cloud-Native Workflows with Airflow, the DolphinDBOperator bridges Airflow’s orchestration strengths with DolphinDB’s advanced analytical capabilities. Hosted on SparkCodeHub, this guide offers an in-depth exploration of the DolphinDBOperator in Apache Airflow, covering its purpose, operational mechanics, configuration process, key features, and best practices. Expect detailed step-by-step instructions, practical examples enriched with context, and a comprehensive FAQ section addressing common questions. For those new to Airflow, foundational insights can be gained from Airflow Fundamentals and Defining DAGs in Python, with additional details available at DolphinDBOperator.


Understanding DolphinDBOperator in Apache Airflow

The DolphinDBOperator, part of the airflow_provider_dolphindb.operators.dolphindb module within the airflow-provider-dolphindb package, is a tailored operator crafted to execute DolphinDB scripts (.dos files) or operations within an Airflow DAG. DolphinDB is a high-performance database and computing platform designed for real-time analytics, offering robust support for time-series data, distributed computing, and complex analytical queries. The DolphinDBOperator leverages this capability by enabling Airflow tasks to interact directly with DolphinDB, executing scripts or commands to perform tasks such as data ingestion, transformation, or querying, and integrating these operations into your DAGs—the Python scripts that define your workflow logic (Introduction to DAGs in Airflow).

This operator establishes a connection to a DolphinDB server using a configuration ID stored in Airflow’s connection management system, authenticating with credentials such as a username, password, and the server’s host and port (e.g., 127.0.0.1:8848). It then submits a DolphinDB script or operation—specified by a file path or inline command—and processes the response, which can be used for further tasks within the workflow. Within Airflow’s architecture, the Scheduler determines when these tasks execute—perhaps every minute for real-time data updates or daily for batch processing (DAG Scheduling (Cron, Timetables)). The Executor—typically the LocalExecutor in simpler setups—manages task execution on the Airflow host machine (Airflow Architecture (Scheduler, Webserver, Executor)). Task states—queued, running, success, or failed—are tracked meticulously through task instances (Task Instances and States). Logs capture every interaction with DolphinDB, from connection establishment to script execution output, providing a detailed record for troubleshooting or validation (Task Logging and Monitoring). The Airflow web interface visualizes this process, with tools like Graph View showing task nodes transitioning to green upon successful script execution, offering real-time insight into your workflow’s progress (Airflow Graph View Explained).

Key Parameters Explained with Depth

  • task_id: A string such as "execute_dolphindb_script" that uniquely identifies the task within your DAG. This identifier is essential, appearing in logs, the UI, and dependency definitions, acting as a clear label for tracking this specific DolphinDB operation throughout your workflow.
  • dolphindb_conn_id: The Airflow connection ID, like "dolphindb_default", that links to your DolphinDB server configuration—typically including the host and port (e.g., 127.0.0.1:8848), username, and password stored in Airflow’s connection settings. This parameter authenticates the operator with DolphinDB, serving as the entry point for script execution.
  • file_path: An optional string—e.g., "/scripts/process_data.dos"—specifying the path to a DolphinDB script file (.dos) to execute. This allows complex operations to be defined externally and executed by the operator.
  • sql: An optional string—e.g., "select avg(price) from trades where date = 2025.04.09" (DolphinDB date literals use dots)—defining an inline DolphinDB SQL query to execute instead of a script file, offering flexibility for simpler operations (see the sketch after this list).
  • database: An optional string—e.g., "trade_db"—specifying the DolphinDB database where the script or query operates, ensuring the operation targets the correct database context.
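
A minimal sketch tying these parameters together; the connection ID, paths, and database names are the placeholder values used throughout this guide, and a given task should set either file_path or sql, not both:

from airflow_provider_dolphindb.operators.dolphindb import DolphinDBOperator

# Variant 1: run an external .dos script against a target database
script_task = DolphinDBOperator(
    task_id="execute_dolphindb_script",
    dolphindb_conn_id="dolphindb_default",
    file_path="/scripts/process_data.dos",
    database="trade_db",
)

# Variant 2: run a short inline query instead of a script file
query_task = DolphinDBOperator(
    task_id="execute_dolphindb_query",
    dolphindb_conn_id="dolphindb_default",
    sql="select avg(price) from trades where date = 2025.04.09",
    database="trade_db",
)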

Purpose of DolphinDBOperator

The DolphinDBOperator’s primary purpose is to integrate DolphinDB’s high-performance data processing and real-time analytics capabilities into Airflow workflows, enabling tasks to execute DolphinDB scripts or SQL queries directly within your orchestration pipeline. It connects to a DolphinDB server, submits the specified script or query to the designated database, and ensures these operations align with your broader workflow objectives. In ETL Pipelines with Airflow, it’s ideal for executing scripts to transform time-series data—e.g., aggregating stock trades for analysis. For CI/CD Pipelines with Airflow, it can validate data processing outputs against expected results stored in DolphinDB. In Cloud-Native Workflows with Airflow, it supports real-time analytics by querying DolphinDB for up-to-date metrics.

The Scheduler ensures timely execution—perhaps every 5 minutes to process streaming data (DAG Scheduling (Cron, Timetables)). Retries manage transient DolphinDB issues—like server timeouts—with configurable attempts and delays (Task Retries and Retry Delays). Dependencies integrate it into larger pipelines, ensuring it runs after data ingestion or before downstream analytics tasks (Task Dependencies). This makes the DolphinDBOperator a key enabler for orchestrating DolphinDB-driven data workflows in Airflow.
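
To make these scheduling, retry, and dependency points concrete, here is a minimal sketch that places the operator between a placeholder ingestion task and a placeholder reporting task (both are EmptyOperator stand-ins, and the script path is hypothetical):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow_provider_dolphindb.operators.dolphindb import DolphinDBOperator

with DAG(
    dag_id="dolphindb_pipeline_sketch",
    start_date=datetime(2025, 4, 1),
    schedule_interval="*/5 * * * *",  # every 5 minutes, for streaming-style processing
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(seconds=30)},
) as dag:
    ingest = EmptyOperator(task_id="ingest_data")  # placeholder upstream task
    aggregate = DolphinDBOperator(
        task_id="aggregate_trades",
        dolphindb_conn_id="dolphindb_default",
        file_path="/scripts/aggregate_trades.dos",  # hypothetical script path
        database="trade_db",
    )
    publish = EmptyOperator(task_id="publish_metrics")  # placeholder downstream task

    # Runs after ingestion completes and before downstream reporting
    ingest >> aggregate >> publish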

Why It’s Essential

  • Real-Time Analytics: Connects Airflow to DolphinDB for high-performance, real-time data processing.
  • Script Flexibility: Executes both inline SQL and external scripts, adapting to diverse needs.
  • Workflow Integration: Aligns DolphinDB operations with Airflow’s scheduling and monitoring framework.

How DolphinDBOperator Works in Airflow

The DolphinDBOperator operates by establishing a connection to a DolphinDB server and executing specified scripts or queries within an Airflow DAG, serving as a conduit between Airflow’s orchestration and DolphinDB’s analytical capabilities. When triggered—say, by a daily schedule_interval at 7 AM—it uses the dolphindb_conn_id to authenticate with the DolphinDB server (e.g., 127.0.0.1:8848), leveraging credentials to establish a session. It then submits either a script via file_path—e.g., a .dos file containing complex analytics logic—or an inline sql query—e.g., "select * from trades where volume > 1000"—to the specified database, processes the response, and completes the task. The Scheduler queues the task based on the DAG’s timing (DAG Serialization in Airflow), and the Executor—typically LocalExecutor—runs it (Airflow Executors (Sequential, Local, Celery)). Execution details or errors are logged for review (Task Logging and Monitoring), and the UI updates task status, showing success with a green node (Airflow Graph View Explained).

Step-by-Step Mechanics

  1. Trigger: Scheduler initiates the task per the schedule_interval or dependency.
  2. Connection: Uses dolphindb_conn_id to authenticate with the DolphinDB server.
  3. Execution: Submits the file_path script or sql query to the database (see the sketch after this list).
  4. Completion: Logs the outcome and updates the UI with the task’s status.
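
If the provider treats sql as a templated field (an assumption here, though it is common for SQL-style operators), the run's logical date can be injected with Jinja instead of hard-coding it:

from airflow_provider_dolphindb.operators.dolphindb import DolphinDBOperator

# Assumes sql is a Jinja-templated field on DolphinDBOperator (verify against the
# provider's documentation). {{ ds }} renders as YYYY-MM-DD; replacing the dashes
# with dots yields a DolphinDB date literal such as 2025.04.09.
daily_summary = DolphinDBOperator(
    task_id="daily_trade_summary",
    dolphindb_conn_id="dolphindb_default",
    sql="select avg(price) as avg_price from trades where date = {{ ds | replace('-', '.') }}",
    database="trade_db",
)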

Configuring DolphinDBOperator in Apache Airflow

Setting up the DolphinDBOperator involves preparing your environment, configuring a DolphinDB connection in Airflow, and defining a DAG. Here’s a detailed guide.

Step 1: Set Up Your Airflow Environment with DolphinDB Support

Begin by creating a virtual environment—open a terminal, navigate with cd ~, and run python -m venv airflow_env. Activate it: source airflow_env/bin/activate (Linux/Mac) or airflow_env\Scripts\activate (Windows). Install Airflow and the DolphinDB provider: pip install apache-airflow airflow-provider-dolphindb (the provider package supplies the DolphinDBOperator). Initialize Airflow with airflow db init, creating ~/airflow. Start a DolphinDB server locally (e.g., with Docker using the official dolphindb/dolphindb image and publishing port 8848) or point to an existing instance. Configure the connection in Airflow’s UI at localhost:8080 under “Admin” > “Connections”:

  • Conn ID: dolphindb_default
  • Conn Type: DolphinDB
  • Host: DolphinDB server host (e.g., 127.0.0.1)
  • Port: 8848
  • Login: Your DolphinDB username (e.g., admin)
  • Password: Your DolphinDB password (e.g., 123456)

Save it. Or use CLI: airflow connections add 'dolphindb_default' --conn-type 'dolphindb' --conn-host '127.0.0.1' --conn-port '8848' --conn-login 'admin' --conn-password '123456'. Launch services: airflow webserver -p 8080 and airflow scheduler in separate terminals.
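
A third option is to register the same connection programmatically through Airflow's metadata database; the sketch below runs once and reuses the placeholder credentials above:

from airflow import settings
from airflow.models import Connection

# Same connection as defined in the UI/CLI steps above
conn = Connection(
    conn_id="dolphindb_default",
    conn_type="dolphindb",
    host="127.0.0.1",
    port=8848,
    login="admin",
    password="123456",
)

session = settings.Session()
# Add it only if a connection with this ID does not already exist
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()
session.close()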

Step 2: Create a DAG with DolphinDBOperator

In a text editor, write:

from airflow import DAG
from airflow_provider_dolphindb.operators.dolphindb import DolphinDBOperator
from datetime import datetime, timedelta

default_args = {
    "retries": 2,
    "retry_delay": timedelta(seconds=30),
}

with DAG(
    dag_id="dolphindb_dag",
    start_date=datetime(2025, 4, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args=default_args,
) as dag:
    dolphindb_task = DolphinDBOperator(
        task_id="process_trades",
        dolphindb_conn_id="dolphindb_default",
        file_path="/scripts/process_trades.dos",
        database="trade_db",
    )

  • dag_id: "dolphindb_dag" uniquely identifies the DAG.
  • start_date: datetime(2025, 4, 1) sets the activation date.
  • schedule_interval: "@daily" runs it daily.
  • catchup: False prevents backfilling.
  • default_args: retries=2, retry_delay=timedelta(seconds=30) for resilience.
  • task_id: "process_trades" names the task.
  • dolphindb_conn_id: "dolphindb_default" links to DolphinDB.
  • file_path: Specifies a .dos script for execution.
  • database: "trade_db" targets the database.

Create a sample process_trades.dos script in /scripts/:

t = loadTable("trade_db", "trades")
select avg(price) as avg_price from t where date = '2025-04-09'

Save the DAG as ~/airflow/dags/dolphindb_dag.py.

Step 3: Test and Observe DolphinDBOperator

Trigger with airflow dags trigger -e 2025-04-09 dolphindb_dag. Visit localhost:8080, click “dolphindb_dag”, and watch process_trades turn green in Graph View. Check the task logs for the connection and script execution details, including any script output. Verify the result directly in DolphinDB (for example, from its web interface on port 8848 or a client session) by running select avg(price) from loadTable("dfs://trade_db", "trades") where date = 2025.04.09. Confirm state with airflow tasks states-for-dag-run dolphindb_dag 2025-04-09.
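
For a scripted check, here is a minimal sketch with the DolphinDB Python API (pip install dolphindb), reusing the placeholder credentials and table names from the steps above:

import dolphindb as ddb

# Connect with the same placeholder credentials configured in Step 1
s = ddb.session()
s.connect("127.0.0.1", 8848, "admin", "123456")

# Inspect a few rows of the table the DAG's script just queried
rows = s.run("select top 10 * from loadTable('dfs://trade_db', 'trades')")
print(rows)

s.close()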


Key Features of DolphinDBOperator

The DolphinDBOperator offers powerful features for DolphinDB integration in Airflow, each detailed with examples.

DolphinDB Script Execution

This feature enables execution of DolphinDB scripts via the file_path parameter, connecting to DolphinDB and running .dos files for complex data processing or analytics.

Example in Action

In ETL Pipelines with Airflow:

etl_task = DolphinDBOperator(
    task_id="aggregate_sales",
    dolphindb_conn_id="dolphindb_default",
    file_path="/scripts/aggregate_sales.dos",
    database="sales_db",
)

With aggregate_sales.dos:

t = loadTable("sales_db", "sales")
select region, sum(amount) as total_sales from t group by region

This aggregates sales data by region. The task logs record the script execution and its result set, which is key for ETL workflows.

Inline SQL Query Support

The sql parameter allows inline DolphinDB SQL queries—e.g., "select * from trades"—offering flexibility for simpler operations without external scripts.

Example in Action

For CI/CD Pipelines with Airflow:

ci_task = DolphinDBOperator(
    task_id="validate_trades",
    dolphindb_conn_id="dolphindb_default",
    sql="select count(*) as trade_count from trades where date = '2025-04-09'",
    database="trade_db",
)

This counts trades for validation. The task logs record the executed query and its result, supporting CI/CD data-consistency checks with direct querying.

Database Targeting

The database parameter specifies the target DolphinDB database—e.g., "trade_db"—ensuring operations occur within the correct context.

Example in Action

In Cloud-Native Workflows with Airflow:

cloud_task = DolphinDBOperator(
    task_id="load_cloud_metrics",
    dolphindb_conn_id="dolphindb_default",
    sql="loadText('/data/metrics.csv', schema=table(1:0, `time`value, [TIMESTAMP, DOUBLE])) into table metrics",
    database="metrics_db",
)

This appends the CSV rows to the metrics table in metrics_db, supporting cloud-native analytics with precise database targeting.

Robust Error Handling

Inherited from Airflow, retries and retry_delay manage transient DolphinDB failures—like server timeouts—with logs tracking attempts, ensuring reliability.

Example in Action

For a resilient pipeline:

from datetime import timedelta

default_args = {
    "retries": 3,
    "retry_delay": timedelta(seconds=60),
}

robust_task = DolphinDBOperator(
    task_id="robust_analytics",
    dolphindb_conn_id="dolphindb_default",
    file_path="/scripts/run_analytics.dos",
    database="analytics_db",
)

If the server is unavailable, the task retries up to three times, waiting 60 seconds between attempts; each attempt is recorded in the task log, so the analytics still complete once the server recovers.
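
To also cap how long a single attempt may run (as suggested in the FAQ below), execution_timeout can be combined with the retry settings; the sketch below uses an arbitrary ten-minute limit:

from datetime import timedelta

from airflow_provider_dolphindb.operators.dolphindb import DolphinDBOperator

bounded_task = DolphinDBOperator(
    task_id="bounded_analytics",
    dolphindb_conn_id="dolphindb_default",
    file_path="/scripts/run_analytics.dos",
    database="analytics_db",
    retries=3,
    retry_delay=timedelta(seconds=60),
    execution_timeout=timedelta(minutes=10),  # fail the attempt if it exceeds 10 minutes
)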


Best Practices for Using DolphinDBOperator

  • Store DolphinDB credentials in an Airflow connection (e.g., dolphindb_default) rather than hard-coding them in DAG files.
  • Keep complex logic in version-controlled .dos scripts referenced by file_path, reserving sql for short inline queries.
  • Configure retries, retry_delay, and execution_timeout so transient server issues do not stall the pipeline.
  • Test tasks in isolation with airflow tasks test before scheduling, and monitor runs through task logs and Graph View.


Frequently Asked Questions About DolphinDBOperator

1. Why Isn’t My Task Connecting to DolphinDB?

Ensure dolphindb_conn_id points to a valid server—logs may show “Connection failed” if the host or port is incorrect or the server is down (Task Logging and Monitoring).

2. Can I Run Multiple Scripts in One Task?

No—each DolphinDBOperator instance runs one file_path or sql; use separate tasks for multiple operations (DolphinDBOperator).

3. How Do I Retry Failed DolphinDB Tasks?

Set retries and retry_delay in default_args (e.g., retries=2, retry_delay=timedelta(seconds=30)) to handle transient server or network issues (Task Retries and Retry Delays).

4. Why Is My Script Not Executing?

Check file_path or sql syntax and database existence—logs may show “Invalid script” or “Database not found” (Task Failure Handling).

5. How Do I Debug Issues?

Run airflow tasks test dolphindb_dag process_trades 2025-04-09—see output live, check logs for errors (DAG Testing with Python).

6. Can It Work Across DAGs?

Yes—use TriggerDagRunOperator to chain DolphinDB tasks across DAGs (Task Dependencies Across DAGs).

7. How Do I Handle Slow Script Execution?

Set execution_timeout=timedelta(minutes=10) to cap runtime—prevents delays (Task Execution Timeout Handling).


Conclusion

The DolphinDBOperator seamlessly integrates DolphinDB’s high-performance analytics into Airflow workflows—craft DAGs with Defining DAGs in Python, install via Installing Airflow (Local, Docker, Cloud), and optimize with Airflow Performance Tuning. Monitor via Monitoring Task Status in UI and explore more with Airflow Concepts: DAGs, Tasks, and Workflows.