Problem
When working with the PySpark testing utility assertDataFrameEqual, you expect it to confirm that two DataFrames are equivalent, or to raise an assertion error describing the differences when they are not. Instead, you encounter a DriverStoppedException or an OutOfMemoryError.
Example
In a Databricks Runtime 15.4 LTS cluster, the following comparison fails with an error message indicating an unresponsive Python kernel.
from pyspark.testing.utils import assertDataFrameEqual
assertDataFrameEqual(actual=spark.range(100_000_000), expected=spark.range(99_999_999))
Fatal error: The Python kernel is unresponsive.
Cause
You’re performing comparison tests on one or more DataFrames that are larger than the available driver memory.
Internally, the assertDataFrameEqual function calls .collect() on both the actual and expected DataFrames.
expected_list = expected.collect()
actual_list = actual.collect()
.collect() is a memory-intensive, driver-bound operation that pulls every row of a DataFrame onto the driver, and it can lead to unexpected driver behavior such as OutOfMemoryError or DriverStoppedException errors.
Solution
First, use assertSchemaEqual to verify that the schemas are equivalent before comparing the actual rows and columns in the DataFrames. If the schemas are unequal, you may not need to evaluate the DataFrame contents at all, because they are inherently different as a result of the schema differences. For more information on assertDataFrameEqual, review the Databricks Simplify PySpark testing with DataFrame equality functions blog post.
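As a minimal sketch, assuming actual and expected are DataFrames already defined in your session, a schema-only check looks like this:

from pyspark.testing.utils import assertSchemaEqual

# Compares only the schemas; no rows are collected to the driver.
assertSchemaEqual(actual.schema, expected.schema)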
If you need to compare the entirety of your DataFrames, ensure you have enough driver memory.
- Monitor driver memory usage and adjust cluster memory by increasing the size of the driver VM.
- Use a DataFrame operation such as df.subtract() to identify differences in larger DataFrames when 100% accuracy is required and the size of the data exceeds the driver memory limits, as sketched after this list.
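A minimal sketch of the df.subtract() approach, assuming actual and expected have matching schemas, follows. Note that subtract() compares distinct rows (like SQL EXCEPT DISTINCT), so it does not verify duplicate row counts.

# Rows present in one DataFrame but missing from the other.
# The comparison runs on the executors; only the counts return to the driver.
missing_from_expected = actual.subtract(expected)
missing_from_actual = expected.subtract(actual)
assert missing_from_expected.count() == 0 and missing_from_actual.count() == 0

If the assertion fails, inspect a sample of the differing rows with, for example, missing_from_expected.show(20) instead of collecting the full result.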
If you are able, test on a subset of the DataFrames to reduce the overall memory footprint. A sketch of these subsetting approaches follows the list below.
- Use assertDataFrameEqual with a subset of data using df.limit(n) or a sample using df.sample(...)
- Use a subset of columns, such as comparing on a unique/primary key only, using df.select(...)
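A minimal sketch of these subsetting approaches, assuming the DataFrames share a unique key column (the column name id here is illustrative):

from pyspark.testing.utils import assertDataFrameEqual

# Compare only the key column to shrink the collected footprint.
assertDataFrameEqual(actual.select("id"), expected.select("id"))

# Compare a fixed-size subset. limit() alone does not guarantee which
# rows are returned, so sort first to make the selection deterministic.
assertDataFrameEqual(
    actual.orderBy("id").limit(1000),
    expected.orderBy("id").limit(1000),
)

Note that df.sample(...) draws rows independently from each DataFrame, so a sampled comparison is a smoke test rather than an exact row-for-row check.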