applyInPandasWithState fails with a ModuleNotFoundError when used with Delta Live Tables

Define a wrapper function within the notebook instead of calling the externally defined function directly.

Written by mounika.tarigopula

Last published at: December 2nd, 2024

Problem

You are trying to use applyInPandasWithState with Delta Live Tables, but execution fails with a ModuleNotFoundError: No module named 'helpers' error message.

Example error

Traceback (most recent call last):
 File "/databricks/spark/python/pyspark/worker.py", line 1964, in main
   func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/databricks/spark/python/pyspark/worker.py", line 1770, in read_udfs
   arg_offsets, f = read_single_udf(
                    ^^^^^^^^^^^^^^^^
 File "/databricks/spark/python/pyspark/worker.py", line 802, in read_single_udf
   f, return_type = read_command(pickleSer, infile)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/databricks/spark/python/pyspark/worker_util.py", line 70, in read_command
   command = serializer._read_with_length(file)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/databricks/spark/python/pyspark/serializers.py", line 196, in _read_with_length
   raise SerializationError("Caused by " + traceback.format_exc())
pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
 File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
   return self.loads(obj)
          ^^^^^^^^^^^^^^^
 File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
   return cloudpickle.loads(obj, encoding=encoding)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'helpers'

Cause

applyInPandasWithState does not work correctly when used with Delta Live Tables if the function you pass to it is defined outside of your notebook.

In this example, the count_fn function is defined in the helpers.streaming.functions module. It is imported at the start of the code block and passed directly to applyInPandasWithState, which results in the error.

Example code

%python

import pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.streaming.state import GroupStateTimeout
from helpers.streaming.functions import count_fn
from pyspark.sql.functions import udf

df = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", "100")
    .load()
    .withColumn("id", col("value"))
    .groupby("id")
    .applyInPandasWithState(
        func=count_fn,
        outputStructType="id long, countAsString string",
        stateStructType="len long",
        outputMode="append",
        timeoutConf=GroupStateTimeout.NoTimeout,
    )
)
import dlt
import time

@dlt.table(name=f"random_{int(time.time())}")
def a():
  return df
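
For reference, the count_fn function in helpers.streaming.functions is not shown in this article. A function passed to applyInPandasWithState receives the group key, an iterator of pandas DataFrames for that key, and a GroupState, and yields pandas DataFrames that match outputStructType. The following is a hypothetical sketch of what a counting function such as count_fn could look like, assuming the output schema used in the example.

from typing import Iterator, Tuple

import pandas as pd
from pyspark.sql.streaming.state import GroupState

# Hypothetical sketch only; the actual helpers.streaming.functions module is not shown.
def count_fn(
    key: Tuple[int], pdfs: Iterator[pd.DataFrame], state: GroupState
) -> Iterator[pd.DataFrame]:
    # Read the running count for this key from state, defaulting to zero.
    count = state.get[0] if state.exists else 0
    # Add the number of rows in each incoming batch for this key.
    for pdf in pdfs:
        count += len(pdf)
    state.update((count,))
    # The output must match outputStructType: "id long, countAsString string".
    yield pd.DataFrame({"id": [key[0]], "countAsString": [str(count)]})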

Solution

Define a wrapper function within the notebook that imports the function you want to call and invokes it. Pass the wrapper function to applyInPandasWithState instead, and the query completes as expected.

This wrapper function imports count_fn and runs it. Because the import statement runs inside my_func, the helpers module is resolved when the function executes rather than when Spark deserializes it on the worker. By adding the wrapper to the sample code and passing my_func instead of count_fn, the example completes successfully.

def my_func(*args):
    from helpers.streaming.functions import count_fn
    return count_fn(*args)

Example code

%python

import pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.streaming.state import GroupStateTimeout
from helpers.streaming.functions import count_fn
from pyspark.sql.functions import udf

def my_func(*args):
    from helpers.streaming.functions import count_fn
    return count_fn(*args)

df = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", "100")
    .load()
    .withColumn("id", col("value"))
    .groupby("id")
    .applyInPandasWithState(
        func=my_func,
        outputStructType="id long, countAsString string",
        stateStructType="len long",
        outputMode="append",
        timeoutConf=GroupStateTimeout.NoTimeout,
    )
)
import dlt
import time

@dlt.table(name=f"random_{int(time.time())}")
def a():
  return df