Using broadcasting on Spark joins
Remember that table joins in Spark are split between the cluster workers. If the data a worker needs is not local, shuffle operations are required to move it across the cluster, which can hurt performance. Instead, we're going to use Spark's broadcast operations to give each node a copy of the specified data.
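To make the idea concrete, here is a minimal plain-Python sketch of what a broadcast join does conceptually. It is not Spark's implementation: the function name `broadcast_hash_join`, the sample rows, and the two-partition layout are all hypothetical. The small table is turned into a lookup once, and each partition of the large table is joined against that copy without moving its own rows.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_rows, large_key, small_key):
    """Inner-join every partition of the large table against a copy of the small table."""
    # Build the lookup once from the small table. In a real cluster,
    # a copy of this structure would be sent to every worker.
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[small_key]].append(row)

    joined = []
    # Each partition is processed on its own; no rows of the large table move.
    for partition in large_partitions:
        for row in partition:
            for match in lookup.get(row[large_key], []):
                joined.append({**row, **match})
    return joined


# Hypothetical sample data: the large table is split into two partitions.
flights_partitions = [
    [{"flight": 1, "dest": "ORD"}, {"flight": 2, "dest": "DFW"}],
    [{"flight": 3, "dest": "ORD"}, {"flight": 4, "dest": "XXX"}],
]
airports = [
    {"IATA": "ORD", "name": "Chicago O'Hare"},
    {"IATA": "DFW", "name": "Dallas/Fort Worth"},
]

result = broadcast_hash_join(flights_partitions, airports, "dest", "IATA")
for row in result:
    print(row)
```

Flight 4 has no matching airport, so an inner join drops it. The cost of this approach is copying the small table to every worker, which is why it pays off only when that table really is small.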
A couple of tips:
- Broadcast the smaller DataFrame. The larger the DataFrame, the more time is required to transfer it to the worker nodes.
- On small DataFrames, it may be better to skip broadcasting and let Spark figure out any optimization on its own.
- If you look at the query execution plan, a BroadcastHashJoin operator indicates you've successfully configured broadcasting.
The DataFrames flights_df and airports_df are available to you.
This exercise is part of the course
Cleaning Data with PySpark
Exercise instructions
- Import the broadcast() method from pyspark.sql.functions.
- Create a new DataFrame broadcast_df by joining flights_df with airports_df, using broadcasting.
- Show the query plan and consider differences from the original.
Hands-on interactive exercise
Have a go at this exercise by completing this sample code.
# Import the broadcast method from pyspark.sql.functions
from ____ import ____
# Join the flights_df and airports_df DataFrames using broadcasting
broadcast_df = flights_df.____(____(airports_df),
    flights_df["Destination Airport"] == airports_df["IATA"])
# Show the query plan and compare against the original
broadcast_df.____()