Geek Logbook

Tech sea log book

Testing Apache Airflow DAGs: A Modular Approach

Introduction

Apache Airflow is a powerful workflow automation tool, but testing DAGs can be challenging due to their dependency on the Airflow scheduler and execution environment. In this post, we explore a modular approach that enables easier testing by extracting logic into separate Python functions, which can be tested independently.

Methods for Testing Airflow DAGs

1. Using airflow dags test

Airflow provides a built-in command to test DAG execution for a specific date without scheduling it:

airflow dags test whale_alert 2025-04-05

Pros: Quick way to verify DAG execution.

Cons: Does not provide automated unit testing.

2. Extracting Functions for Unit Testing

A better approach is to move the data extraction and transformation logic to a separate module (whale_alert_utils.py). This allows independent testing without needing to run Airflow.

New Directory Structure

dags/
 ├── whale_alert.py  # Main DAG definition
 ├── whale_alert_utils.py  # Extracted functions for processing
tests/
 ├── test_whale_alert.py  # Unit tests

whale_alert_utils.py – Modularizing Functions

import pandas as pd
import requests
from bs4 import BeautifulSoup
from datetime import datetime

def extract_data():
    """Extracts data from Whale Alert website."""
    url = "https://whale-alert.io/whales.html"
    # Fail fast on network problems instead of hanging or parsing an error page.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, "html.parser")
    table_rows = soup.select("table.table tbody tr")

    # Read the cells of each row once; the first three columns are used below.
    cells = [row.find_all("td") for row in table_rows]
    data = {
        "datetime_utc": [datetime.utcnow()] * len(cells),
        "crypto": [tds[0].text.strip() for tds in cells],
        "known": [tds[1].text.strip() for tds in cells],
        "unknown": [tds[2].text.strip() for tds in cells]
    }
    return pd.DataFrame(data)

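def transform_data(df):
    """Cleans and transforms extracted data."""
    # Work on a copy so the caller's DataFrame is not modified in place.
    df = df.copy()
    for col in ['crypto', 'known', 'unknown']:
        # Strip '$' and ',' separators, treat a lone '-' as zero, then convert to float.
        cleaned = df[col].astype(str).str.replace(r'[$,]', '', regex=True)
        df[col] = cleaned.replace('-', '0').astype(float)
    return df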
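To make the cleaning rule in transform_data easier to reason about, here is a minimal pure-Python sketch of what the pandas expression does to a single cell. The helper name clean_amount and the sample values are hypothetical and only serve as illustration; the real function works on whole DataFrame columns.

```python
import re

def clean_amount(value):
    """Hypothetical per-cell mirror of the rule used in transform_data."""
    # Remove dollar signs and thousands separators.
    text = re.sub(r"[$,]", "", str(value))
    # A cell that is exactly '-' stands for "no amount" and becomes zero.
    if text == "-":
        text = "0"
    return float(text)

samples = ["$50,000,000", "100,000", "-"]
cleaned = [clean_amount(s) for s in samples]
print(cleaned)
```

Note that '-' is only replaced when it is the entire cell, so a value such as '-5' would still be parsed as a negative number.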

whale_alert.py – DAG Using Modular Functions

from airflow.decorators import dag, task
from whale_alert_utils import extract_data, transform_data
from datetime import datetime
import pandas as pd

@dag(schedule="@daily", start_date=datetime(2022, 1, 1), catchup=False)
def whale_alert():
    
    @task()
    def extract():
        df = extract_data()
        return df.to_dict()
    
    @task()
    def transform(raw_data):
        df = pd.DataFrame(raw_data)
        transformed_df = transform_data(df)
        return transformed_df.to_dict()

    transform(extract())

whale_alert()

3. Writing Unit Tests with pytest

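Once we modularize the logic, we can write unit tests to verify functionality independently from Airflow. Note that test_extract_data below calls the live website, so it needs network access and is closer to an integration test; test_transform_data runs fully offline.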

test_whale_alert.py

import pytest
import pandas as pd
from whale_alert_utils import extract_data, transform_data

def test_extract_data():
    df = extract_data()
    assert isinstance(df, pd.DataFrame)
    assert "crypto" in df.columns
    assert "known" in df.columns
    assert "unknown" in df.columns
    assert not df.empty

def test_transform_data():
    data = {
        "datetime_utc": ["2025-04-05"],
        "crypto": ["$50,000,000"],
        "known": ["$100,000,000"],
        "unknown": ["-"]
    }
    df = pd.DataFrame(data)
    transformed_df = transform_data(df)

    assert transformed_df["crypto"].iloc[0] == 50000000.0
    assert transformed_df["known"].iloc[0] == 100000000.0
    assert transformed_df["unknown"].iloc[0] == 0.0
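Because test_extract_data depends on the live website, it can fail for reasons unrelated to the code. One possible way around this is to patch requests.get with unittest.mock so the test runs against canned HTML. The sketch below is self-contained: fetch_rows is a hypothetical stand-in for the fetch-and-select step of extract_data, and FAKE_HTML is made-up sample markup, not the real page.

```python
from unittest.mock import MagicMock, patch

import requests
from bs4 import BeautifulSoup

# Made-up page snippet shaped like the table that extract_data() selects.
FAKE_HTML = b"""
<table class="table"><tbody>
  <tr><td>BTC</td><td>$1,000</td><td>-</td></tr>
  <tr><td>ETH</td><td>$2,500</td><td>$300</td></tr>
</tbody></table>
"""

def fetch_rows(url):
    """Hypothetical stand-in for the fetch-and-select step of extract_data()."""
    soup = BeautifulSoup(requests.get(url).content, "html.parser")
    return soup.select("table.table tbody tr")

def test_fetch_rows_offline():
    fake_response = MagicMock()
    fake_response.content = FAKE_HTML
    # While the patch is active, requests.get returns the canned response
    # and no network call is made.
    with patch("requests.get", return_value=fake_response) as mock_get:
        rows = fetch_rows("https://example.invalid/whales.html")
    mock_get.assert_called_once()
    assert len(rows) == 2
    assert rows[0].find("td").text.strip() == "BTC"
```

In the actual test file, the same with-block could wrap a call to extract_data() instead of the stand-in, since that function also looks up requests.get at call time.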

Run the tests with:

pytest tests/test_whale_alert.py
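The command above only works if whale_alert_utils can be imported from the tests/ folder. Airflow adds dags/ to the import path on its own, but pytest does not, so the import may fail with ModuleNotFoundError. A minimal sketch of one fix, assuming the directory structure shown earlier, is a tests/conftest.py file (a hypothetical addition, not part of the original project) that puts dags/ on sys.path:

```python
# tests/conftest.py (hypothetical file; pytest loads it before collecting tests)
import sys
from pathlib import Path

# Assumes the layout <project root>/tests/conftest.py and <project root>/dags/.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
DAGS_DIR = PROJECT_ROOT / "dags"

# Make modules in dags/ importable, e.g. "import whale_alert_utils".
if str(DAGS_DIR) not in sys.path:
    sys.path.insert(0, str(DAGS_DIR))
```

Setting the PYTHONPATH environment variable to the dags folder before running pytest would be an alternative with the same effect.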

Benefits of This Approach

✅ Modular Code: Separates Airflow-specific logic from core data processing.

✅ Easier Debugging: Tests can be run without needing to spin up Airflow.

✅ Reusability: The whale_alert_utils.py module can be used in other applications.

Conclusion

Testing Airflow DAGs effectively requires separating core logic from the DAG definition. By moving the extraction and transformation logic to a separate module, we can write unit tests with pytest, making our DAGs more reliable and maintainable. Happy testing! 🚀