Error when trying to use Apache Spark’s PySpark offset method on DataFrames with serverless compute

Use the limit method or the monotonically_increasing_id() function instead.

Written by Tarun Sanjeev

Last published at: December 23rd, 2024

Problem

When attempting to use Apache Spark’s PySpark offset method on DataFrames with serverless compute, you receive an error message.

UnsupportedOperationException: Do not support SQL OFFSET with HybridCloudStoreFormat

Example

This code creates a DataFrame from a list of sample data, attempts to use the offset method to skip the first two rows, and then displays the resulting DataFrame.

When run on a classic compute cluster, the example code runs without issue. When run on serverless compute, the code results in an error when the offset method is called.

%python

from pyspark.sql import SparkSession

# create a dataframe
data = [("1", "a"), ("2", "b"), ("3", "c"), ("4", "d"), ("5", "e")]
df = spark.createDataFrame(data, ["id", "val"])

# use the offset method
df_offset = df.orderBy("id").offset(2)
df_offset.show()

Cause

The offset method is not supported on serverless compute due to the storage format used and the underlying architecture.

The optimized storage format, HybridCloudStoreFormat, is designed for distributed computing and relies on the cluster's ability to maintain a stateful connection to the data source. This is not possible on serverless compute because of its stateless task execution.

Because serverless compute executes tasks in a stateless manner, the connection to the data source is established and closed for each task.

Solution

Choose an alternative method to achieve similar results.

For small datasets 

Use the limit method to achieve similar results to the offset method.

Info

Although the limit method is efficient and easy to use, it only returns a limited number of rows from the beginning of the DataFrame and is less flexible than the offset method.

 

Example using limit

%python

# Reuse the sample data from the Problem example
df1 = spark.createDataFrame(data, ["id", "val"])

# Cap the result at 1000 rows, then filter out the rows you want to skip
# (keeping rows with id greater than "2" is analogous to offset(2) on this data)
dfNew = df1.orderBy("id").limit(1000).filter(df1.id > "2")
dfNew.show()
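
If you need to skip a fixed number of leading rows using only the limit method, one pattern is to take the first offset + N rows and then subtract the first offset rows with exceptAll. The following is a minimal sketch; offset_value and page_size are illustrative names, and it assumes the ordering column contains unique values.

%python

# Illustrative values; adjust for your data
offset_value = 2   # number of rows to skip
page_size = 2      # number of rows to keep

ordered = spark.createDataFrame(data, ["id", "val"]).orderBy("id")

# Take the first offset_value + page_size rows, then remove the first offset_value rows
rows_to_skip = ordered.limit(offset_value)
df_skipped = ordered.limit(offset_value + page_size).exceptAll(rows_to_skip)

# exceptAll does not guarantee ordering, so sort again before displaying
df_skipped.orderBy("id").show()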

For large datasets with pagination

Use the monotonically_increasing_id() function from pyspark.sql.functions. Creating a unique ID for each row allows you to filter on that ID for efficient pagination and helps keep results consistent even with high write throughput.

Example using monotonically_increasing_id()

%python

from pyspark.sql.functions import monotonically_increasing_id, col
df = spark.createDataFrame(data, ["id", "val"])

# Add a unique identifier to each row
df_with_id = df.withColumn("row_id", monotonically_increasing_id())

# Simulate offset by filtering based on the row_id
offset_value = 2
df_offset = df_with_id.filter(col("row_id") >= offset_value).drop("row_id")
df_offset.show()
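
To page through a larger result set with this approach, you can keep a cursor (the highest row_id returned so far) and fetch the next page by filtering past it. The following is a minimal sketch that reuses df_with_id and col from the example above; last_seen_id and page_size are illustrative names, and it assumes df_with_id has been materialized (for example, cached or written to a table) so the generated row_id values stay stable between reads.

%python

# Materialize the DataFrame so the generated row_id values stay stable across reads (assumption)
df_with_id.cache()

# Illustrative cursor-style pagination parameters
last_seen_id = 1   # highest row_id returned by the previous page
page_size = 2      # number of rows per page

# Fetch the next page: rows after the cursor, in row_id order, capped at page_size
df_next_page = (
    df_with_id
    .filter(col("row_id") > last_seen_id)
    .orderBy("row_id")
    .limit(page_size)
)
df_next_page.show()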

Info

When choosing between classic compute and serverless compute for your jobs, you should select the compute that is best suited for your specific task. For more information, review the Serverless compute limitations (AWS | Azure | GCP) documentation.