Problem
When attempting to use the PySpark offset method on DataFrames on serverless compute, you receive an error message.
UnsupportedOperationException: Do not support SQL OFFSET with HybridCloudStoreFormat
Example
This code creates a DataFrame from a list of sample data, attempts to use the offset method to skip the first 2 rows, and then displays the resulting DataFrame.
When run on a classic compute cluster, the example code runs without issue. When run on serverless compute, the code results in an error when the offset method is called.
%python
# Create a DataFrame from sample data
data = [("1", "a"), ("2", "b"), ("3", "c"), ("4", "d"), ("5", "e")]
df = spark.createDataFrame(data, ["id", "val"])

# Use the offset method to skip the first 2 rows
df_offset = df.orderBy("id").offset(2)
df_offset.show()
Cause
The offset method is not supported on serverless compute because of the storage format and underlying architecture used.
The optimized storage format, HybridCloudStoreFormat, is designed for distributed computing. HybridCloudStoreFormat relies on the cluster's ability to maintain a stateful connection to the data source, which is not possible on serverless compute because of its stateless task execution. Since serverless compute executes tasks in a stateless manner, the connection to the data source is established and closed for each task.
Solution
You should choose an alternative function to achieve similar results.
For small datasets
Use the limit method to achieve similar results to the offset method.
Info
Although the limit method is efficient and easy to use, it only returns a limited number of rows from the beginning of the DataFrame and is less flexible than offset.
Example using limit
%python
# Reuse the sample data list from the earlier example
df1 = spark.createDataFrame(data, ["id", "val"])

# Cap the number of rows returned, then filter on the id column
dfNew = df1.orderBy("id").limit(1000).filter(df1.id > 5)
dfNew.show()
For large datasets with pagination
Use the monotonically_increasing_id() function from pyspark.sql.functions. Creating a unique ID for each row allows you to filter on that ID for efficient pagination and ensures consistent results under high write throughput.
Example using monotonically_increasing_id()
%python
from pyspark.sql.functions import monotonically_increasing_id, col
df = spark.createDataFrame(data, ["id", "val"])
# Add a unique identifier to each row
df_with_id = df.withColumn("row_id", monotonically_increasing_id())
# Simulate offset by filtering based on the row_id
offset_value = 2
df_offset = df_with_id.filter(col("row_id") >= offset_value).drop("row_id")
df_offset.show()
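To extend this pattern to page-by-page retrieval, you can filter on a range of row_id values. The following sketch is illustrative only: the get_page helper and its page_number and page_size parameters are not part of the PySpark API, and it assumes the row_id column was added with monotonically_increasing_id() as shown above. Note that the generated IDs are increasing but not guaranteed to be consecutive across partitions, so on multi-partition DataFrames the page boundaries are approximate.
%python
from pyspark.sql.functions import monotonically_increasing_id, col

# Illustrative helper (not part of the PySpark API)
def get_page(df_with_id, page_number, page_size):
    # Return one page of rows by filtering on a row_id range
    start = page_number * page_size
    end = start + page_size
    return df_with_id.filter((col("row_id") >= start) & (col("row_id") < end)).drop("row_id")

# Reuse the DataFrame with the row_id column from the example above
df_with_id = df.withColumn("row_id", monotonically_increasing_id())

# Fetch the second page of two rows (row_id values 2 and 3 on a single-partition DataFrame)
get_page(df_with_id, page_number=1, page_size=2).show()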
Info
When choosing between classic compute and serverless compute for your jobs, you should select the compute that is best suited for your specific task. For more information, review the Serverless compute limitations (AWS | Azure | GCP) documentation.