Using PySpark testing library assertDataFrameEqual throws OutOfMemoryError

Verify schemas are equivalent, then either ensure sufficient driver memory or compare a subset of DataFrames.

Written by brock.baurer

Last published at: November 17th, 2024

Problem

When working with the PySpark testing library's assertDataFrameEqual function, you expect it to confirm DataFrame equivalence, or to throw an assertion error and report the differences when the DataFrames are not equivalent.

Instead, you encounter a DriverStoppedException or an OutOfMemoryError.


Example 

In a Databricks Runtime 15.4 LTS cluster, the error message indicates an unresponsive Python kernel.


from pyspark.testing.utils import assertDataFrameEqual

assertDataFrameEqual(actual=spark.range(100_000_000), expected=spark.range(99_999_999))

The command fails with the following error message.

Fatal error: The Python kernel is unresponsive.


Cause

You’re performing comparison tests on one or more DataFrames that are larger than the available driver memory.

Internally, the assertDataFrameEqual function calls .collect() on the actual and expected DataFrames. 


expected_list = expected.collect()
actual_list = actual.collect()


collect() is a memory-intensive, driver-bound operation: it pulls every row of both DataFrames onto the driver, which may lead to unexpected driver behavior such as OutOfMemoryError or DriverStoppedException errors.


Solution

First, use assertSchemaEqual to verify that the schemas are equivalent before comparing the actual rows and columns in the DataFrames. If the schemas are unequal, you may not need to evaluate the DataFrame contents at all, since they will be inherently different as a result of the schema differences. For more information on assertDataFrameEqual, review the Databricks Simplify PySpark testing with DataFrame equality functions blog post.
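A minimal sketch, assuming two hypothetical DataFrames actual_df and expected_df. Comparing schemas touches no rows, so it is cheap regardless of DataFrame size.

from pyspark.testing.utils import assertSchemaEqual

# Compare only the StructType schemas; no rows are collected to the driver.
assertSchemaEqual(actual=actual_df.schema, expected=expected_df.schema)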


If you need to compare the entirety of your DataFrames, ensure the driver has enough memory.

  • Monitor driver memory usage and adjust cluster memory by increasing the size of the Driver VM. 
  • Use a DataFrame operation such as df.subtract() to identify differences in larger DataFrames when 100% accuracy is required and the size of the data exceeds the driver memory limits (see the sketch after this list).
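A minimal sketch of the subtract-based comparison, again assuming hypothetical DataFrames actual_df and expected_df. subtract() runs as a distributed job, so nothing is collected to the driver; note that it compares distinct rows, so use exceptAll() instead if duplicate counts matter.

# Rows present in one DataFrame but not the other; both empty => contents match.
missing_from_actual = expected_df.subtract(actual_df)
unexpected_in_actual = actual_df.subtract(expected_df)

assert missing_from_actual.isEmpty() and unexpected_in_actual.isEmpty()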


If you are able, test on a subset of the DataFrames to reduce the overall memory footprint.

  • Use assertDataFrameEqual with a subset of rows using df.limit(n), or a sample using df.sample(...).
  • Use a subset of columns, such as comparing on a unique/primary key only, using df.select(...) (see the sketch after this list).
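A minimal sketch combining both ideas, assuming hypothetical DataFrames actual_df and expected_df with a unique key column id. Because limit() alone is non-deterministic, the rows are ordered by the key first so both sides take the same subset.

from pyspark.testing.utils import assertDataFrameEqual

n = 10_000  # assumed subset size small enough to collect safely on the driver

assertDataFrameEqual(
    actual=actual_df.select("id").orderBy("id").limit(n),
    expected=expected_df.select("id").orderBy("id").limit(n),
)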