Problem
Apache Spark DataFrame caching is used to prevent redundant computations during a complex ETL process. When you attempt to reuse a cached DataFrame, actions trigger a full recomputation of the transformations instead of using the cached data.
Cause
There are several reasons why a cached DataFrame can be invalidated, leading to unexpected recalculations.
- When you call df.cache(), the DataFrame is not cached immediately. Caching happens when an action such as df.count() or df.show() is executed. If no action is triggered after calling df.cache(), the DataFrame is never cached. (See the sketch after this list.)
- When a DataFrame is cached on cluster A and the underlying data source is updated (INSERT/UPDATE/MERGE/DELETE) from the same cluster, the cache is invalidated and recomputed. If the underlying data source is updated from another cluster (cluster B), the cache on the original cluster (cluster A) is not affected. In this scenario, the cached DataFrame continues to be used, but it reflects the state of the data before the external update.
- By default, Spark caches data using the MEMORY_AND_DISK storage level. In this mode, if there is insufficient memory for live task computations, Spark automatically spills cached data from memory to disk when memory is full. If the storage level is set to MEMORY_ONLY instead, cached data can be evicted when memory is full, and the evicted data is lazily recomputed on the next access.
- Performing any non-idempotent transformations on a cached DataFrame may lead to recomputation. The logical plan of the DataFrame being executed may not match the cached logical plan, which results in a cache miss and triggers a computation that does not use the cached data.
Info
- On Databricks Runtime 11.3 LTS and below, DataFrames cached against Delta table sources are not invalidated when the source table is overwritten, so an older snapshot of the data continues to be used. This behavior is the same even when the overwrite runs on the same cluster. In this case, you must manually re-cache (see the sketch below).
- On Databricks Runtime 12.2 LTS and above, DataFrames cached against Delta table sources can be invalidated when the source table is overwritten. They are invalidated when the source is overwritten from the same cluster, but not when the source is overwritten from a different cluster.
Review the Databricks Runtime maintenance updates (Azure | AWS | GCP) for more information.
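When a cached DataFrame has gone stale in one of the ways described above (for example, after an overwrite from another cluster, or on Databricks Runtime 11.3 LTS and below), you can rebuild the cache manually. The sketch below reuses the same hypothetical table name as before.

```python
# Release the stale cached blocks, re-read the source, and re-materialize.
df.unpersist()
df = spark.read.table("main.sales.orders")  # hypothetical table name
df.cache()
df.count()  # action re-populates the cache from the current table state
```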
Solution
If your data and transformations are complex enough that unwanted recomputation of cached DataFrames is costly, consider using a different Spark persistence approach.
- Materialize complex intermediate results into temporary Delta tables and drop them after your ETL finishes. This lets Spark run the complex transformation once and write the results into a Delta table, which acts as temporary, immutable persistent storage. Read the Delta table back to get the calculated results (see the first sketch after this list).
- Use a Spark DataFrame checkpoint to write the DataFrame to a reliable HDFS storage location, where it persists without being affected by changes to the underlying source (see the second sketch after this list).
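A sketch of the first option, materializing an expensive intermediate result into a temporary Delta table; the table and DataFrame names are hypothetical.

```python
# Run the expensive transformation once and persist the result as a Delta table.
intermediate_df.write.format("delta").mode("overwrite").saveAsTable("etl_tmp.intermediate_result")

# Downstream steps read the immutable, materialized result instead of
# re-running the transformation.
materialized_df = spark.read.table("etl_tmp.intermediate_result")

# Clean up once the ETL run is finished.
spark.sql("DROP TABLE IF EXISTS etl_tmp.intermediate_result")
```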
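A sketch of the second option, using a DataFrame checkpoint; the checkpoint directory is a hypothetical location.

```python
# The checkpoint directory should be on reliable storage.
spark.sparkContext.setCheckpointDir("dbfs:/tmp/etl_checkpoints")

# checkpoint() is eager by default: it runs a job, writes the data to the
# checkpoint directory, and returns a DataFrame with a truncated lineage.
checkpointed_df = intermediate_df.checkpoint()
checkpointed_df.count()
```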