When you need to speed up copy and move operations, parallelizing them is usually a good option. You can use Apache Spark to parallelize operations across executors. On Databricks you can also use the DBUtils API; however, these API calls are designed to run on the driver node and should not be called from Spark jobs running on executors.
In this article, we show you how to use the Apache Hadoop FileUtil function together with DBUtils to parallelize a Spark copy operation.
You can use this example as a basis for other filesystem operations.
Note
The example copy operation may look familiar, as we are using DBUtils and Hadoop FileUtil to emulate the functionality of the Hadoop DistCp tool.
Import required libraries
Import the Hadoop functions and define your source and destination locations.
%scala
import org.apache.hadoop.fs._

// Replace the placeholder values with your source and destination paths.
val source = "<source dir>"
val dest = "<destination dir>"

// Create the destination directory if it does not already exist.
dbutils.fs.mkdirs(dest)
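If you do not already have files to copy, you can generate a few small test files and point source at them. This is a minimal sketch for trying the example end to end; the testSource path and file names are hypothetical and not part of the original example.
%scala
// Create a handful of small test files in a hypothetical scratch location.
// Replace testSource with a path your workspace can write to, then set source to it.
val testSource = "dbfs:/tmp/parallel-copy-demo/source"
dbutils.fs.mkdirs(testSource)
(1 to 5).foreach { i =>
  dbutils.fs.put(s"$testSource/file_$i.txt", s"sample contents $i", true)
}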
Broadcast information from the driver to executors
%scala
// Wrap the Hadoop configuration so it can be serialized, then broadcast it
// along with the destination path so the executors can access both.
val conf = new org.apache.spark.util.SerializableConfiguration(sc.hadoopConfiguration)
val broadcastConf = sc.broadcast(conf)
val broadcastDest = sc.broadcast(dest)
Copy paths to a sequence
%scala
// List the contents of the source directory and keep only the paths.
val filesToCopy = dbutils.fs.ls(source).map(_.path)
Parallelize the sequence and divide the workload
On each partition, we first retrieve the broadcast Hadoop configuration and destination path, then create the Path objects and resolve their filesystems, before finally executing the FileUtil.copy command.
%scala
spark.sparkContext.parallelize(filesToCopy).foreachPartition { rows =>
  rows.foreach { file =>
    // Retrieve the broadcast Hadoop configuration and destination path on the executor.
    val conf = broadcastConf.value.value
    val destPathBroadcasted = broadcastDest.value

    // Build Path objects and resolve the source and destination filesystems.
    val fromPath = new Path(file)
    val toPath = new Path(destPathBroadcasted)
    val fromFs = fromPath.getFileSystem(conf)
    val toFs = toPath.getFileSystem(conf)

    // Copy the file; the fifth argument (false) leaves the source file in place.
    FileUtil.copy(fromFs, fromPath, toFs, toPath, false, conf)
  }
}
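After the job finishes, you can run a quick sanity check from the driver. This is a minimal sketch that assumes the source directory is flat, matching the non-recursive dbutils.fs.ls listing used above.
%scala
// Compare the number of entries in the destination with the number of paths we copied.
val copiedCount = dbutils.fs.ls(dest).length
val expectedCount = filesToCopy.length
println(s"Copied $copiedCount of $expectedCount items to $dest")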