Aggregating and joining data efficiently
1. Aggregating and joining data efficiently
In the previous video, we prepared our data with a cleaning pipeline. Now we put it to work - aggregations, joins, and query optimization. Let's jump in.
2. Three questions, one clean dataset
With our transactional data ready to analyze, three questions naturally arise: which categories drive the most revenue, who are the top customers, and how do we enrich those results with department context? Let's answer all three step by step.
3. How groupBy() and agg() work
groupBy() partitions our DataFrame - every row with the same Category lands in the same group. agg() then applies our aggregate functions to each group in parallel. Both are lazy - nothing runs until an action like show() is called. And one agg() call with multiple functions reads the data only once, not once per function.
4. Category revenue - code
The first question: which categories earn the most? We group df_valid by Category and pass three functions into one agg() call - total revenue, transaction count, and average transaction size - each rounded to two decimal places and named with .alias(). Ordering by total_revenue descending puts the top earners first.
5. Category revenue - output
Clothing and Dining both clear three hundred million. The Unknown row holds our null-Category transactions - the work of the cleaning pipeline from the previous video is still visible here. If we swap Category for Customer_ID, we immediately see which customers spend the most.
6. Enriching with a dimension table
Revenue by category tells us what sells - but not which department owns it. Business teams think in Retail, Tech, Food, and Finance. That mapping lives in a small dimension table - five rows, one per category - and we'll join it to df_valid to add that context to every transaction.
7. Standard left join
How do we attach those labels? We use .join(), specifying Category as the join key and left as the join type, so all rows from df_valid are preserved. Every transaction now has its Department.
8. The hidden cost - shuffle
That join worked - but at a cost. To match rows by Category, Spark had to move data across the network, shipping rows between machines until matching keys ended up together. That data movement is the shuffle. With tens of millions of rows, it becomes the bottleneck.
9. Spark UI
The Spark UI gives us a visual look at where the shuffle happened and how much data actually moved across the cluster - very useful when diagnosing performance on a real pipeline. For our purposes, though, the easiest way to check this is straight from the notebook.
10. Reading the query plan with .explain()
We can inspect any DataFrame's execution plan using .explain(). The right branch shows how Spark handled the dimension table. The nodes are there, but no Arguments are logged for them - Spark used its default join strategy, no special instructions. That's what we're about to change.
11. Broadcast join - the fix
To eliminate the shuffle, we use a broadcast join. Instead of shuffling rows of both tables across the network, Spark copies the small table to every executor upfront - broadcasting it. Each machine then joins locally, so the large table needs no shuffle. We apply it by wrapping df_dim in F.broadcast(), keeping how='left' to preserve all our transaction rows.
12. Before and after - verifying the plan
Now look at the same plan after adding broadcast. Something has changed - the dim table path now has explicit routing instructions. Instead of shuffling rows key by key, Spark sends the entire small table to every executor at once. The large table never moves at all. In production at fifty million rows, this one change can take a join from minutes to seconds.
13. Let's practice!
We've covered groupBy aggregations for business metrics, enriching data with a dimension join, spotting shuffles in the query plan, and eliminating them with broadcast. Time to try it yourself.