Behavior of the randomSplit Method

When using randomSplit on a DataFrame, you may observe inconsistent behavior: splits that should be disjoint can overlap, or rows can be missing from both. Here is an example:

from pyspark.sql.functions import broadcast

df = spark.read.format('inconsistent_data_source').load()
a, b = df.randomSplit([0.5, 0.5])
a.join(broadcast(b), on='id', how='inner').count()  # should be 0: a and b should be disjoint

Typically this query returns 0, because a and b should be disjoint samples of df. However, depending on the underlying data source or input DataFrame, the query can return more than 0 records, meaning the same rows appeared in both splits.

This unexpected behavior occurs because the distribution of data across RDD partitions is not guaranteed to be deterministic: it can be rearranged or updated during query execution. randomSplit samples each partition with a deterministic per-partition seed, and each resulting split re-evaluates the input, so if the rows in each partition differ between evaluations, the splits can overlap or lose rows.
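
One way to detect whether a pipeline is affected is to compare the splits against the original DataFrame. Below is a minimal diagnostic sketch; it assumes the DataFrame has a unique 'id' column (a placeholder, as in the example above):

from pyspark.sql.functions import broadcast

a, b = df.randomSplit([0.5, 0.5])
overlap = a.join(broadcast(b), on='id', how='inner').count()  # rows that landed in both splits
combined = a.count() + b.count()                              # should equal df.count()
print(overlap, combined, df.count())

Because every action re-evaluates the input, a nonzero overlap, or a combined count that differs from df.count(), indicates that the partition layout changed between evaluations.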

Note

  • Spark DataFrames and RDDs preserve the order of partitions; this problem only arises when the query output depends on which rows actually land in which partition, for example, when values from files 1, 2, and 3 are assumed to always appear in partition 1.
  • The issue can also be observed when using the Delta cache. All of the solutions listed below remain applicable in that case.
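
Note as well that supplying an explicit seed does not avoid the problem: the seed makes the sampling deterministic only for a given partition layout, so if the layout itself changes between evaluations, the splits can still diverge. For example:

# A fixed seed pins the sampling, not the partition layout
a, b = df.randomSplit([0.5, 0.5], seed=42)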

Solution

Do one of the following:

  • Use explicit Apache Spark RDD caching

    # Caching pins the data after the first action, so all evaluations
    # of the splits read the same materialized rows
    df = inputDF.cache()
    a, b = df.randomSplit([0.5, 0.5])
    
  • Repartition by a column or a set of columns

    # Hash-partitioning on a column produces a deterministic layout
    df = inputDF.repartition(100, 'col1')
    a, b = df.randomSplit([0.5, 0.5])
    
  • Apply an aggregate function

    # An aggregation forces a deterministic shuffle before the split
    df = inputDF.groupBy('col1').count()
    a, b = df.randomSplit([0.5, 0.5])
    

These operations persist or shuffle the data, producing a consistent data distribution across partitions, so repeated evaluations of each split see the same rows.
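
Whichever option you choose, you can verify it with the same check used to detect the problem. A minimal sketch using the caching option, again assuming a unique 'id' column ('inputDF' and 'id' are placeholders):

df = inputDF.cache()
df.count()  # materialize the cache so later evaluations reuse the same data

a, b = df.randomSplit([0.5, 0.5])
assert a.join(b, on='id', how='inner').count() == 0   # disjoint
assert a.count() + b.count() == df.count()            # exhaustive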