Problem
Your Apache Spark job involving a join between two tables is running longer than expected. You notice that Spark is using a BroadcastNestedLoopJoin (BNLJ), which is computationally expensive.
The following example query can trigger a BNLJ.
%sql
SELECT * FROM Table1 t1
JOIN Table2 t2
ON (
(t1.colA = t2.colX OR t1.colA = t2.colY)
AND
(NOT (t1.flag = 'Y') OR NOT (t2.flag = 'Y'))
)
The following is the SQL plan for the example query.
+- CollectLimit (10)
   +- * BroadcastNestedLoopJoin Inner BuildRight (9)
      :- * Project (4)
      :  +- * Filter (3)
      :     +- * ColumnarToRow (2)
      :        +- Scan parquet table1 (1)
      +- ShuffleQueryStage (8)
         +- Exchange (7)
            +- * ColumnarToRow (6)
               +- Scan parquet table2 (5)
Cause
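Your query has non-equi or complex join conditions, such as OR clauses or CASE WHEN logic within the join clause. Spark's hash-based and sort-merge join strategies need at least one equality between columns of the two tables that is combined with the rest of the condition using AND. When the equalities are combined with OR, as in the example, no such join key exists, and Spark may fall back to a BNLJ. This join strategy is computationally expensive because each row from one dataset is compared with every row from the other. The work therefore grows with the product of the two table sizes.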
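The difference in work can be sketched in plain Python. This is a toy model under stated assumptions, not Spark's implementation. The functions `nested_loop_join` and `hash_join` and the sample rows are all hypothetical. The nested loop version evaluates the condition on every pair of rows. The hash version builds a hash table on one equality key and examines only rows whose keys match. Because the OR condition has no single key, only the nested loop version can evaluate it directly. Running one hash join per equality and taking the set union of the matches yields the same pairs, which is the idea behind the rewrite in the Solution.

```python
from collections import defaultdict

# Hypothetical sample data mirroring the example query's columns.
# Python's None == None is True, unlike SQL NULL, so the sample data avoids None.
table1 = [{"id": i, "colA": i % 5, "flag": "Y" if i % 2 else "N"} for i in range(20)]
table2 = [
    {"id": j, "colX": j % 5, "colY": (2 * j) % 5, "flag": "Y" if j % 3 == 0 else "N"}
    for j in range(30)
]

def flags_ok(l, r):
    # NOT (t1.flag = 'Y' AND t2.flag = 'Y')
    return not (l["flag"] == "Y" and r["flag"] == "Y")

def nested_loop_join(left, right, predicate):
    """Evaluate the predicate on every (left, right) pair, as a nested loop join does."""
    matches, comparisons = set(), 0
    for l in left:
        for r in right:
            comparisons += 1
            if predicate(l, r):
                matches.add((l["id"], r["id"]))
    return matches, comparisons

def hash_join(left, right, left_key, right_key, residual):
    """Hash the right side on one equality key, then probe it with each left row."""
    buckets = defaultdict(list)
    for r in right:
        buckets[r[right_key]].append(r)
    matches, comparisons = set(), 0
    for l in left:
        for r in buckets.get(l[left_key], []):
            comparisons += 1  # only rows with an equal key are examined
            if residual(l, r):
                matches.add((l["id"], r["id"]))
    return matches, comparisons

# Original condition: an OR of two equalities, so there is no single hash key.
def or_condition(l, r):
    return (l["colA"] == r["colX"] or l["colA"] == r["colY"]) and flags_ok(l, r)

nlj_rows, nlj_cost = nested_loop_join(table1, table2, or_condition)

# Rewrite: one hash join per equality, combined with a set union (like SQL UNION
# on rows that carry unique ids).
x_rows, x_cost = hash_join(table1, table2, "colA", "colX", flags_ok)
y_rows, y_cost = hash_join(table1, table2, "colA", "colY", flags_ok)
union_rows = x_rows | y_rows

print(nlj_cost, x_cost + y_cost)  # 600 240
```

Here the two hash joins examine 240 candidate pairs, compared with 600 for the nested loop. On real tables with many distinct key values, the gap is typically much wider.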
Solution
Rewrite the join so that each join condition includes at least one equality between columns of the two tables (an equi-join). With an equi-join key available, Spark can choose a more efficient join strategy, such as sort-merge join or broadcast hash join.
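The following code is the previous example rewritten to avoid the BNLJ. It splits the OR conditions into two equi-join queries combined with UNION. The flag condition is written as NOT (t1.flag = 'Y' AND t2.flag = 'Y'), which is logically equivalent to the original by De Morgan's law. Because UNION removes duplicate rows, a pair of rows that satisfies both equalities is returned only once, as with the original OR. However, UNION also collapses rows that are identical in every column. If Table1 or Table2 can contain duplicate rows, the rewrite can therefore return fewer rows than the original query.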
%sql
SELECT * FROM Table1 t1
JOIN Table2 t2
ON t1.colA = t2.colX
WHERE NOT (t1.flag = 'Y' AND t2.flag = 'Y')
UNION
SELECT * FROM Table1 t1
JOIN Table2 t2
ON t1.colA = t2.colY
WHERE NOT (t1.flag = 'Y' AND t2.flag = 'Y')
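Before adopting the rewrite, you can check that it returns the same rows as the original on representative data. The sketch below uses Python's built-in `sqlite3` module purely as a stand-in SQL engine on a few hypothetical rows. It compares result sets only and says nothing about which join strategy Spark picks. The `UNION ALL` variant is an assumption, not part of the solution above. Its second branch skips pairs that the first branch already returned.

```python
import sqlite3
from collections import Counter

# SQLite is only a stand-in SQL engine for comparing result sets; it does not
# model Spark's join strategies. Table and column names follow the example query.
con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE Table1 (colA INTEGER, flag TEXT);
CREATE TABLE Table2 (colX INTEGER, colY INTEGER, flag TEXT);
-- Hypothetical rows: Table1 holds an exact duplicate, and the first Table2 row
-- has colX = colY, so one pair of rows matches both equalities.
INSERT INTO Table1 VALUES (1, 'N'), (1, 'N'), (2, 'Y'), (NULL, 'N');
INSERT INTO Table2 VALUES (1, 1, 'N'), (2, 3, 'N'), (3, 2, 'Y'), (NULL, 2, 'N');
""")

ORIGINAL = """
SELECT * FROM Table1 t1
JOIN Table2 t2
ON (
  (t1.colA = t2.colX OR t1.colA = t2.colY)
  AND
  (NOT (t1.flag = 'Y') OR NOT (t2.flag = 'Y'))
)
"""

UNION_REWRITE = """
SELECT * FROM Table1 t1
JOIN Table2 t2
ON t1.colA = t2.colX
WHERE NOT (t1.flag = 'Y' AND t2.flag = 'Y')
UNION
SELECT * FROM Table1 t1
JOIN Table2 t2
ON t1.colA = t2.colY
WHERE NOT (t1.flag = 'Y' AND t2.flag = 'Y')
"""

# Hypothetical exact variant: UNION ALL keeps duplicates, and the second branch
# drops pairs the first branch already returned. SQLite's IS NOT is null-safe;
# in Spark SQL the same exclusion could be written NOT (t1.colA <=> t2.colX).
UNION_ALL_REWRITE = """
SELECT * FROM Table1 t1
JOIN Table2 t2
ON t1.colA = t2.colX
WHERE NOT (t1.flag = 'Y' AND t2.flag = 'Y')
UNION ALL
SELECT * FROM Table1 t1
JOIN Table2 t2
ON t1.colA = t2.colY
WHERE NOT (t1.flag = 'Y' AND t2.flag = 'Y')
  AND (t1.colA IS NOT t2.colX)
"""

def rows(sql):
    """Run a query and return its rows as a multiset, so duplicates are counted."""
    return Counter(con.execute(sql).fetchall())

original = rows(ORIGINAL)
union = rows(UNION_REWRITE)
union_all = rows(UNION_ALL_REWRITE)

print(sum(original.values()), sum(union.values()), sum(union_all.values()))  # 4 3 4
print(union_all == original)  # True
```

Suppose each table has a column with unique values. Then no two output rows can be identical, and the plain UNION version returns the same rows as the original. Both branches of either variant still join on a single equality, so Spark should still be able to choose a hash or sort-merge join for each branch.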
The following is the SQL execution plan for the modified query. Spark now uses a BroadcastHashJoin for each branch of the UNION. The HashAggregate operators above the Union node remove duplicate rows, as UNION requires.
ResultQueryStage (27)
+- * HashAggregate (26)
   +- AQEShuffleRead (25)
      +- ShuffleQueryStage (24)
         +- Exchange (23)
            +- * HashAggregate (22)
               +- Union (21)
                  :- * BroadcastHashJoin Inner BuildRight (10)
                  :  :- * Project (4)
                  :  :  +- * Filter (3)
                  :  :     +- * ColumnarToRow (2)
                  :  :        +- Scan parquet table1 (1)
                  :  +- ShuffleQueryStage (9)
                  :     +- Exchange (8)
                  :        +- * Filter (7)
                  :           +- * ColumnarToRow (6)
                  :              +- Scan parquet table2 (5)
                  +- * BroadcastHashJoin Inner BuildRight (20)
                     :- * Project (14)
                     :  +- * Filter (13)
                     :     +- * ColumnarToRow (12)
                     :        +- Scan parquet table1 (11)
                     +- ShuffleQueryStage (19)
                        +- Exchange (18)
                           +- * Filter (17)
                              +- * ColumnarToRow (16)
                                 +- Scan parquet table2 (15)