
Ranking Products Using Window Functions in PySpark

Introduction

Window functions are powerful tools in SQL and PySpark that allow us to perform calculations across a set of rows related to the current row. In this blog post, we'll explore how to use window functions in PySpark to rank products by their sales and to filter for those with sales above their category average.

Setting Up PySpark

First, we need to initialize a Spark session and create a DataFrame with sample product sales data:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("WindowRankingSQL").getOrCreate()

# Create sample DataFrame
data = [("Product A", "Electronics", 500),
        ("Product B", "Electronics", 300),
        ("Product C", "Electronics", 150),
        ("Product D", "Home", 450),
        ("Product E", "Home", 350),
        ("Product F", "Home", 200)]

df = spark.createDataFrame(data, ["Product", "Category", "Sales"])

# Register the DataFrame as a temporary view
df.createOrReplaceTempView("products")
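
Before querying, it can be worth a quick check that the view is registered and queryable:

# Optional sanity check: the six sample rows should come back unchanged
spark.sql("SELECT * FROM products").show()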

Using Window Functions for Ranking

To rank products both globally and within each category, we use the RANK() function with window partitioning; the same query also computes each category's average sales with AVG() over a category partition:

SELECT
    Product,
    Category,
    Sales,
    RANK() OVER (ORDER BY Sales DESC) AS `Global Ranking`,
    RANK() OVER (PARTITION BY Category ORDER BY Sales DESC) AS `Category Ranking`,
    AVG(Sales) OVER (PARTITION BY Category) AS `Average Sales per Category`
FROM products

Executing this SQL query in PySpark:

result = spark.sql("""
SELECT
    Product,
    Category,
    Sales,
    RANK() OVER (ORDER BY Sales DESC) AS `Global Ranking`,
    RANK() OVER (PARTITION BY Category ORDER BY Sales DESC) AS `Category Ranking`,
    AVG(Sales) OVER (PARTITION BY Category) AS `Average Sales per Category`
FROM products
""")

result.show()
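
For readers who prefer the DataFrame API over raw SQL, the same result can be expressed with window specifications from pyspark.sql.window. The following is a minimal sketch (the variable names global_w, category_w, and avg_w are our own):

from pyspark.sql.window import Window

# Window specs: one global, two partitioned by category.
# Note: an unpartitioned window pulls all rows into a single partition,
# which is fine for this small sample but costly on large data.
global_w = Window.orderBy(F.desc("Sales"))
category_w = Window.partitionBy("Category").orderBy(F.desc("Sales"))
avg_w = Window.partitionBy("Category")

ranked = (df
          .withColumn("Global Ranking", F.rank().over(global_w))
          .withColumn("Category Ranking", F.rank().over(category_w))
          .withColumn("Average Sales per Category", F.avg("Sales").over(avg_w)))

ranked.show()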

Filtering Products Above the Category Average

Now, let's retrieve products whose sales are above their category average. Note that the outer table must be given an alias: without one, the subquery's WHERE Category = products.Category would resolve both sides against the subquery's own copy of the table, comparing each row's category with itself and returning the global average instead of the per-category one.

SELECT
    p.Product,
    p.Category,
    p.Sales
FROM
    products p
WHERE
    p.Sales > (SELECT AVG(p2.Sales) FROM products p2 WHERE p2.Category = p.Category)

Executing this query in PySpark:

above_avg_result = spark.sql("""
SELECT
    p.Product,
    p.Category,
    p.Sales
FROM
    products p
WHERE
    p.Sales > (SELECT AVG(p2.Sales) FROM products p2 WHERE p2.Category = p.Category)
""")

above_avg_result.show()
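
The correlated subquery also has a DataFrame-API counterpart: compute the category average with AVG() over a window and filter on it. A minimal sketch (the CategoryAvg column name is our own):

from pyspark.sql.window import Window

# Attach each row's category average, then keep only rows above it
above_avg_df = (df
                .withColumn("CategoryAvg",
                            F.avg("Sales").over(Window.partitionBy("Category")))
                .filter(F.col("Sales") > F.col("CategoryAvg"))
                .select("Product", "Category", "Sales"))

above_avg_df.show()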

Conclusion

Window functions in PySpark allow us to efficiently perform complex calculations such as ranking and filtering based on aggregate values. This example demonstrates how to use RANK() and AVG() to analyze product sales data effectively. By leveraging SQL queries within PySpark, we can process large datasets with ease and flexibility.

Would you like to see more PySpark tutorials? Let us know in the comments!
