
Automating Data Extraction with Airflow, BeautifulSoup, and MinIO

In the data engineering ecosystem, a common task is to automate the extraction of data from external sources, perform minimal processing, and store it in a data lake for further analysis. In this post, I will demonstrate how to build an Apache Airflow DAG that fetches public information from Whale Alert, transforms it into a CSV file, and stores it in MinIO.


Architecture

The workflow is straightforward:

  1. Airflow DAG: schedules and orchestrates the daily execution.
  2. Requests + BeautifulSoup: scrapes and parses the HTML table with cryptocurrency data.
  3. Pandas: structures the extracted data into a DataFrame and exports it as CSV.
  4. Boto3 (S3 client): uploads the resulting file to the data lake in MinIO.

DAG Code

from airflow.decorators import dag, task
from datetime import datetime, timezone
from bs4 import BeautifulSoup
import pandas as pd
import requests
import boto3

ENDPOINT_URL = "http://data-lake:9000"
BUCKET_NAME = "whale-alert"
ACCESS_KEY = "minio"
SECRET_KEY = "minio123"

@dag(
    schedule="@daily",            # Run once per day
    start_date=datetime(2022, 1, 1),
    catchup=False,                # Avoid backfilling past dates
    default_args={"retries": 1},
    tags=['whale_alert']
)
def whale_alert():

    @task()
    def extract_and_upload():
        url = "https://whale-alert.io/whales.html"
        timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
        object_name = f"bronze/{timestamp}.csv"

        try:
            # Extract HTML table
            soup = BeautifulSoup(requests.get(url, timeout=30).content, "html.parser")
            table_rows = soup.select("table.table tbody tr")

            # Structure data
            data = {
                "datetime_utc": [datetime.now(timezone.utc)] * len(table_rows),
                "crypto": [row.find("td").text.strip() for row in table_rows],
                "known": [row.find_all("td")[1].text for row in table_rows],
                "unknown": [row.find_all("td")[2].text for row in table_rows]
            }

            # Convert to CSV
            csv_data = pd.DataFrame(data).to_csv(index=False)

            # Upload to MinIO
            s3_client = boto3.client(
                's3',
                endpoint_url=ENDPOINT_URL,
                aws_access_key_id=ACCESS_KEY,
                aws_secret_access_key=SECRET_KEY
            )
            s3_client.put_object(Bucket=BUCKET_NAME, Key=object_name, Body=csv_data)
            return object_name

        except Exception as e:
            # Re-raise so Airflow marks the task as failed and applies the configured retry
            raise RuntimeError(f"Extraction failed: {e}") from e

    extract_and_upload()

dag = whale_alert()
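
Before the first run, the whale-alert bucket has to exist in MinIO; put_object does not create it. You can create it from the MinIO console, or with a one-off script like the sketch below, which reuses the endpoint and credentials defined in the DAG (the try/except is just a simple "create if missing" check):

import boto3
from botocore.exceptions import ClientError

s3_client = boto3.client(
    's3',
    endpoint_url="http://data-lake:9000",
    aws_access_key_id="minio",
    aws_secret_access_key="minio123"
)

# Create the bucket only if it does not already exist
try:
    s3_client.head_bucket(Bucket="whale-alert")
except ClientError:
    s3_client.create_bucket(Bucket="whale-alert")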

Results in Airflow

Once the DAG is enabled in the Airflow UI:

  • When triggered manually, it generates a file under bronze/<timestamp>.csv in the whale-alert bucket on MinIO; a quick way to verify the upload is shown after this list.
  • With schedule="@daily", Airflow will also create scheduled runs (one per day). If you only want manual execution, set schedule=None.
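
To confirm the upload outside of Airflow, you can list the bronze/ prefix and read the newest CSV back into a DataFrame. A quick sketch using the same MinIO credentials as the DAG (it assumes at least one object has already been uploaded):

import io

import boto3
import pandas as pd

s3_client = boto3.client(
    's3',
    endpoint_url="http://data-lake:9000",
    aws_access_key_id="minio",
    aws_secret_access_key="minio123"
)

# Pick the most recently modified object under the bronze/ prefix
objects = s3_client.list_objects_v2(Bucket="whale-alert", Prefix="bronze/")["Contents"]
latest_key = max(objects, key=lambda obj: obj["LastModified"])["Key"]

# Download the CSV and load it into a DataFrame
body = s3_client.get_object(Bucket="whale-alert", Key=latest_key)["Body"].read()
df = pd.read_csv(io.BytesIO(body))
print(latest_key, len(df), "rows")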

Key Considerations

  • Idempotency: deriving the file name from the run's logical date (execution_date in older Airflow versions) instead of now() gives deterministic, per-day file names and avoids duplicates between retries, as shown in the sketch after this list.
  • Scalability: this workflow can be integrated into a medallion architecture (bronze, silver, gold) for incremental processing.
  • Monitoring: Airflow provides logging, retries, and alerting capabilities out of the box.
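
As an illustration of the idempotency point, the task can derive the object key from the run's logical date instead of the wall clock, so a retry of the same run overwrites the same file rather than creating a new one. A minimal sketch using Airflow 2's get_current_context (only the naming changes; the scraping and upload stay exactly as in the DAG above):

from airflow.decorators import task
from airflow.operators.python import get_current_context

@task()
def extract_and_upload():
    # "ds" is the run's logical date formatted as YYYY-MM-DD,
    # so retries of the same run write to the same key
    ds = get_current_context()["ds"]
    object_name = f"bronze/{ds}.csv"
    # ... scrape, build the CSV, and upload as in the DAG above ...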

Conclusion

With a small amount of code, it is possible to automate external data ingestion into a data lake using Airflow, Requests, BeautifulSoup, Pandas, and MinIO. This approach is widely applicable to other web scraping use cases and enables reproducible and scalable data pipelines.