Create a DataFrame from a JSON string or Python dictionary

Create an Apache Spark DataFrame from a variable containing a JSON string or a Python dictionary.

Written by ram.sankarasubramanian

Last published at: October 9th, 2024

This article explains how you can create an Apache Spark DataFrame from a variable containing a JSON string or a Python dictionary.

Info

A previous version of this article recommended using Scala for this use case. Databricks recommends using Python. All of the sample code in this article is written in Python.

Create a Spark DataFrame from a JSON string

1. Define variables containing the JSON content as strings.

%python

from pyspark.sql import Row
import json

json_content1 = """{"json_col1": "hello", "json_col2": "32"}"""
json_content2 = """{"json_col1": "hello", "json_col2": "world"}"""

2. Parse each JSON string with json.loads() and append the resulting dictionary to a Python list.

json_data = []
json_data.append(json.loads(json_content1))
json_data.append(json.loads(json_content2))

3. Convert each dictionary to a Row object and create a Spark DataFrame from the list of rows.

rows = [Row(**json_dict) for json_dict in json_data]
df = spark.createDataFrame(rows)
display(df)
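
When displayed, the DataFrame contains one column per JSON key and one row per JSON string, similar to the following.

json_col1    json_col2
hello        32
hello        world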

Combined sample code

This sample code block combines the previous steps into a single example.

%python

from pyspark.sql import Row
import json

json_content1 = """{"json_col1": "hello", "json_col2": "32"}"""
json_content2 = """{"json_col1": "hello", "json_col2": "world"}"""


# Parse each JSON string into a dictionary and collect the results in a list.
json_data = []
json_data.append(json.loads(json_content1))
json_data.append(json.loads(json_content2))

# Convert the dictionaries to Rows and build the DataFrame.
rows = [Row(**json_dict) for json_dict in json_data]
df = spark.createDataFrame(rows)
display(df)
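
If you prefer to let Spark parse the strings and infer the schema itself, spark.read.json() also accepts an RDD of JSON strings. This is a minimal alternative sketch reusing the json_content1 and json_content2 variables from the previous steps (rdd and df_inferred are illustrative names):

%python

# Alternative: distribute the JSON strings and let Spark infer the schema.
rdd = spark.sparkContext.parallelize([json_content1, json_content2])
df_inferred = spark.read.json(rdd)
display(df_inferred)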

Extract a string column with JSON data from a DataFrame and parse it

1. Create a sample DataFrame.

%python

from pyspark.sql.functions import *
from pyspark.sql.types import *

data = [("1", "{'json_col1': 'hello', 'json_col2': 32}", "1.0"),("1", "{'json_col1': 'hello', 'json_col2': 'world'}", "1.0")]

schema = StructType([
  StructField("id", StringType()),
  StructField("value", StringType()),
  StructField("token", StringType())
])

df = spark.createDataFrame(data, schema)
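
At this point df has three string columns, id, value, and token, with the raw JSON text stored in value.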

2. Use from_json() to parse the JSON string column into a struct, then select the struct's fields as top-level columns.

json_schema = StructType([
    StructField("json_col1", StringType(), True),
    StructField("json_col2", StringType(), True)
])
df2 = df.withColumn('json', from_json(col('value'), json_schema)).select("*", "json.*")
display(df2)
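
The output df2 retains the original id, value, and token columns, adds the parsed struct column json, and flattens its fields into the top-level columns json_col1 and json_col2.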

Combined sample code

This sample code block combines the previous steps into a single example.

%python

from pyspark.sql.functions import *
from pyspark.sql.types import *

data = [("1", "{'json_col1': 'hello', 'json_col2': 32}", "1.0"),("1", "{'json_col1': 'hello', 'json_col2': 'world'}", "1.0")]

schema = StructType([
  StructField("id", StringType()),
  StructField("value", StringType()),
  StructField("token", StringType())
])

df = spark.createDataFrame(data, schema)

# Schema describing the JSON payload stored in the value column.
json_schema = StructType([
    StructField("json_col1", StringType(), True),
    StructField("json_col2", StringType(), True)
])

# Parse the JSON column into a struct, then flatten its fields.
df2 = df.withColumn('json', from_json(col('value'), json_schema)).select("*", "json.*")
display(df2)
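
If you only need a few fields, you can skip defining a schema. The following is a minimal alternative sketch using get_json_object(), which extracts individual values by JSON path (df3 is an illustrative name; the function comes from the pyspark.sql.functions import above).

%python

# Schema-free alternative: extract individual fields by JSON path.
# get_json_object() also accepts the single-quoted JSON in this sample data.
df3 = df.select(
  "*",
  get_json_object(col("value"), "$.json_col1").alias("json_col1"),
  get_json_object(col("value"), "$.json_col2").alias("json_col2")
)
display(df3)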

Create a Spark DataFrame from a Python dictionary

1. Create a variable containing the dictionary, then check its data type to confirm that it is a dictionary.

%python

from pyspark.sql import Row
import json
jsonDataDict = {"job_id":33100,"run_id":1048560,"number_in_job":1,"state":{"life_cycle_state":"PENDING","state_message":"Waiting for cluster"},"task":{"notebook_task":{"notebook_path":"/Users/user@databricks.com/path/test_notebook"}},"cluster_spec":{"new_cluster":{"spark_version":"4.3.x-scala2.11","attributes":{"type":"fixed_node","memory":"8g"},"enable_elastic_disk":"false","num_workers":1}},"cluster_instance":{"cluster_id":"0000-000000-wares10"},"start_time":1584689872601,"setup_duration":0,"execution_duration":0,"cleanup_duration":0,"creator_user_name":"user@databricks.com","run_name":"my test job","run_page_url":"https://testurl.databricks.com#job/33100/run/1","run_type":"SUBMIT_RUN"}

type(jsonDataDict)
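
The command returns <class 'dict'>, confirming that the variable holds a Python dictionary.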

2. Wrap the dictionary in a list, convert it to a Row object, and create a Spark DataFrame.

rows = [Row(**json_dict) for json_dict in [jsonDataDict]]
df = spark.createDataFrame(rows)
df.display()

Combined sample code

This sample code block combines the previous steps into a single example.

%python

from pyspark.sql import Row
import json
jsonDataDict = {"job_id":33100,"run_id":1048560,"number_in_job":1,"state":{"life_cycle_state":"PENDING","state_message":"Waiting for cluster"},"task":{"notebook_task":{"notebook_path":"/Users/user@databricks.com/path/test_notebook"}},"cluster_spec":{"new_cluster":{"spark_version":"4.3.x-scala2.11","attributes":{"type":"fixed_node","memory":"8g"},"enable_elastic_disk":"false","num_workers":1}},"cluster_instance":{"cluster_id":"0000-000000-wares10"},"start_time":1584689872601,"setup_duration":0,"execution_duration":0,"cleanup_duration":0,"creator_user_name":"user@databricks.com","run_name":"my test job","run_page_url":"https://testurl.databricks.com#job/33100/run/1","run_type":"SUBMIT_RUN"}

rows = [Row(**json_dict) for json_dict in [jsonDataDict]]  # convert the dict to a Row
df = spark.createDataFrame(rows)
df.display()
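
With Row(**json_dict), nested dictionaries are typically inferred as map columns by default. If you want nested struct columns instead, one alternative sketch is to serialize the dictionary back to a JSON string and let spark.read.json() infer the schema; this reuses the json module imported above (df_nested is an illustrative name).

%python

# Alternative: round-trip through JSON text so Spark infers nested structs.
rdd = spark.sparkContext.parallelize([json.dumps(jsonDataDict)])
df_nested = spark.read.json(rdd)
df_nested.display()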