Expensive transformation on DataFrame is recalculated even when cached

Understand how Apache Spark DataFrame caching works.

Written by jayant.sharma

Last published at: December 6th, 2024

Problem

Apache Spark DataFrame caching is used to avoid redundant computation during a complex ETL process. However, when you attempt to reuse a cached DataFrame, actions trigger a full recomputation of the transformation instead of reading from the cache.

Cause

There are several reasons why a cached DataFrame can be invalidated, leading to unexpected recalculations.

  • When you call df.cache(), the DataFrame is not immediately cached. Caching occurs only when an action such as df.count() or df.show() is executed. If no action is triggered after calling df.cache(), the DataFrame is never cached. See the first sketch after this list.
  • When a DataFrame is cached on cluster A and the underlying data source is updated (INSERT/UPDATE/MERGE/DELETE) using the same cluster, the cache is invalidated and recomputed. If the underlying data source is updated using another cluster (cluster B), the cache on the original cluster (cluster A) is not affected. In this scenario, the cached DataFrame continues to be used, but it reflects the state of the data as it was before the external update.
  • By default, Spark caches data using the MEMORY_AND_DISK storage level. In this mode, if live task computations need the memory, Spark automatically spills cached data from memory to disk. If the storage level is set to MEMORY_ONLY instead, cached data can be evicted when memory is full and is lazily recomputed on the next access. See the second sketch after this list.
  • Performing non-idempotent transformations on a cached DataFrame may lead to recomputation. The logical plan of the DataFrame being executed may not match the cached logical plan, which causes a cache miss and triggers a computation without using the cached value.
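
The following sketch shows the lazy behavior described in the first point. It assumes the spark session that Databricks notebooks provide; the source table name is a hypothetical stand-in.

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.getOrCreate()

  # Hypothetical source table used for illustration only.
  df = spark.read.table("main.etl.events")

  # cache() only marks the DataFrame for caching; nothing is stored yet.
  df = df.cache()

  # The first action materializes the cache; later actions on the same
  # DataFrame object reuse it instead of recomputing the lineage.
  df.count()   # triggers the computation and populates the cache
  df.show(5)   # served from the cached data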
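
A minimal sketch of choosing a storage level explicitly, under the same assumptions:

  from pyspark.sql import SparkSession
  from pyspark.storagelevel import StorageLevel

  spark = SparkSession.builder.getOrCreate()

  df = spark.read.table("main.etl.events")   # hypothetical source table

  # MEMORY_ONLY keeps cached blocks in memory only; partitions evicted under
  # memory pressure are discarded and lazily recomputed on the next access.
  df = df.persist(StorageLevel.MEMORY_ONLY)
  df.count()   # an action materializes the cache

  # df.cache() instead uses the default MEMORY_AND_DISK level, which spills
  # blocks to disk rather than dropping them when memory fills up.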

Info

  • On Databricks Runtime 11.3 LTS and below, DataFrames cached against Delta table sources are not invalidated when the source table is overwritten, so an older image of the data is used. This behavior is the same even when the overwrite is performed on the same cluster. In this case, you must manually re-cache, as shown in the sketch after this list.
  • On Databricks Runtime 12.2 LTS and above, DataFrames cached against Delta table sources can be invalidated when the source table is overwritten. They are invalidated when the overwrite happens on the same cluster, but not when it happens on a different cluster.
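
A minimal manual re-cache sketch, assuming spark is the session a Databricks notebook provides, df is the previously cached DataFrame, and the table name is hypothetical:

  df.unpersist()                             # discard the stale cached data
  df = spark.read.table("main.etl.events")   # re-read the current table state
  df = df.cache()
  df.count()                                 # an action materializes the fresh cache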

Review the Databricks Runtime maintenance updates (Azure | AWS | GCP) for more information.

 

Solution

If your data and transformations are complex enough to cause unwanted recomputation, consider using a different Spark persistence approach.

  • Materialize intermediate complex results into temporary Delta tables and drop them after your ETL is finished. This lets Spark run the complex transformation once and write the results to a Delta table, which acts as temporary, immutable persistent storage. Read the Delta table back to get the calculated results. See the first sketch below.
  • Use a Spark DataFrame checkpoint to write the DataFrame to a reliable storage location, such as HDFS, where it can persist without being affected by underlying changes. See the second sketch below.
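
A minimal sketch of materializing an intermediate result into a temporary Delta table, assuming the spark session a Databricks notebook provides; the table names and the transformation are hypothetical stand-ins.

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.getOrCreate()

  source = spark.read.table("main.etl.events")   # hypothetical source table

  # Run the expensive transformation once and persist the result as a
  # temporary Delta table.
  intermediate = source.groupBy("customer_id").count()   # stand-in for a complex transformation
  intermediate.write.format("delta").mode("overwrite").saveAsTable("main.etl.tmp_intermediate")

  # Downstream steps read the materialized result instead of recomputing it.
  result = spark.read.table("main.etl.tmp_intermediate")
  result.show(5)

  # Drop the temporary table once the ETL run is finished.
  spark.sql("DROP TABLE IF EXISTS main.etl.tmp_intermediate")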
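
A minimal checkpoint sketch under the same assumptions; the checkpoint directory and table name are hypothetical.

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.getOrCreate()

  # Checkpoint files are written to this reliable storage location.
  spark.sparkContext.setCheckpointDir("dbfs:/tmp/checkpoints")

  df = spark.read.table("main.etl.events")          # hypothetical source table
  transformed = df.groupBy("customer_id").count()   # stand-in for a complex transformation

  # checkpoint() is eager by default: it computes the DataFrame, writes it to
  # the checkpoint directory, and returns a DataFrame with a truncated lineage,
  # so later actions read the checkpointed data instead of recomputing it.
  checkpointed = transformed.checkpoint()
  checkpointed.show(5)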