Create a DataFrame from a JSON string or Python dictionary

Create an Apache Spark DataFrame from a variable containing a JSON string or a Python dictionary.

Written by ram.sankarasubramanian

Last published at: May 23rd, 2022

This article reviews how to create an Apache Spark DataFrame from a variable containing a JSON string or a Python dictionary.

Create a Spark DataFrame from a JSON string

  1. Add the JSON content from the variable to a list.
    %scala
    
    import scala.collection.mutable.ListBuffer
    val json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
    val json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
    
    var json_seq = new ListBuffer[String]()
    json_seq += json_content1
    json_seq += json_content2
  2. Create a Spark dataset from the list.
    %scala
    
    val json_ds = json_seq.toDS()
  3. Use spark.read.json to parse the Spark dataset.
    %scala
    
    val df = spark.read.json(json_ds)
    display(df)

Combined sample code

These sample code blocks combine the previous steps into individual examples. The Python and Scala samples perform the same tasks.

%python

json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"

json_list = []
json_list.append(json_content1)
json_list.append(json_content2)

df = spark.read.json(sc.parallelize(json_list))
display(df)

%scala

import scala.collection.mutable.ListBuffer
val json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
val json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"

var json_seq = new ListBuffer[String]()
json_seq += json_content1
json_seq += json_content2

val json_ds = json_seq.toDS()
val df = spark.read.json(json_ds)
display(df)
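Note that the sample strings above use single quotes inside the JSON. Spark's JSON reader accepts this by default (the `allowSingleQuotes` option defaults to true), but strict parsers such as Python's built-in `json` module reject it. A minimal plain-Python sketch (no Spark) illustrating the difference:

```python
import json

# Same shape as json_content1 above, with non-standard single quotes.
json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"

# A strict parser rejects single-quoted JSON...
try:
    json.loads(json_content1)
    strict_ok = True
except json.JSONDecodeError:
    strict_ok = False

# ...but a double-quoted version parses fine. (A naive replace is only safe
# here because the sample values contain no embedded quote characters.)
record = json.loads(json_content1.replace("'", '"'))
```

Spark tolerates these strings, but if you generate the JSON yourself, prefer standard double quotes so the data is portable to stricter consumers.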

Extract a string column with JSON data from a DataFrame and parse it

  1. Select the JSON column from a DataFrame and convert it to an RDD of type RDD[Row].
    %scala
    
    import org.apache.spark.sql.functions._
    
    val test_df = Seq(("1", "{'json_col1': 'hello', 'json_col2': 32}", "1.0"),("1", "{'json_col1': 'hello', 'json_col2': 'world'}", "1.0")).toDF("row_number", "json", "token")
    
    val row_rdd = test_df.select(col("json")).rdd  // Select just the JSON column and convert it to an RDD.
  2. Convert RDD[Row] to RDD[String].
    %scala
    
    val string_rdd = row_rdd.map(_.mkString(","))
  3. Use spark.read.json to parse the RDD[String].
    %scala
    
    val df1 = spark.read.json(string_rdd)
    display(df1)

Combined sample code

This sample code block combines the previous steps into a single example.

%scala

import org.apache.spark.sql.functions._

val test_df = Seq(("1", "{'json_col1': 'hello', 'json_col2': 32}", "1.0"),("1", "{'json_col1': 'hello', 'json_col2': 'world'}", "1.0")).toDF("row_number", "json", "token")

val row_rdd = test_df.select(col("json")).rdd
val string_rdd = row_rdd.map(_.mkString(","))

val df1 = spark.read.json(string_rdd)
display(df1)
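As a rough illustration of what these three steps accomplish, here is a pure-Python sketch (no Spark) that mirrors them on an in-memory list of rows. The row data matches the Seq above, rewritten with standard double-quoted JSON:

```python
import json

# Rows standing in for test_df (hypothetical, mirrors the Scala Seq above):
# (row_number, json, token)
rows = [
    ("1", '{"json_col1": "hello", "json_col2": 32}', "1.0"),
    ("1", '{"json_col1": "hello", "json_col2": "world"}', "1.0"),
]

# Steps 1-2 equivalent: pick the json column and keep it as plain strings.
string_rows = [row[1] for row in rows]

# Step 3 equivalent: parse each string. spark.read.json does this per record
# and additionally infers a merged schema across all records.
parsed = [json.loads(s) for s in string_rows]
```

In the real Spark version, `json_col2` holds both an integer and a string, so schema inference widens it to a string column; the sketch above keeps each record's native types.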

Create a Spark DataFrame from a Python dictionary

  1. Check the data type and confirm that it is of dictionary type.
    %python
    
    jsonDataDict = {"job_id":33100,"run_id":1048560,"number_in_job":1,"state":{"life_cycle_state":"PENDING","state_message":"Waiting for cluster"},"task":{"notebook_task":{"notebook_path":"/Users/user@databricks.com/path/test_notebook"}},"cluster_spec":{"new_cluster":{"spark_version":"4.3.x-scala2.11","attributes":{"type":"fixed_node","memory":"8g"},"enable_elastic_disk":"false","num_workers":1}},"cluster_instance":{"cluster_id":"0000-000000-wares10"},"start_time":1584689872601,"setup_duration":0,"execution_duration":0,"cleanup_duration":0,"creator_user_name":"user@databricks.com","run_name":"my test job","run_page_url":"https://testurl.databricks.com#job/33100/run/1","run_type":"SUBMIT_RUN"}
    
    type(jsonDataDict)
  2. Use json.dumps to convert the Python dictionary into a JSON string.
    %python
    
    import json
    jsonData = json.dumps(jsonDataDict)
  3. Add the JSON content to a list.
    %python
    
    jsonDataList = []
    jsonDataList.append(jsonData)
  4. Convert the list to an RDD and parse it using spark.read.json.
    %python
    
    jsonRDD = sc.parallelize(jsonDataList)
    df = spark.read.json(jsonRDD)
    display(df)

Combined sample code

This sample code block combines the previous steps into a single example.

%python

jsonDataDict = {"job_id":33100,"run_id":1048560,"number_in_job":1,"state":{"life_cycle_state":"PENDING","state_message":"Waiting for cluster"},"task":{"notebook_task":{"notebook_path":"/Users/user@databricks.com/path/test_notebook"}},"cluster_spec":{"new_cluster":{"spark_version":"4.3.x-scala2.11","attributes":{"type":"fixed_node","memory":"8g"},"enable_elastic_disk":"false","num_workers":1}},"cluster_instance":{"cluster_id":"0000-000000-wares10"},"start_time":1584689872601,"setup_duration":0,"execution_duration":0,"cleanup_duration":0,"creator_user_name":"user@databricks.com","run_name":"my test job","run_page_url":"https://testurl.databricks.com#job/33100/run/1","run_type":"SUBMIT_RUN"}

type(jsonDataDict)

import json
jsonData = json.dumps(jsonDataDict)

jsonDataList = []
jsonDataList.append(jsonData)

jsonRDD = sc.parallelize(jsonDataList)
df = spark.read.json(jsonRDD)
display(df)
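The key step in this workflow is json.dumps, which serializes the dictionary into a standards-compliant JSON string that spark.read.json can parse. A small self-contained sketch of the round trip, using a trimmed, hypothetical dictionary rather than the full jsonDataDict above:

```python
import json

# A small dictionary standing in for jsonDataDict (hypothetical sample).
job = {"job_id": 33100, "state": {"life_cycle_state": "PENDING"}}

# json.dumps produces a JSON string with standard double quotes, so no
# quote normalization is needed before handing it to spark.read.json.
jsonData = json.dumps(job)

# Round-tripping with json.loads recovers the original structure, which is
# a quick way to sanity-check the serialization before parallelizing it.
roundtrip = json.loads(jsonData)
```

Because json.dumps emits one string per dictionary, appending several serialized dictionaries to jsonDataList before calling sc.parallelize yields one DataFrame row per dictionary.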

Example notebook

Review the Parse a JSON string or Python dictionary example notebook.