Testing Apache Airflow DAGs: A Modular Approach
Introduction
Apache Airflow is a powerful workflow orchestration tool, but testing DAGs can be challenging because they depend on the Airflow scheduler and execution environment. In this post, we explore a modular approach that makes testing easier by extracting the logic into separate Python functions, which can be tested independently.
Methods for Testing Airflow DAGs
1. Using airflow dags test
Airflow provides a built-in command to test DAG execution for a specific date without scheduling it:
airflow dags test whale_alert 2025-04-05
Pros: A quick way to verify that the whole DAG runs end to end. Cons: It exercises the real tasks (including any network calls) and provides no automated unit testing.
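Airflow also ships a companion command, airflow tasks test, which runs a single task instance without checking dependencies or recording state in the metadata database. For example, to spot-check just the extract task defined later in this post:
airflow tasks test whale_alert extract 2025-04-05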
2. Extracting Functions for Unit Testing
A better approach is to move the data extraction and transformation logic to a separate module (whale_alert_utils.py). This allows independent testing without needing to run Airflow.
New Directory Structure
dags/
├── whale_alert.py # Main DAG definition
├── whale_alert_utils.py # Extracted functions for processing
tests/
├── test_whale_alert.py # Unit tests
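Since the tests import whale_alert_utils directly, the dags/ folder must be importable from the test run. One common way is a small conftest.py at the project root; this is a minimal sketch (the path shim is an assumption, adjust it to your layout):

# conftest.py (project root): make dags/ importable for the tests
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent / "dags"))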
whale_alert_utils.py – Modularizing Functions
import pandas as pd
import requests
from bs4 import BeautifulSoup
from datetime import datetime

def extract_data():
    """Extracts data from the Whale Alert website."""
    url = "https://whale-alert.io/whales.html"
    soup = BeautifulSoup(requests.get(url).content, "html.parser")
    table_rows = soup.select("table.table tbody tr")
    data = {
        "datetime_utc": [datetime.utcnow()] * len(table_rows),
        "crypto": [row.find("td").text.strip() for row in table_rows],
        "known": [row.find_all("td")[1].text for row in table_rows],
        "unknown": [row.find_all("td")[2].text for row in table_rows],
    }
    return pd.DataFrame(data)

def transform_data(df):
    """Cleans and transforms extracted data."""
    for col in ["crypto", "known", "unknown"]:
        df[col] = (
            df[col]
            .astype(str)
            .str.replace(r"[$,]", "", regex=True)  # strip "$" and thousands separators
            .replace("-", "0")  # treat missing values ("-") as zero
            .astype(float)
        )
    return df
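Because these are plain Python functions, you can also exercise the pipeline ad hoc from a REPL or script, with no Airflow involved (this hits the live site, so output will vary):

from whale_alert_utils import extract_data, transform_data

df = transform_data(extract_data())
print(df.head())   # first few cleaned rows
print(df.dtypes)   # crypto/known/unknown should now be float64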
whale_alert.py – DAG Using Modular Functions
from airflow.decorators import dag, task
from whale_alert_utils import extract_data, transform_data
from datetime import datetime
import pandas as pd

@dag(schedule="@daily", start_date=datetime(2022, 1, 1), catchup=False)
def whale_alert():
    @task()
    def extract():
        df = extract_data()
        return df.to_dict()  # dicts serialize more cleanly over XCom than DataFrames

    @task()
    def transform(raw_data):
        df = pd.DataFrame(raw_data)
        transformed_df = transform_data(df)
        return transformed_df.to_dict()

    transform(extract())

whale_alert()
3. Writing Unit Tests with pytest
Once the logic is modularized, we can write unit tests that verify functionality independently of Airflow.
test_whale_alert.py
import pytest
import pandas as pd
from whale_alert_utils import extract_data, transform_data

def test_extract_data():
    df = extract_data()
    assert isinstance(df, pd.DataFrame)
    assert "crypto" in df.columns
    assert "known" in df.columns
    assert "unknown" in df.columns
    assert not df.empty

def test_transform_data():
    data = {
        "datetime_utc": ["2025-04-05"],
        "crypto": ["$50,000,000"],
        "known": ["$100,000,000"],
        "unknown": ["-"],
    }
    df = pd.DataFrame(data)
    transformed_df = transform_data(df)
    assert transformed_df["crypto"].iloc[0] == 50000000.0
    assert transformed_df["known"].iloc[0] == 100000000.0
    assert transformed_df["unknown"].iloc[0] == 0.0
Run the tests with:
pytest tests/test_whale_alert.py
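Beyond unit tests for the helper functions, it is also worth adding a DAG integrity test that simply loads the DAG file and fails on import errors. This sketch uses Airflow's DagBag and therefore needs Airflow installed in the test environment; the dag_folder path assumes the layout shown above:

from airflow.models import DagBag

def test_dag_integrity():
    # Parse only our dags/ folder; skip Airflow's bundled example DAGs.
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    assert dagbag.import_errors == {}, f"Import failures: {dagbag.import_errors}"
    dag = dagbag.get_dag("whale_alert")
    assert dag is not None
    assert len(dag.tasks) == 2  # extract and transform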
Benefits of This Approach
✅ Modular Code: Separates Airflow-specific logic from core data processing.
✅ Easier Debugging: Tests can run without spinning up Airflow.
✅ Reusability: The whale_alert_utils.py module can be reused in other applications.
Conclusion
Testing Airflow DAGs effectively requires separating core logic from the DAG definition. By moving the extraction and transformation logic to a separate module, we can write unit tests with pytest, making our DAGs more reliable and maintainable. Happy testing! 🚀