GeoSpark undefined function error with DBConnect

Use GeoSpark code with a DBConnect session.

Written by arjun.kaimaparambilrajan

Last published at: June 1st, 2022

Problem

You are trying to use the GeoSpark function st_geomfromwkt with DBConnect (AWS | Azure | GCP) and you get an Apache Spark error message.

Error: org.apache.spark.sql.AnalysisException: Undefined function: 'st_geomfromwkt'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.;

This example code fails with the error when used with DBConnect.

%scala

import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator

val sc = spark.sparkContext
sc.setLogLevel("DEBUG")

val sqlContext = spark.sqlContext

// Ship the GeoSpark jars from the client to the cluster.
spark.sparkContext.addJar("~/jars/geospark-sql_2.3-1.2.0.jar")
spark.sparkContext.addJar("~/jars/geospark-1.2.0.jar")

// Registers the GeoSpark SQL functions in the client-side session only.
GeoSparkSQLRegistrator.registerAll(sqlContext)
println(spark.sessionState.functionRegistry.listFunction)

// Fails on the cluster with the AnalysisException shown above.
spark.sql("select ST_GeomFromWKT(area) AS geometry from polygon").show()

Cause

DBConnect does not support automatic syncing of client-side UDFs to the server. The GeoSpark functions are registered only in the local, client-side session, so the query fails when it is analyzed on the cluster.
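
You can see the mismatch in the failing example above. The following minimal sketch, assuming the DBConnect session and the registerAll call from the Problem section, shows the function present in the client-side registry even though the query is analyzed on the cluster, where it was never registered.

%scala

// Illustrative check only; run on the DBConnect client after registerAll.
val locallyRegistered = spark.sessionState.functionRegistry
  .listFunction
  .exists(_.funcName.equalsIgnoreCase("st_geomfromwkt"))

println(s"Registered on the client: $locallyRegistered") // true

// The query still fails, because the cluster-side session never saw the registration.
// spark.sql("select ST_GeomFromWKT(area) AS geometry from polygon").show()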

Solution

You can use a custom utility jar with code that registers the GeoSpark UDFs on the cluster using the SparkSessionExtensions class.
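
The steps below rely on Spark's extension mechanism: at session construction, Spark instantiates every class named in spark.sql.extensions that has a no-argument constructor and implements SparkSessionExtensions => Unit, and applies it to the session. The sketch below shows that mechanism in isolation with an illustrative local session; on Databricks, the init script in step 3 sets the configuration on the cluster instead.

%scala

// Illustrative only: how a class listed in spark.sql.extensions is picked up.
// The class name is the utility class created in step 1. On Databricks, the
// configuration is written by the init script in step 3, not by application code.
import org.apache.spark.sql.SparkSession

val session = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions", "com.databricks.spark.utils.GeoSparkUdfExtension")
  .getOrCreate()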

  1. Create a utility class that registers the GeoSpark functions using SparkSessionExtensions, and package it into a utility jar (a minimal build sketch follows these steps).
    %scala
    
    package com.databricks.spark.utils
    
    import org.apache.spark.sql.SparkSessionExtensions
    import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator
    
    // Registers all GeoSpark SQL functions on the cluster when a session is built.
    class GeoSparkUdfExtension extends (SparkSessionExtensions => Unit) {
      def apply(e: SparkSessionExtensions): Unit = {
        // A check rule is used only as a hook that receives the active SparkSession.
        e.injectCheckRule(spark => {
          println("INJECTING UDF")
          GeoSparkSQLRegistrator.registerAll(spark)
          _ => () // the check rule itself does nothing
        })
      }
    }
  2. Copy the GeoSpark jars and your utility jar to DBFS at dbfs:/databricks/geospark-extension-jars/.
  3. Create an init script (set_geospark_extension_jar.sh) that copies the jars from the DBFS location to the Spark class path and sets spark.sql.extensions to the utility class.
    %scala
    
    dbutils.fs.put(
        "dbfs:/databricks/<init-script-folder>/set_geospark_extension_jar.sh",
        """#!/bin/sh
          |sleep 10s
          |# Copy the extension and GeoSpark dependency jars to /databricks/jars.
          |cp -v /dbfs/databricks/geospark-extension-jars/{spark_geospark_extension_2_11_0_1.jar,geospark_sql_2_3_1_2_0.jar,geospark_1_2_0.jar} /databricks/jars/
          |# Set the extension.
          |cat << 'EOF' > /databricks/driver/conf/00-custom-spark.conf
          |[driver] {
          |    "spark.sql.extensions" = "com.databricks.spark.utils.GeoSparkUdfExtension"
          |}
          |EOF
          |""".stripMargin,
        overwrite = true
    )
  4. Install the init script as a cluster-scoped init script (AWS | Azure | GCP). You will need the full path to the location of the script (dbfs:/databricks/<init-script-folder>/set_geospark_extension_jar.sh).
  5. Restart your cluster.
  6. You can now use GeoSpark code with DBConnect, as in the verification sketch below.
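
To package the utility class from step 1 into a jar, any build tool works. The sbt definition below is a minimal sketch; the project name, versions, and Scala version are illustrative and should match the Spark and Scala versions of your cluster and the GeoSpark 1.2.0 jars copied to DBFS.

name := "spark-geospark-extension"
version := "0.1"
scalaVersion := "2.11.12"

// Marked provided because the init script places these jars on the cluster classpath.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"        % "2.4.5" % "provided",
  "org.datasyslab"    % "geospark"         % "1.2.0" % "provided",
  "org.datasyslab"    % "geospark-sql_2.3" % "1.2.0" % "provided"
)

Once the cluster has restarted, you can confirm the setup from the DBConnect client by re-running the query from the Problem section (assuming the polygon table still exists).

%scala

// Run from the DBConnect client after the cluster restart.
spark.sql("select ST_GeomFromWKT(area) AS geometry from polygon").show()

// The GeoSpark functions should now also be listed by the cluster.
spark.sql("show functions like 'st_*'").show(100, truncate = false)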