Automating Data Extraction with Airflow, BeautifulSoup, and MinIO
In the data engineering ecosystem, a common task is to automate the extraction of data from external sources, perform minimal processing, and store it in a data lake for further analysis. In this post, I will demonstrate how to build an Apache Airflow DAG that fetches public information from Whale Alert, transforms it into a CSV file, and stores it in MinIO.
Architecture
The designed workflow is straightforward:
- Airflow DAG: schedules and orchestrates the daily execution.
- Requests + BeautifulSoup: scrapes and parses the HTML table with cryptocurrency data.
- Pandas: structures the extracted data into a `DataFrame` and exports it as CSV.
- Boto3 (S3 client): uploads the resulting file to the data lake in MinIO.
DAG Code
```python
from airflow.decorators import dag, task
from datetime import datetime, timezone
from bs4 import BeautifulSoup
import pandas as pd
import requests
import boto3

ENDPOINT_URL = "http://data-lake:9000"
BUCKET_NAME = "whale-alert"
ACCESS_KEY = "minio"
SECRET_KEY = "minio123"


@dag(
    schedule="@daily",                # Run once per day
    start_date=datetime(2022, 1, 1),
    catchup=False,                    # Avoid backfilling past dates
    default_args={"retries": 1},
    tags=["whale_alert"],
)
def whale_alert():
    @task()
    def extract_and_upload():
        url = "https://whale-alert.io/whales.html"
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        object_name = f"bronze/{timestamp}.csv"
        try:
            # Extract the HTML table
            soup = BeautifulSoup(requests.get(url, timeout=30).content, "html.parser")
            table_rows = soup.select("table.table tbody tr")

            # Structure the data: one record per table row
            data = {
                "datetime_utc": [datetime.now(timezone.utc)] * len(table_rows),
                "crypto": [row.find("td").text.strip() for row in table_rows],
                "known": [row.find_all("td")[1].text for row in table_rows],
                "unknown": [row.find_all("td")[2].text for row in table_rows],
            }

            # Convert to CSV in memory (no local file needed)
            csv_data = pd.DataFrame(data).to_csv(index=False)

            # Upload to MinIO through its S3-compatible API
            s3_client = boto3.client(
                "s3",
                endpoint_url=ENDPOINT_URL,
                aws_access_key_id=ACCESS_KEY,
                aws_secret_access_key=SECRET_KEY,
            )
            s3_client.put_object(Bucket=BUCKET_NAME, Key=object_name, Body=csv_data)
            return object_name
        except Exception as e:
            # Re-raise so Airflow marks the task as failed and applies the configured retries
            raise RuntimeError(f"Extraction failed: {e}") from e

    extract_and_upload()


dag = whale_alert()
```
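For quick local debugging without a running scheduler, recent Airflow versions (2.5+) can execute the DAG in-process with `dag.test()`. A small sketch, assuming the DAG above is saved in a module such as `whale_alert_dag.py` (the filename is illustrative):

```python
# Append to the bottom of the DAG file (requires Airflow 2.5+).
# Running `python whale_alert_dag.py` then performs a single in-process DagRun.
if __name__ == "__main__":
    dag.test()
```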
Results in Airflow
Once the DAG is enabled in the Airflow UI:
- When triggered manually, it generates a file under `bronze/<timestamp>.csv` in the `whale-alert` bucket on MinIO.
- With `schedule="@daily"`, Airflow will also create scheduled runs (one per day). If you only want manual execution, set `schedule=None`.
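To confirm that a run actually landed a file, you can read the object back from MinIO and load it with Pandas. A minimal sketch, assuming the same endpoint and credentials as in the DAG; the object key below is a hypothetical example of what a run produces:

```python
import io

import boto3
import pandas as pd

# Same MinIO settings as the DAG; the key is a hypothetical example.
s3 = boto3.client(
    "s3",
    endpoint_url="http://data-lake:9000",
    aws_access_key_id="minio",
    aws_secret_access_key="minio123",
)

obj = s3.get_object(Bucket="whale-alert", Key="bronze/20240101_000000.csv")
df = pd.read_csv(io.BytesIO(obj["Body"].read()))
print(df.head())
```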
Key Considerations
- Idempotency: using the run's logical date (`execution_date`) instead of `now()` allows deterministic file names per day and avoids duplicates between retries (see the sketch after this list).
- Scalability: this workflow can be integrated into a medallion architecture (bronze, silver, gold) for incremental processing.
- Monitoring: Airflow provides logging, retries, and alerting capabilities out of the box.
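As a sketch of the idempotency point: the TaskFlow API exposes the run's context via `get_current_context()`, where `ds` is the logical date (what older Airflow versions call `execution_date`) formatted as `YYYY-MM-DD`. Using it in the object key means retries of the same run overwrite one file instead of creating duplicates; the scraping and upload logic stays as in the DAG above:

```python
from airflow.decorators import task
from airflow.operators.python import get_current_context


@task()
def extract_and_upload():
    # "ds" is the run's logical date (YYYY-MM-DD), stable across retries of the same run
    ds = get_current_context()["ds"]
    object_name = f"bronze/{ds}.csv"
    # ... same scraping and upload logic as above ...
    return object_name
```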
Conclusion
With a small amount of code, it is possible to automate external data ingestion into a data lake using Airflow, Requests, BeautifulSoup, Pandas, and MinIO. This approach is widely applicable to other web scraping use cases and enables reproducible and scalable data pipelines.