from pyspark.sql.functions import broadcast
from pyspark.sql import SparkSession
Spark Joins
Spark has five distinct join strategies:
* Broadcast hash join
* Shuffle hash join
* Shuffle sort-merge join
* Broadcast nested loop join
* Shuffle-and-replicate nested loop join (Cartesian product join)
Broadcast Hash Join
- It is used to join a small dataset with a large dataset
- The driver broadcasts the smaller dataset to all executors
- This avoids moving (shuffling) the larger dataset
- By default, Spark uses a broadcast join if the smaller dataset is less than 10 MB
- The threshold is configured with spark.sql.autoBroadcastJoinThreshold
- We can increase or decrease the threshold as required; setting it to -1 disables automatic broadcast joins
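The build-and-probe idea behind the points above can be sketched in plain Python. This is only an illustration of the technique, not Spark's implementation; the function name `broadcast_hash_join` and the sample rows are made up for the example, and each inner list stands in for one partition of the large dataset.

```python
from collections import defaultdict

def broadcast_hash_join(large_partitions, small_rows, key):
    """Join every partition of the large side against one shared hash table."""
    # Build phase: hash the small side once. In Spark this table is what
    # the driver would broadcast to the executors.
    hash_table = defaultdict(list)
    for row in small_rows:
        hash_table[row[key]].append(row)

    # Probe phase: each partition of the large side is joined where it
    # already lives, so rows of the large side are never moved.
    joined = []
    for partition in large_partitions:
        for row in partition:
            for match in hash_table.get(row[key], []):
                joined.append({**match, **row})
    return joined

# Hypothetical data: two partitions of a "large" table and a small lookup table.
large_partitions = [
    [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}],
    [{"id": 3, "amount": 30}, {"id": 1, "amount": 40}],
]
small_rows = [{"id": 1, "name": "a"}, {"id": 3, "name": "c"}]

result = broadcast_hash_join(large_partitions, small_rows, "id")
for row in result:
    print(row)
```

Only the small side is copied; the cost of the join on the large side is a single pass with a hash lookup per row.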
spark = SparkSession.builder.appName('joins').getOrCreate()
largeDF = spark.range(1, 10000)
smallDF = spark.range(1, 100)
joinedDF = largeDF.join(smallDF, "id")
joinedDF.explain(mode='simple')
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#0L]
   +- BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, false
      :- Range (1, 10000, step=1, splits=16)
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=16]
         +- Range (1, 100, step=1, splits=16)
joinedDF.explain(mode='formatted')
== Physical Plan ==
AdaptiveSparkPlan (6)
+- Project (5)
   +- BroadcastHashJoin Inner BuildRight (4)
      :- Range (1)
      +- BroadcastExchange (3)
         +- Range (2)
(1) Range
Output [1]: [id#0L]
Arguments: Range (1, 10000, step=1, splits=Some(16))
(2) Range
Output [1]: [id#2L]
Arguments: Range (1, 100, step=1, splits=Some(16))
(3) BroadcastExchange
Input [1]: [id#2L]
Arguments: HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=16]
(4) BroadcastHashJoin
Left keys [1]: [id#0L]
Right keys [1]: [id#2L]
Join condition: None
(5) Project
Output [1]: [id#0L]
Input [2]: [id#0L, id#2L]
(6) AdaptiveSparkPlan
Output [1]: [id#0L]
Arguments: isFinalPlan=false
Shuffle Hash Join
- Rows with the same join key value are shuffled to the same executor node
- A hash join is then performed within each partition
- The join key does not need to be sortable
- It is an expensive join because it involves both shuffling and hashing
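The two phases listed above (shuffle by key, then a hash join per partition) can be sketched in plain Python. This is a conceptual sketch rather than Spark's code: the helper names `shuffle` and `shuffle_hash_join`, the partition count of 4, and the sample rows are all assumptions made for the example.

```python
from collections import defaultdict

def shuffle(rows, key, num_partitions):
    """Route each row to a partition chosen from its join key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append(row)
    return partitions

def shuffle_hash_join(left_rows, right_rows, key, num_partitions=4):
    # Shuffle phase: both sides use the same partitioning function, so
    # equal keys from the two sides end up in the same partition index.
    left_parts = shuffle(left_rows, key, num_partitions)
    right_parts = shuffle(right_rows, key, num_partitions)

    joined = []
    for left_part, right_part in zip(left_parts, right_parts):
        # Hash phase: build a table on one side of the partition and
        # probe it with the other. No sorting is needed.
        table = defaultdict(list)
        for row in right_part:
            table[row[key]].append(row)
        for row in left_part:
            for match in table.get(row[key], []):
                joined.append({**row, **match})
    return joined

# Hypothetical data: ids 0..4 appear twice in orders, only 0..2 exist in users.
orders = [{"id": i % 5, "order": i} for i in range(10)]
users = [{"id": i, "user": f"u{i}"} for i in range(3)]

result = shuffle_hash_join(orders, users, "id")
print(len(result))
```

Both inputs are moved during the shuffle, which is why this strategy costs more than a broadcast hash join when one side is small.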
Shuffle Sort Merge Join
- Data is shuffled so that rows with the same join key land on the same worker
- A sort-merge join is then performed at the partition level on the worker nodes
- The join keys need to be sortable
- Sort-merge join is preferred over shuffle hash join when spark.sql.join.preferSortMergeJoin is enabled
- It is the default join strategy in Spark
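The sort-and-merge step that runs inside each partition can be sketched in plain Python. This is a minimal illustration, assuming both inputs already sit in the same partition after the shuffle; `sort_merge_join` and the sample rows are hypothetical names and data, not anything from Spark.

```python
def sort_merge_join(left_rows, right_rows, key):
    # Sort phase: this is why the join keys must be sortable.
    left = sorted(left_rows, key=lambda r: r[key])
    right = sorted(right_rows, key=lambda r: r[key])

    joined = []
    i = j = 0
    # Merge phase: advance two cursors over the sorted inputs.
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][key], right[j][key]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Find the run of right rows that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][key] == left_key:
                j_end += 1
            # Pair every left row holding this key with the whole run.
            while i < len(left) and left[i][key] == left_key:
                for right_row in right[j:j_end]:
                    joined.append({**left[i], **right_row})
                i += 1
            j = j_end
    return joined

# Hypothetical unsorted inputs with a duplicated key (id 2) on both sides.
left = [{"id": 3, "l": "c"}, {"id": 1, "l": "a"},
        {"id": 2, "l": "b"}, {"id": 2, "l": "b2"}]
right = [{"id": 2, "r": "x"}, {"id": 4, "r": "z"},
         {"id": 2, "r": "y"}, {"id": 3, "r": "w"}]

result = sort_merge_join(left, right, "id")
for row in result:
    print(row)
```

After sorting, each input is scanned only once, apart from re-reading the run of equal keys on the right side.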
Optimizing Shuffle Sort Merge Join
- Bucketing is an optimization technique
- Bucketing determines the data partitioning up front and so avoids the data shuffle
- Use the bucketBy method on the datasets so that rows with the same key are co-located
- The number of buckets and the bucketing columns have to be the same across the DataFrames participating in the join
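Why matching bucket counts matter can be shown with a small plain-Python sketch. It assumes a simple modulo as the bucketing function, which is only a stand-in for whatever hash Spark applies internally; `bucket_id` and `write_bucketed` are hypothetical helpers for the illustration.

```python
NUM_BUCKETS = 12  # the same value must be used for both tables

def bucket_id(key, num_buckets=NUM_BUCKETS):
    # Stand-in bucketing function (an assumption, not Spark's real hash).
    return key % num_buckets

def write_bucketed(keys, num_buckets=NUM_BUCKETS):
    """Group keys into buckets and keep each bucket sorted."""
    buckets = {b: [] for b in range(num_buckets)}
    for k in keys:
        buckets[bucket_id(k, num_buckets)].append(k)
    return {b: sorted(ks) for b, ks in buckets.items()}

large_buckets = write_bucketed(range(1, 100000))
small_buckets = write_bucketed(range(1, 1000))

# Because both tables use the same function and bucket count, bucket b of
# one table only needs to be joined with bucket b of the other table.
matches = sum(
    len(set(large_buckets[b]) & set(small_buckets[b]))
    for b in range(NUM_BUCKETS)
)
print(matches)

# With a different bucket count the same key lands in a different bucket,
# so the bucket-by-bucket pairing no longer lines up.
print(bucket_id(9, 12), bucket_id(9, 8))
```

The layout is decided once at write time, which is what lets the join skip the shuffle at read time.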
largeDF = spark.range(1, 100000)
smallDF = spark.range(1, 1000)

# Bucket the dataframe
(largeDF.write.bucketBy(12, "id")
    .sortBy("id")
    .saveAsTable("largeTable")
)

(smallDF.write.bucketBy(12, "id")
    .sortBy("id")
    .saveAsTable("smallTable"))
# Read the bucketed tables
bucketedLargeDF = spark.read.table("largeTable")
bucketedSmallDF = spark.read.table("smallTable")

# Disable automatic broadcast joins so that a sort-merge join is planned
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
joinedDF = bucketedLargeDF.join(bucketedSmallDF, "id")
joinedDF.explain(mode='extended')
== Parsed Logical Plan ==
'Join UsingJoin(Inner,Buffer(id))
:- SubqueryAlias spark_catalog.default.largetable
:  +- Relation default.largetable[id#12L] parquet
+- SubqueryAlias spark_catalog.default.smalltable
   +- Relation default.smalltable[id#14L] parquet

== Analyzed Logical Plan ==
id: bigint
Project [id#12L]
+- Join Inner, (id#12L = id#14L)
   :- SubqueryAlias spark_catalog.default.largetable
   :  +- Relation default.largetable[id#12L] parquet
   +- SubqueryAlias spark_catalog.default.smalltable
      +- Relation default.smalltable[id#14L] parquet

== Optimized Logical Plan ==
Project [id#12L]
+- Join Inner, (id#12L = id#14L)
   :- Filter isnotnull(id#12L)
   :  +- Relation default.largetable[id#12L] parquet
   +- Filter isnotnull(id#14L)
      +- Relation default.smalltable[id#14L] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#12L]
   +- SortMergeJoin [id#12L], [id#14L], Inner
      :- Sort [id#12L ASC NULLS FIRST], false, 0
      :  +- Filter isnotnull(id#12L)
      :     +- FileScan parquet default.largetable[id#12L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#12L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/thulasiram/personal/data_engineering/spark/pyspark/spark-wa..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 12 out of 12
      +- Sort [id#14L ASC NULLS FIRST], false, 0
         +- Filter isnotnull(id#14L)
            +- FileScan parquet default.smalltable[id#14L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/thulasiram/personal/data_engineering/spark/pyspark/spark-wa..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 12 out of 12
Cartesian Join
- The cartesian product of the two relations is calculated
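As a small plain-Python illustration of what a Cartesian product of two relations produces, the sketch below pairs every row of one list with every row of the other. The relation contents are made up for the example.

```python
from itertools import product

# Hypothetical relations with 3 and 2 rows.
relation_1 = [{"a": 1}, {"a": 2}, {"a": 3}]
relation_2 = [{"b": "x"}, {"b": "y"}]

# Every row of relation_1 is paired with every row of relation_2.
cartesian = [{**r1, **r2} for r1, r2 in product(relation_1, relation_2)]

print(len(cartesian))  # number of rows is len(relation_1) * len(relation_2)
```

The output size is the product of the input sizes, so this join grows very quickly with the size of either relation.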
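Broadcast Nested Loop Join
- Supports non-equi joins
- Slow operation, because every record of one relation is compared with every record of the other
for record_1 in relation_1:
    for record_2 in relation_2:
        # the join condition is evaluated for this pair of records
        pass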
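A runnable version of the nested loop, with a non-equi condition plugged in, might look like the sketch below. The function name `nested_loop_join`, the range condition, and the sample events and windows are assumptions chosen for illustration.

```python
def nested_loop_join(relation_1, relation_2, condition):
    """Compare every pair of records and keep the pairs that satisfy condition."""
    joined = []
    for record_1 in relation_1:
        for record_2 in relation_2:
            # The condition can be any predicate, not only equality.
            if condition(record_1, record_2):
                joined.append({**record_1, **record_2})
    return joined

# Hypothetical data: match each event to the time window that contains it.
events = [
    {"event": "e1", "ts": 5},
    {"event": "e2", "ts": 15},
    {"event": "e3", "ts": 25},
]
windows = [
    {"window": "w1", "start": 0, "end": 10},
    {"window": "w2", "start": 10, "end": 20},
]

result = nested_loop_join(
    events, windows, lambda e, w: w["start"] <= e["ts"] < w["end"]
)
for row in result:
    print(row)
```

A range condition like this one cannot be answered by a hash lookup, which is why a nested loop is used despite its quadratic number of comparisons.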