Problem
When using Apache Spark to read a set of files from a Databricks File System (DBFS) directory, you copy the files to the executor's local disk using SparkContext.addFile and then construct a path with the file:// prefix. The following code provides an example.
from pyspark import SparkFiles
pattern_pool = "dbfs:/<path>/dir"
sc.addFile(pattern_pool, recursive=True)
# Build a local file:// path from the driver's view
local_dir = SparkFiles.get("dir")
rdd = sc.textFile("file://" + local_dir + "/") # Fails on multi-node compute
You notice your code works on single-node compute but fails with the following FileNotFoundException error on multi-node compute.
java.io.FileNotFoundException: file:/local_disk0/.../userFiles-.../dir/<file>.<format> does not exist
Cause
SparkContext.addFile is for shipping small side files to executors. On single-node compute, the driver and executor share the same machine, so the driver's local path happens to work.

On multi-node compute, each executor has its own userFiles-<uuid> directory, so the driver-computed file:///local_disk0/... path does not exist on the other nodes, resulting in the FileNotFoundException.
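One way to confirm this is a quick check along the lines of the following sketch, which prints the SparkFiles root directory the driver resolves and the root directories the executors resolve. On multi-node compute, the executor directories differ from the driver's.
from pyspark import SparkFiles
# Root directory the driver resolves for files added with sc.addFile.
print("Driver root:", SparkFiles.getRootDirectory())
# Root directories resolved on the executors; on multi-node compute these are
# per-node userFiles-<uuid> directories that differ from the driver's.
executor_roots = (
    sc.parallelize(range(sc.defaultParallelism), sc.defaultParallelism)
    .map(lambda _: SparkFiles.getRootDirectory())
    .distinct()
    .collect()
)
print("Executor roots:", executor_roots)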
Solution
Read directly from DBFS (dbfs:/) so Spark can plan the read remotely and parallelize it correctly, instead of using addFile and file://. You can adapt and use the following example code. It demonstrates a line-wise read and a one-record-per-XML-file approach.
# Line-wise read
rdd = sc.textFile("dbfs:/<path>")
# Or: one record per XML file
rdd = sc.wholeTextFiles("dbfs:/<path>")
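As a usage note, sc.wholeTextFiles returns (path, content) pairs, one record per file. For example, you could count the lines in each file from that RDD.
# Each record from wholeTextFiles is a (path, content) pair.
line_counts = rdd.mapValues(lambda content: len(content.splitlines()))
print(line_counts.take(2))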
If you need the files on each executor's local disk, use addFile only to distribute them, then resolve paths inside tasks with SparkFiles.get(...). You can adapt and use the following example code.
from pyspark import SparkFiles

xml_dir = "dbfs:/<path>"
files = [f for f in dbutils.fs.ls(xml_dir) if f.name.endswith(".<format>")]

# Ship each file to every executor's local disk.
for file in files:
    sc.addFile(file.path)

# Broadcast the file names so tasks can resolve them with SparkFiles.get.
file_names = [f.name for f in files]
files_bc = sc.broadcast(file_names)

def read_local_files(_):
    # This runs on executors. SparkFiles.get resolves the executor-local path.
    from pyspark import SparkFiles
    for name in files_bc.value:
        local_path = SparkFiles.get(name)
        print(local_path)
        with open(local_path, "r") as fh:
            for line in fh:
                yield line

rdd = sc.parallelize([0], sc.defaultParallelism).flatMap(read_local_files)
print(rdd.take(5))
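Keep in mind that addFile copies every file to every node, so this pattern is best suited to small side files. For large datasets, read from dbfs:/ directly as shown in the first example.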