This article explains how you can create an Apache Spark DataFrame from a variable containing a JSON string or a Python dictionary.
Info
A previous version of this article recommended using Scala for this use case. Databricks recommends using Python. All of the sample code in this article is written in Python.
Create a Spark DataFrame from a JSON string
1. Create variables containing the JSON content as strings.
%python
from pyspark.sql import Row
import json

# Sample JSON documents held as plain Python strings.
json_content1 = '{"json_col1": "hello", "json_col2": "32"}'
json_content2 = '{"json_col1": "hello", "json_col2": "world"}'
2. Parse the JSON strings and add the resulting dictionary objects to a Python list.
# Parse each JSON string into a dict and collect them in a list.
json_data = [json.loads(doc) for doc in (json_content1, json_content2)]
3. Parse the list of dictionaries to create a Spark DataFrame.
# Turn each dictionary into a Row, then build and show the DataFrame.
rows = [Row(**entry) for entry in json_data]
df = spark.createDataFrame(rows)
display(df)
Combined sample code
This sample code block combines the previous steps into a single example.
%python
from pyspark.sql import Row
import json

# Sample JSON documents held as plain Python strings.
json_content1 = '{"json_col1": "hello", "json_col2": "32"}'
json_content2 = '{"json_col1": "hello", "json_col2": "world"}'

# Parse each JSON string into a Python dictionary.
json_data = [json.loads(doc) for doc in (json_content1, json_content2)]

# Convert the dictionaries to Rows and build the DataFrame.
rows = [Row(**entry) for entry in json_data]
df = spark.createDataFrame(rows)
display(df)
Extract a string column with JSON data from a DataFrame and parse it
1. Create a sample DataFrame.
%python
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Each row: (id, JSON-encoded payload, token) -- all stored as strings.
data = [
    ("1", "{'json_col1': 'hello', 'json_col2': 32}", "1.0"),
    ("1", "{'json_col1': 'hello', 'json_col2': 'world'}", "1.0"),
]

# All three columns are plain strings; the JSON payload is parsed later.
schema = StructType(
    [StructField(name, StringType()) for name in ("id", "value", "token")]
)

df = spark.createDataFrame(data, schema)
2. Use from_json() to parse the JSON column when reading the DataFrame.
# Target shape of the JSON payload inside the "value" column.
json_schema = StructType(
    [StructField(field, StringType(), True) for field in ("json_col1", "json_col2")]
)

# Parse the string column into a struct, then flatten it into top-level columns.
parsed = df.withColumn('json', from_json(col('value'), json_schema))
df2 = parsed.select("*", "json.*")
display(df2)
Combined sample code
This sample code block combines the previous steps into a single example.
%python
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Each row: (id, JSON-encoded payload, token) -- all stored as strings.
data = [
    ("1", "{'json_col1': 'hello', 'json_col2': 32}", "1.0"),
    ("1", "{'json_col1': 'hello', 'json_col2': 'world'}", "1.0"),
]

# All three columns are plain strings; the JSON payload is parsed below.
schema = StructType(
    [StructField(name, StringType()) for name in ("id", "value", "token")]
)

df = spark.createDataFrame(data, schema)

# Target shape of the JSON payload inside the "value" column.
json_schema = StructType(
    [StructField(field, StringType(), True) for field in ("json_col1", "json_col2")]
)

# Parse the string column into a struct, then flatten it into top-level columns.
parsed = df.withColumn('json', from_json(col('value'), json_schema))
df2 = parsed.select("*", "json.*")
display(df2)
Create a Spark DataFrame from a Python dictionary
1. Check the data type and confirm that it is a dictionary.
%python
from pyspark.sql import Row
import json

# Sample job-run metadata, already a Python dictionary (not a JSON string).
jsonDataDict = {
    "job_id": 33100,
    "run_id": 1048560,
    "number_in_job": 1,
    "state": {
        "life_cycle_state": "PENDING",
        "state_message": "Waiting for cluster",
    },
    "task": {
        "notebook_task": {
            "notebook_path": "/Users/user@databricks.com/path/test_notebook"
        }
    },
    "cluster_spec": {
        "new_cluster": {
            "spark_version": "4.3.x-scala2.11",
            "attributes": {"type": "fixed_node", "memory": "8g"},
            "enable_elastic_disk": "false",
            "num_workers": 1,
        }
    },
    "cluster_instance": {"cluster_id": "0000-000000-wares10"},
    "start_time": 1584689872601,
    "setup_duration": 0,
    "execution_duration": 0,
    "cleanup_duration": 0,
    "creator_user_name": "user@databricks.com",
    "run_name": "my test job",
    "run_page_url": "https://testurl.databricks.com#job/33100/run/1",
    "run_type": "SUBMIT_RUN",
}

# Confirm the variable is a dict before converting it to a DataFrame.
type(jsonDataDict)
2. Add the dictionary to a list and parse the list to create a Spark DataFrame.
# A single dict becomes a one-element list of Rows.
rows = [Row(**jsonDataDict)]
df = spark.createDataFrame(rows)
df.display()
Combined sample code
This sample code block combines the previous steps into a single example.
%python
from pyspark.sql import Row
import json

# Sample job-run metadata, already a Python dictionary (not a JSON string).
jsonDataDict = {
    "job_id": 33100,
    "run_id": 1048560,
    "number_in_job": 1,
    "state": {
        "life_cycle_state": "PENDING",
        "state_message": "Waiting for cluster",
    },
    "task": {
        "notebook_task": {
            "notebook_path": "/Users/user@databricks.com/path/test_notebook"
        }
    },
    "cluster_spec": {
        "new_cluster": {
            "spark_version": "4.3.x-scala2.11",
            "attributes": {"type": "fixed_node", "memory": "8g"},
            "enable_elastic_disk": "false",
            "num_workers": 1,
        }
    },
    "cluster_instance": {"cluster_id": "0000-000000-wares10"},
    "start_time": 1584689872601,
    "setup_duration": 0,
    "execution_duration": 0,
    "cleanup_duration": 0,
    "creator_user_name": "user@databricks.com",
    "run_name": "my test job",
    "run_page_url": "https://testurl.databricks.com#job/33100/run/1",
    "run_type": "SUBMIT_RUN",
}

# A single dict becomes a one-element list of Rows, then a DataFrame.
rows = [Row(**jsonDataDict)]
df = spark.createDataFrame(rows)
df.display()