Apache Spark driver stops and restarts while reading JSON file data in an S3 bucket

Use a predefined schema to read the files, or read folder contents as text instead.

Written by G Yashwanth Kiran

Last published at: October 17th, 2024

Problem

You are using PySpark to read data from small JSON files located in an S3 bucket when you receive the following error message:

The spark driver has stopped unexpectedly and is restarting. Your notebook will be automatically reattached.

 

Cause

You have a large number of JSON files in your S3 location and not enough driver memory to handle schema inference and processing for all of them. Additionally, JSON files with many nested fields spread across several columns produce a very wide table that is inefficient to process, which further contributes to the issue.

Context 

The JSON scan phase in Databricks first infers a unified schema across all JSON files before execution. When the total file size is relatively small (for example, 50 MB) but the inferred schema is large (for example, schema.toString is around 50 MB), the driver must hold multiple copies of objects whose sizes are proportional to the schema size, which can consume too much driver memory.
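To confirm that a large inferred schema is the cause, you can infer the schema from a single sample file and check how large it is. The following is a minimal sketch; the sample path is a placeholder for one small file in your bucket.

```python
# Minimal diagnostic sketch: infer the schema from one sample file (placeholder path)
# and measure how large the schema string is.
sample_df = spark.read.json("s3://<bucket>/<prefix>/sample-file.json")

schema_str = sample_df.schema.simpleString()
print(f"Top-level fields: {len(sample_df.schema.fields)}")
print(f"Schema string size: ~{len(schema_str) / 1e6:.1f} MB")
```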

 

Solution

Define a schema before the read step and use that predefined schema to read the JSON from the S3 path. This reduces execution time and memory usage.

Example

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the schema up front so Spark does not infer it from the files.
schema = StructType([
    StructField("field1", StringType(), True),
    StructField("field2", IntegerType(), True),
    # Add more fields as required
])

filepath = "s3://…/load_date=<YYYY-MM-DD>"
raw_df = spark.read.schema(schema).json(filepath)
display(raw_df)
```
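If hand-writing the full schema is impractical, a common alternative is to infer the schema once from a small, representative sample of files and then reuse it for the full read. The following is a minimal sketch; the sample and full paths are placeholders.

```python
import json
from pyspark.sql.types import StructType

# Infer the schema from a small, representative sample of files (placeholder path).
sample_df = spark.read.json("s3://<bucket>/<prefix>/sample/")

# Serialize the schema so it can be stored and reused without re-inferring it.
schema_json = sample_df.schema.json()
schema = StructType.fromJson(json.loads(schema_json))

# Reuse the schema for the full read (placeholder path).
full_df = spark.read.schema(schema).json("s3://<bucket>/<prefix>/")
```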

 

If defining a schema is not feasible, consider reading the folder contents as text instead of as JSON. Then, query the JSON content with JSON path expressions on the text string.

Example

```python
from pyspark.sql.functions import get_json_object

filepath = "s3://…/load_date=<YYYY-MM-DD>"

# Read each JSON file as plain text; every line lands in a single "value" column,
# so Spark does not need to infer a unified schema across the files.
raw_text_df = spark.read.text(filepath)

# Query the JSON content with JSON path expressions on the text string.
# field1/field2 and the JSON paths are placeholders for your actual fields.
parsed_df = raw_text_df.select(
    get_json_object("value", "$.field1").alias("field1"),
    get_json_object("value", "$.field2").alias("field2"),
)
display(parsed_df)
```

 

Preventive Measures

  • When working with JSON files with diverse schemas, consider using schema evolution or streaming to handle schema changes more efficiently (see the sketch after this list).
  • Monitor driver memory usage and adjust the driver's memory allocation accordingly.
  • Optimize JSON file processing by partitioning the data into smaller, more manageable chunks.
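For schema evolution with streaming, one option on Databricks is Auto Loader, which stores the inferred schema in a schema location and evolves it as new fields appear, instead of re-inferring it on the driver for every read. The following is a minimal sketch; the bucket, schema location, checkpoint location, and table name are placeholders.

```python
# Minimal Auto Loader sketch (all paths and the table name are placeholders).
stream_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "s3://<bucket>/_schemas/events/")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://<bucket>/<prefix>/")
)

(
    stream_df.writeStream
    .option("checkpointLocation", "s3://<bucket>/_checkpoints/events/")
    .trigger(availableNow=True)  # process existing files incrementally, then stop
    .toTable("bronze_events")
)
```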