randomSplit on a DataFrame, you could potentially observe inconsistent behavior. Here is an example:
    from pyspark.sql.functions import broadcast

    df = spark.read.format('inconsistent_data_source').load()
    a, b = df.randomSplit([0.5, 0.5])
    a.join(broadcast(b), on='id', how='inner').count()
Typically this query returns 0. However, depending on the underlying data source or input DataFrame, the query can sometimes return more than 0 records.
This unexpected behavior occurs because the data distribution across RDD partitions is not idempotent: it can be rearranged or updated during query execution, which affects the output of the randomSplit method.
- Spark DataFrames and RDDs preserve partitioning order; this problem exists only when the query output depends on the actual data distribution across partitions (for example, on whether values from files 1, 2, and 3 always appear in partition 1).
- The issue could also be observed when using Delta cache. All solutions listed below are still applicable in this case.
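The cause can be illustrated without Spark. The following is a minimal sketch of a simplified model, not Spark's implementation: it assumes each half of the split is evaluated from its own read of the source, and that each partition is sorted and then sampled with a generator seeded per partition. The names `read_source`, `sample_range`, and `split_overlap` are hypothetical and exist only for this illustration.

```python
import random


def read_source(layout_seed, n_rows=1000, n_partitions=4):
    """Hypothetical source: always the same rows, but the assignment of
    rows to partitions depends on layout_seed (one value per read)."""
    rows = list(range(n_rows))
    random.Random(layout_seed).shuffle(rows)
    return [rows[i::n_partitions] for i in range(n_partitions)]


def sample_range(partitions, lower, upper, seed=42):
    """Keep the rows whose random draw falls in [lower, upper).

    Each partition is sorted and then sampled with a generator seeded by
    (seed + partition index), so the draws depend only on what the
    partition contains, not on the order in which rows arrived.
    """
    kept = []
    for index, rows in enumerate(partitions):
        rng = random.Random(seed + index)
        for row in sorted(rows):
            if lower <= rng.random() < upper:
                kept.append(row)
    return kept


def split_overlap(layout_for_a, layout_for_b):
    """Evaluate both halves of a 50/50 split, each from its own read,
    and count the rows that end up in both halves."""
    a = sample_range(read_source(layout_for_a), 0.0, 0.5)
    b = sample_range(read_source(layout_for_b), 0.5, 1.0)
    return len(set(a) & set(b))


print("overlap with a stable layout:", split_overlap(1, 1))
print("overlap with a changed layout:", split_overlap(1, 2))
```

When both reads produce the same layout, every row receives the same draw in both evaluations, so the two halves are disjoint. When the layout changes between reads, a row can receive a different draw each time and land in both halves, or in neither.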
Do one of the following:
Use explicit Apache Spark RDD caching
    df = inputDF.cache()
    a, b = df.randomSplit([0.5, 0.5])
Repartition by a column or a set of columns
    df = inputDF.repartition(100, 'col1')
    a, b = df.randomSplit([0.5, 0.5])
Apply an aggregate function
    df = inputDF.groupBy('col1').count()
    a, b = df.randomSplit([0.5, 0.5])
These operations persist or shuffle the data, which results in a consistent data distribution across partitions in Spark jobs.
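Why a shuffle by column helps can be sketched in plain Python. This is a toy model under stated assumptions, not Spark's implementation: the source is assumed to return the same rows in a different partition layout on every read, `zlib.crc32` stands in for Spark's hash partitioner, and each partition is assumed to be sorted before a seeded per-partition sample. The names `unstable_read`, `repartition_by_key`, and `sample_range` are hypothetical.

```python
import random
import zlib


def unstable_read(layout_seed, n_rows=1000, n_partitions=4):
    """Hypothetical source: same rows, different partition layout per read."""
    rows = list(range(n_rows))
    random.Random(layout_seed).shuffle(rows)
    return [rows[i::n_partitions] for i in range(n_partitions)]


def repartition_by_key(partitions, n_partitions=8):
    """Place each row by a hash of its value, ignoring where it came from.
    crc32 is only a stand-in for a deterministic hash partitioner."""
    out = [[] for _ in range(n_partitions)]
    for rows in partitions:
        for row in rows:
            bucket = zlib.crc32(str(row).encode()) % n_partitions
            out[bucket].append(row)
    return out


def sample_range(partitions, lower, upper, seed=42):
    """Sort each partition, then keep rows whose draw is in [lower, upper)."""
    kept = []
    for index, rows in enumerate(partitions):
        rng = random.Random(seed + index)
        for row in sorted(rows):
            if lower <= rng.random() < upper:
                kept.append(row)
    return kept


# Each half of the split re-reads the source and sees a different layout,
# but the repartition step maps every row to the same bucket both times.
a = sample_range(repartition_by_key(unstable_read(1)), 0.0, 0.5)
b = sample_range(repartition_by_key(unstable_read(2)), 0.5, 1.0)
overlap = set(a) & set(b)

print("rows in both halves:", len(overlap))
print("rows covered by the two halves:", len(a) + len(b))
```

Because the bucket of a row depends only on the row itself, both evaluations see partitions with identical contents, every row receives the same draw each time, and the two halves come out disjoint and complete. Caching achieves the same effect in this model by reading the source only once.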