Problem
When performing self-joins, you can exceed the maximum message size for Spark Connect. You receive the following error message:
SparkConnectGrpcException: <_MultiThreadedRendezvous of RPC that terminated with:
  status = StatusCode.RESOURCE_EXHAUSTED
  details = "Sent message larger than max (<size-of-your-message> vs <maximum-message-size-allowed>)" ...
Cause
Your data is stored locally as a local_relation. Data stored this way is duplicated during the planning phase of a query that contains self-joins. The duplication makes the query plan too large, causing it to reach the maximum message size.
When you then perform a union that includes a large number of self-joins, the command fails with the RESOURCE_EXHAUSTED message, indicating that the sent message is larger than the maximum allowed size.
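As a sketch of the failure pattern described above (the session, data, and sizes are illustrative placeholders, assuming an active Spark Connect session named spark):

```python
# Assumes an active Spark Connect session named `spark`; the data below
# is a placeholder. DataFrames built from local Python data are shipped
# to the server as a local_relation.
df = spark.createDataFrame(
    [(i, i * 2) for i in range(100_000)], ["id", "value"]
)

# Each self-join embeds another copy of the local relation in the query
# plan, so the serialized plan grows with every join and can eventually
# exceed the maximum gRPC message size.
joined = df.alias("a").join(df.alias("b"), "id")
```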
Solution
Increase the message size limit from the default of 64 MB by changing the value of the spark.sql.session.localRelationCacheThreshold configuration. Start by trying twice the default, 128 MB, and experiment with larger values if the issue persists.
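For example, the configuration can be raised at the session level; this is a minimal sketch, assuming a Spark Connect session (the endpoint URL is a placeholder):

```python
from pyspark.sql import SparkSession

# Connect to a Spark Connect endpoint; the URL below is a placeholder.
spark = SparkSession.builder.remote("sc://<your-endpoint>").getOrCreate()

# Raise the threshold from the 64 MB default to 128 MB (value is in bytes).
spark.conf.set("spark.sql.session.localRelationCacheThreshold", str(128 * 1024 * 1024))
```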
Alternatively, take advantage of temporary (temp) views. Using temp views in the intermediate steps caches the table and uses a cached_relation instead of a local_relation. Cached relations do not have a maximum message size, so you avoid the limit entirely.
Example
The following code demonstrates how to use temp views. Write df_union as a temp view and then read it back on every step of the loop to ensure the message being sent uses a cached relation.
# dfs is a list of DataFrames to aggregate with union commands. df_union stores
# the aggregation, starting with the first DataFrame in the list.
df_union = dfs[0]

# Loop through all other DataFrames in the list, performing unions
for df in dfs[1:]:
    df_union = df_union.union(df)

    # Create the temp view
    df_union.createOrReplaceTempView("df_union")

    # Read df_union back from the temp view so it now draws its data from the
    # cached_relation instead of the local_relation
    df_union = spark.sql("SELECT * FROM df_union")