RESOURCES_EXHAUSTED error message when trying to perform self-joins with Spark Connect

Increase the max message size using the spark.sql.session.localRelationCacheThreshold config or use temporary views.

Written by Lucas Ribeiro

Last published at: March 18th, 2025

Problem

When trying to perform self-joins over Spark Connect, you exceed the maximum message size and receive the following error message.

SparkConnectGrpcException: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.RESOURCE_EXHAUSTED
    details = "Sent message larger than max (<size-of-your-message> vs <maximum-message-size-allowed>)"...


Cause

Your data is stored locally as a local_relation. Data stored this way is duplicated during the planning phase of a self-join query. The duplication makes the query plan too large, so it reaches the maximum message size.


When you then perform a union including a large number of self-joins, the command fails with the message RESOURCE_EXHAUSTED, indicating that the sent message is larger than the maximum allowed size.


Solution

Increase the message size limit from the default of 64 MB by changing the value of the spark.sql.session.localRelationCacheThreshold configuration. You can start by trying twice the default (128 MB) and increase further if the issue persists.
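As a sketch, you can set this configuration on an active session as shown below. The config value is specified in bytes; the `spark` session object is assumed to already exist in your notebook or application.

```python
# Assumption: `spark` is an active Spark session (for example, in a Databricks notebook).
# The config value is in bytes; 128 MB = 128 * 1024 * 1024 = 134217728 bytes.
spark.conf.set("spark.sql.session.localRelationCacheThreshold", 128 * 1024 * 1024)
```

If the error persists, try progressively larger values, keeping in mind that larger plans consume more driver memory.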


Alternatively, take advantage of temporary (temp) views. Creating a temp view in the intermediate steps caches the table, so the query uses a cached_relation instead of a local_relation. Cached relations are not subject to the maximum message size.


Example 

The following code demonstrates how to use temp views. Write df_union as a temp view and then read it back at every step of the loop to ensure the message being sent uses a cached relation.

# dfs is a list of DataFrames to aggregate with union commands.
# df_union stores the aggregation, starting with the first DataFrame in the list.
df_union = dfs[0]

# Loop through the remaining DataFrames in the list, performing unions.
for df in dfs[1:]:
    df_union = df_union.union(df)
    # Create the temp view.
    df_union.createOrReplaceTempView("df_union")
    # Read df_union back from the temp view so subsequent steps
    # use the cached_relation instead of a local_relation.
    df_union = spark.sql("SELECT * FROM df_union")