Defining DAGs in Python
Apache Airflow is a leading open-source platform for orchestrating workflows, and its power hinges on defining Directed Acyclic Graphs (DAGs) in Python. These DAGs are the backbone of your workflows, specifying tasks, their dependencies, and schedules—all written as Python code you can run, test, and version. Whether you’re executing a simple script with BashOperator or a complex pipeline integrating Airflow with Apache Spark, defining DAGs in Python is your starting point. This guide, hosted on SparkCodeHub, explores how to define DAGs in Python—covering the essentials, step-by-step processes, and best practices. New to Airflow? Begin with Airflow Fundamentals, and pair this with Introduction to DAGs in Airflow for a solid foundation.
What Does Defining DAGs in Python Mean?
Defining DAGs in Python means writing a Python script that outlines your workflow as a Directed Acyclic Graph (DAG)—a structure with tasks (nodes) and dependencies (edges) that doesn’t loop back. In Airflow, you use the DAG class to create this script, setting a unique dag_id, a start_date, and a schedule_interval to tell Airflow what to run and when. Tasks—like fetching data or processing it—are defined with Operators (e.g., PythonOperator), and you link them with dependencies (e.g., task1 >> task2). These scripts go in the dags folder—typically ~/airflow/dags, organized via DAG File Structure Best Practices—where the Scheduler reads them (Airflow Architecture (Scheduler, Webserver, Executor)). It’s coding your workflow, giving Airflow a clear map to follow.
Think of it as scripting a recipe—list the steps (tasks), order them (dependencies), and set a timer (schedule)—all in Python, making it versionable and testable.
Why Define DAGs in Python?
Defining DAGs in Python is what sets Airflow apart—it’s the “workflow as code” approach. Unlike manual scheduling tools, Python lets you version your DAGs with Git, test them before running (DAG Testing with Python), and collaborate with teams. It integrates with Airflow’s Scheduler for automation (DAG Scheduling (Cron, Timetables)), the Executor for execution (Airflow Executors (Sequential, Local, Celery)), and the database for tracking (Airflow Metadata Database Setup). This code-driven method ensures precision—dependencies like task1 >> task2 (DAG Dependencies and Task Ordering) are explicit—and scalability, with everything viewable in the Airflow Web UI Overview. It’s flexible, repeatable, and powerful.
Anatomy of a DAG Definition
A DAG in Python has key pieces—let’s break them down.
Importing Required Modules
You start by importing Airflow’s DAG class and Operators—like BashOperator or PythonOperator—from the airflow package. Add datetime from Python’s standard library for dates—e.g., from airflow import DAG; from airflow.operators.bash import BashOperator; from datetime import datetime.
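Laid out on separate lines, those imports look like this:

from airflow import DAG                            # the DAG class itself
from airflow.operators.bash import BashOperator    # operator for shell commands
from datetime import datetime                      # used for start_date values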
Defining the DAG Object
The DAG object is the core—set it with dag_id (e.g., “my_dag”), start_date (e.g., datetime(2025, 1, 1)), and schedule_interval (e.g., "@daily"). Use catchup=False to skip past runs—more in DAG Parameters and Defaults.
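A minimal sketch of such a DAG object, using the placeholder values above, might look like this:

from airflow import DAG
from datetime import datetime

with DAG(
    dag_id="my_dag",                      # unique name shown in the UI
    start_date=datetime(2025, 1, 1),      # baseline date for scheduling
    schedule_interval="@daily",           # run once per day
    catchup=False,                        # skip runs between start_date and today
) as dag:
    pass  # tasks are added inside this block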
Adding Tasks
Tasks are defined with Operators inside a with DAG block—e.g., task1 = BashOperator(task_id="task1", bash_command="echo 'Hi'"). Each task needs a unique task_id—they’re the jobs Airflow runs.
Setting Dependencies
Link tasks with >>—e.g., task1 >> task2—to set order. Use lists for multiple dependencies—task1 >> [task2, task3]—detailed in DAG Dependencies and Task Ordering.
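Putting tasks and dependencies together, here’s a small self-contained sketch (the dag_id and echo commands are just placeholders):

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="deps_demo",                   # placeholder dag_id
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    task1 = BashOperator(task_id="task1", bash_command="echo 'one'")
    task2 = BashOperator(task_id="task2", bash_command="echo 'two'")
    task3 = BashOperator(task_id="task3", bash_command="echo 'three'")

    task1 >> [task2, task3]               # task2 and task3 both run after task1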
Creating a Simple DAG in Python
Let’s define a basic DAG to see it in action.
Step 1: Set Up Airflow Environment
- Install Airflow: Follow Installing Airflow (Local, Docker, Cloud)—open your terminal, type cd ~, press Enter, then python -m venv airflow_env, source airflow_env/bin/activate (Mac/Linux) or airflow_env\Scripts\activate (Windows), and pip install apache-airflow.
- Initialize the Database: Type airflow db init and press Enter—it creates ~/airflow with airflow.db; add a dags folder there if one doesn’t already exist.
- Start the Webserver: In one terminal, activate, type airflow webserver -p 8080, and press Enter—go to localhost:8080.
- Start the Scheduler: In another terminal, activate, type airflow scheduler, and press Enter—it scans ~/airflow/dags.
Step 2: Write the DAG Script
- Open Your Text Editor: Use Notepad (Windows), TextEdit (Mac), or VS Code.
- Write the DAG Code: Paste this:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="simple_dag",
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    start = BashOperator(
        task_id="start_task",
        bash_command="echo 'Starting the workflow!'",
    )
    end = BashOperator(
        task_id="end_task",
        bash_command="echo 'Workflow complete!'",
    )
    start >> end
- Save the File: Save as simple_dag.py in ~/airflow/dags—e.g., /home/username/airflow/dags/simple_dag.py or C:\Users\YourUsername\airflow\dags\simple_dag.py. Ensure it’s .py (Windows: “Save As,” “All Files,” simple_dag.py).
Step 3: Verify and Run the DAG
- Check the UI: Open your browser, go to localhost:8080, wait 10-20 seconds, and see “simple_dag” listed under “DAGs.”
- Trigger the DAG: In your terminal, activate, type airflow dags trigger -e 2025-04-07 simple_dag, and press Enter—it runs start_task, then end_task.
- View Results: Refresh localhost:8080, click “simple_dag”—see green circles for both tasks, with logs available via Task Logging and Monitoring.
This DAG echoes messages daily—your first Python-defined workflow is live!
Advanced DAG Definitions
Let’s level up with more complexity.
Adding Multiple Tasks
Add tasks for an ETL workflow:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="etl_dag",
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(
        task_id="extract",
        bash_command="echo 'Extracting data'",
    )
    transform = BashOperator(
        task_id="transform",
        bash_command="echo 'Transforming data'",
    )
    load = BashOperator(
        task_id="load",
        bash_command="echo 'Loading data'",
    )
    extract >> transform >> load
Save as etl_dag.py in ~/airflow/dags, trigger with airflow dags trigger -e 2025-04-07 etl_dag—it runs in sequence.
Using Python Functions
Use PythonOperator for custom logic:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_data():
    print("Processing data with Python!")

with DAG(
    dag_id="python_dag",
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    start = PythonOperator(
        task_id="start",
        python_callable=process_data,
    )
Save, trigger—it prints your message via Python.
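If your callable needs arguments, PythonOperator can pass them through op_kwargs—here’s a minimal sketch, where the process_data(source) function and the "sales_db" value are just illustrative:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_data(source):
    # 'source' arrives from op_kwargs below
    print(f"Processing data from {source}!")

with DAG(
    dag_id="python_args_dag",             # illustrative dag_id
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    start = PythonOperator(
        task_id="start",
        python_callable=process_data,
        op_kwargs={"source": "sales_db"},  # illustrative argument value
    )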
Setting Complex Dependencies
Link multiple tasks:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="complex_dag",
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo 'Extract'")
    transform1 = BashOperator(task_id="transform1", bash_command="echo 'Transform 1'")
    transform2 = BashOperator(task_id="transform2", bash_command="echo 'Transform 2'")
    load = BashOperator(task_id="load", bash_command="echo 'Load'")

    extract >> [transform1, transform2] >> load
Save, trigger—transform1 and transform2 run in parallel after extract, then load.
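For longer pipelines, Airflow’s chain helper can express the same wiring without stringing >> operators together—here’s a sketch of an equivalent DAG (the dag_id is just a variant name for illustration):

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="complex_dag_chain",           # illustrative dag_id
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo 'Extract'")
    transform1 = BashOperator(task_id="transform1", bash_command="echo 'Transform 1'")
    transform2 = BashOperator(task_id="transform2", bash_command="echo 'Transform 2'")
    load = BashOperator(task_id="load", bash_command="echo 'Load'")

    # Same dependencies as extract >> [transform1, transform2] >> load
    chain(extract, [transform1, transform2], load)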
Best Practices for Defining DAGs
Keep dag_ids unique—e.g., “etl_daily” not “dag1.” Set start_date in the past (e.g., datetime(2025, 1, 1)) and use catchup=False unless backfilling (Catchup and Backfill Scheduling). Define tasks clearly—unique task_ids, descriptive commands. Store DAG files in ~/airflow/dags and organize them with DAG File Structure Best Practices. Test with airflow dags test (DAG Testing with Python).
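As one illustration of these practices, here’s a minimal sketch with a descriptive dag_id and shared default_args (the retry values and sample task are just examples):

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    "retries": 1,                         # example retry policy shared by all tasks
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="etl_daily",                   # descriptive, unique dag_id
    start_date=datetime(2025, 1, 1),      # fixed date in the past
    schedule_interval="@daily",
    catchup=False,                        # no backfill unless you need it
    default_args=default_args,
) as dag:
    extract_sales = BashOperator(
        task_id="extract_sales",          # descriptive task_id
        bash_command="echo 'Extracting sales data'",
    )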
FAQ: Common Questions About Defining DAGs in Python
Here are frequent questions about defining DAGs, with detailed answers from online sources.
1. Why doesn’t my DAG appear in the Airflow UI after saving it?
It’s likely not in ~/airflow/dags—type ls -a ~/airflow/dags (Mac/Linux) or dir %userprofile%\airflow\dags (Windows) to check. Look for syntax errors—run python ~/airflow/dags/my_dag.py in your terminal and fix any import or indentation issues. Ensure the Scheduler is running—type airflow scheduler if not (Airflow CLI: Overview and Usage). Wait 10-20 seconds for a rescan—tweak dag_dir_list_interval in Airflow Configuration Options to change that interval.
2. How do I set a DAG to run only once a day?
Use schedule_interval="@daily" in your DAG—it runs at midnight daily from start_date. For a specific time, use cron—e.g., schedule_interval="0 9 * * *" for 9 AM—see DAG Scheduling (Cron, Timetables).
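For instance, here’s a sketch of a DAG pinned to 9 AM daily (the dag_id is just a placeholder):

from airflow import DAG
from datetime import datetime

with DAG(
    dag_id="morning_dag",                 # placeholder dag_id
    start_date=datetime(2025, 1, 1),
    schedule_interval="0 9 * * *",        # cron: minute 0, hour 9, every day
    catchup=False,
) as dag:
    pass  # define tasks here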
3. What’s the difference between start_date and the date I pass to airflow dags trigger?
start_date (e.g., datetime(2025, 1, 1)) is the DAG’s baseline—Airflow calculates scheduled runs from there. The -e date in airflow dags trigger -e 2025-04-07 my_dag is the execution (logical) date stamped on that manual run—it labels that one instance rather than changing the schedule (Triggering DAGs via UI).
4. Can I define a DAG with tasks that run in parallel?
Yes—use lists for independent tasks—e.g., task1 >> [task2, task3]—they run together if your Executor allows (LocalExecutor or CeleryExecutor, not SequentialExecutor) (Airflow Executors (Sequential, Local, Celery)). Tune with Task Concurrency and Parallelism.
5. How do I debug a DAG if it’s not working as expected?
Test it—type airflow dags test my_dag 2025-04-07 to simulate the full DAG, or airflow tasks test my_dag task1 2025-04-07 for one task—see output without database impact (DAG Testing with Python). Check logs in ~/airflow/logs (Task Logging and Monitoring).
6. What happens if I forget to set dependencies between tasks?
Tasks without dependencies (e.g., no task1 >> task2) run in parallel if the Executor supports it—otherwise the order is undefined. Always set >> where sequence matters—e.g., extract >> transform—to make the intended order explicit (DAG Dependencies and Task Ordering).
7. Can I use Python variables or loops to define tasks in a DAG?
Yes—ordinary Python logic such as loops and comprehensions works inside the with DAG block—e.g., tasks = [BashOperator(task_id=f"task_{i}", bash_command=f"echo {i}") for i in range(3)], then tasks[0] >> tasks[1] >> tasks[2]. Generating whole DAGs programmatically is a more advanced pattern (Dynamic DAG Generation).
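A minimal runnable sketch of that pattern, generating three tasks inside the with DAG block (the dag_id is just a placeholder):

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="looped_dag",                  # placeholder dag_id
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Build three BashOperator tasks with a comprehension
    tasks = [
        BashOperator(task_id=f"task_{i}", bash_command=f"echo {i}")
        for i in range(3)
    ]
    # Chain them in order: task_0 >> task_1 >> task_2
    tasks[0] >> tasks[1] >> tasks[2]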
Conclusion
Defining DAGs in Python is your entry to Airflow’s workflow orchestration—craft tasks, set dependencies, and schedule with precision. Start with Installing Airflow (Local, Docker, Cloud), define DAGs here, and optimize with Airflow Performance Tuning. Monitor in Monitoring Task Status in UI and explore more with Airflow Concepts: DAGs, Tasks, and Workflows!