This article explains how to disable broadcast when the physical plan contains a BroadcastNestedLoopJoin.
You might expect broadcasting to stop after you disable the broadcast threshold by setting spark.sql.autoBroadcastJoinThreshold to -1, but Apache Spark still tries to broadcast the bigger table and fails with a broadcast error.
This behavior is NOT a bug, but it can be unexpected. We review the expected behavior and provide a mitigation option for the issue.
Create tables
Start by creating two tables: table_withNull, which contains a null value, and tblA_NoNull, which contains no null values.
%sql sql("SELECT id FROM RANGE(10)").write.mode("overwrite").saveAsTable("tblA_NoNull") sql("SELECT id FROM RANGE(50) UNION SELECT NULL").write.mode("overwrite").saveAsTable("table_withNull")
Attempt to disable broadcast
We attempt to disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 for a query that contains a sub-query with a not in clause.
%scala
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
sql("select * from table_withNull where id not in (select id from tblA_NoNull)").explain(true)
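Before reviewing the plan, you can optionally confirm that the setting took effect:

%scala
// Optional: verify that the broadcast threshold is disabled.
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))  // prints -1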
If you review the query plan, you can see that BroadcastNestedLoopJoin is the last possible fallback in this situation. It appears even after you attempt to disable the broadcast.
== Physical Plan ==
*(2) BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#2482L = id#2483L) || isnull((id#2482L = id#2483L)))
:- *(2) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- BroadcastExchange IdentityBroadcastMode, [id=#2586]
   +- *(1) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
If the data being processed is large enough, this results in broadcast errors when Spark attempts to broadcast the table.
Rewrite query using not exists instead of not in
You can resolve the issue by rewriting the query to use not exists instead of not in.
%scala
// It can be rewritten into a NOT EXISTS, which becomes a regular join:
sql("select * from table_withNull where not exists (select 1 from tblA_NoNull where table_withNull.id = tblA_NoNull.id)").explain(true)
By using not exists, the query runs with SortMergeJoin.
== Physical Plan ==
SortMergeJoin [id#2482L], [id#2483L], LeftAnti
:- Sort [id#2482L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#2482L, 200), [id=#2653]
:     +- *(1) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- Sort [id#2483L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#2483L, 200), [id=#2656]
      +- *(2) Project [id#2483L]
         +- *(2) Filter isnotnull(id#2483L)
            +- *(2) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [isnotnull(id#2483L)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
Explanation
Spark doesn't perform this rewrite automatically, because not in and not exists have slightly different semantics for null handling in SQL.
In SQL, not in means that if the not in values contain any null, the result is empty: x not in (a, b, null) expands to x <> a AND x <> b AND x <> null, and x <> null is never true. This is why the query can only be executed with BroadcastNestedLoopJoin. All of the not in values must be known, so the set can be checked for null values.
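You can observe both behaviors with the tables created above. This sketch shows that not in returns no rows as soon as the subquery produces a null, while not exists keeps the outer null row, which is why Spark cannot substitute one for the other:

%scala
// not in against a set containing a null never returns rows:
// id NOT IN (..., NULL) includes the term id <> NULL, which is never true.
sql("SELECT * FROM table_withNull WHERE id NOT IN (SELECT id FROM table_withNull)").count()
// returns 0

// not exists keeps the outer null row: NULL = id matches nothing, so NOT EXISTS is true.
sql("SELECT * FROM table_withNull WHERE NOT EXISTS (SELECT 1 FROM tblA_NoNull WHERE table_withNull.id = tblA_NoNull.id) AND id IS NULL").count()
// returns 1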
Example notebook
This notebook has a complete example, showing why Spark does not automatically switch BroadcastNestedLoopJoin to SortMergeJoin.
Review the BroadcastNestedLoopJoin example notebook.