Problem
When an Apache Spark Streaming job in Databricks processes files in the Databricks File System (DBFS), you notice file corruption accompanied by the following error.
RuntimeException: dbfs:/mnt/<path-to-warehouse>/<subdirectory>/part-00000-<unique-id>.c000.snappy.parquet is not a Parquet file. Expected magic number at tail, but found [65, 20, -64, -72]
This issue impacts the stability and data integrity of Spark Streaming jobs that read and write files in DBFS.
Cause
File corruption can result from using dbutils.fs operations within executors, which is not supported. For instance, consider the following scenario, in which a recursive delete operation runs on executors.
// Anti-pattern: dbutils.fs.rm() runs inside a DataFrame foreach, which executes on executors.
dbutils.fs.ls(p).map(_.path).toDF.foreach { file =>
  dbutils.fs.rm(file(0).toString, true)
}
In this case, dbutils.fs creates a DBFS instance using a default Hadoop configuration that lacks the setting dbfs.client.version = v2. The improperly initialized DBFS v1 instance is cached, and Spark mistakenly uses it when writing files. Because DBFS v1 can leave a write incomplete if the DataDaemon crashes or restarts, files at the target location can end up truncated and corrupted.
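If you want to confirm which DBFS client version the driver-side configuration carries, the following illustrative snippet (not part of the original procedure; the clientVersion name is ours) reads the setting with the standard Hadoop Configuration API:

// Illustrative check: read the DBFS client version from the driver's Hadoop configuration.
// The executor-side default configuration described above is the one missing this setting.
val clientVersion = sc.hadoopConfiguration.get("dbfs.client.version")
println(s"dbfs.client.version = $clientVersion")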
Solution
Replace dbutils.fs operations with Hadoop filesystem methods to avoid the caching issue. Databricks recommends recursive deletion with the Hadoop FS APIs for better scalability. Test in a staging environment before running in production.
import scala.util.{Try, Success, Failure}
import org.apache.hadoop.fs._

val source = "dbfs:/mnt/<your-base-path>/<your-sub-directory>/"

// Broadcast the Hadoop configuration so executors can build correctly configured filesystem clients.
val conf = new org.apache.spark.util.SerializableConfiguration(sc.hadoopConfiguration)
val broadcastConf = sc.broadcast(conf)

// Delete every path directly under p, distributing the work across executors.
// The delete itself uses the Hadoop FileSystem API instead of dbutils.fs.
def delete(p: String): Unit = {
  dbutils.fs.ls(p).map(_.path).toDF.foreach { file =>
    val conf = broadcastConf.value.value
    val delPath = new Path(file(0).toString)
    val toFs = delPath.getFileSystem(conf)
    toFs.delete(delPath, true)
  }
}

// Walk the directory tree down to the partition level, then delete its contents.
def walkDelete(root: String)(level: Int): Unit = {
  dbutils.fs.ls(root).map(_.path).foreach { p =>
    println(s"Deleting: $p, on level: $level")
    val deleting = Try {
      if (level == 0) delete(p)
      else if (p endsWith "/") walkDelete(p)(level - 1)
      else delete(p)
    }
    deleting match {
      case Success(_) => println(s"Successfully deleted $p")
      case Failure(e) => if (!e.getMessage.contains("specified path does not exist")) throw e
    }
  }
}

val level = 1 // set depending on your partition levels
walkDelete(source)(level)
Alternatively, you can use parallelized deletion. This approach only parallelizes at the top level of the directory, so it can be slower when partition sizes are skewed.
import scala.util.{Try, Success, Failure}
import org.apache.hadoop.fs._

val source = "dbfs:/mnt/<your-base-path>/<your-sub-directory>/"

// Broadcast the Hadoop configuration so executors can build correctly configured filesystem clients.
val conf = new org.apache.spark.util.SerializableConfiguration(sc.hadoopConfiguration)
val broadcastConf = sc.broadcast(conf)

// List the top-level paths on the driver, then delete them in parallel on executors.
val filesToDel = dbutils.fs.ls(source).map(_.path)
spark.sparkContext.parallelize(filesToDel).foreachPartition { rows =>
  rows.foreach { file =>
    val conf = broadcastConf.value.value
    val delPath = new Path(file)
    val toFs = delPath.getFileSystem(conf)
    toFs.delete(delPath, true)
  }
}
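Whichever approach you use, you can verify the result with the same Hadoop FileSystem API. The snippet below is an illustrative check (not part of the original procedure) that counts the entries remaining under the source path:

// Illustrative verification: confirm the source directory is now empty (or gone entirely).
val srcPath = new Path(source)
val fs = srcPath.getFileSystem(sc.hadoopConfiguration)
val remaining = if (fs.exists(srcPath)) fs.listStatus(srcPath).length else 0
println(s"Entries remaining under $source: $remaining")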