Parallelize filesystem operations

When you need to speed up copy and move operations, parallelizing them is usually a good option. You can use Apache Spark to parallelize operations on executors. On Databricks, the DBUtils APIs provide filesystem operations, but these calls are intended to run on the driver node and shouldn't be called from Spark jobs running on executors.

In this article, we show you how to use the Apache Hadoop FileUtil class together with DBUtils to parallelize a Spark copy operation.

You can use this example as a basis for other filesystem operations.

Note

The example copy operation may look familiar, as we are using DBUtils and Hadoop FileUtil to emulate the functionality of the Hadoop DistCp tool.

Import required libraries

Import the Hadoop functions and define your source and destination locations.

import org.apache.hadoop.fs._

val source = "<source dir>"
val dest = "<destination dir>"

dbutils.fs.mkdirs(dest)

Broadcast information from the driver to executors

val conf = new org.apache.spark.util.SerializableConfiguration(sc.hadoopConfiguration)
val broadcastConf = sc.broadcast(conf)
val broadcastDest = sc.broadcast(dest)

Collect the paths to copy into a sequence

val filesToCopy = dbutils.fs.ls(source).map(_.path)
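Keep in mind that dbutils.fs.ls only lists the immediate contents of the source directory; files in nested subdirectories are not included. If you need to copy a nested tree, a simple recursive variant might look like the following (listRecursive is a hypothetical helper name, not a DBUtils API):

```scala
// Hypothetical helper: recursively collect all file paths under a directory.
// dbutils.fs.ls is not recursive, so nested files would otherwise be skipped.
def listRecursive(dir: String): Seq[String] = {
  dbutils.fs.ls(dir).flatMap { info =>
    if (info.isDir) listRecursive(info.path) else Seq(info.path)
  }
}

val filesToCopy = listRecursive(source)
```

Note that this flat list loses the directory structure; the example in this article copies everything into a single destination directory.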

Parallelize the sequence and divide the workload

Within each task, we first read the broadcast Hadoop configuration and destination path. We then create the Path objects and their filesystem handles, before finally executing the FileUtil.copy command.

spark.sparkContext.parallelize(filesToCopy).foreachPartition { files =>

  // Read the broadcast variables once per partition, not once per file.
  val conf = broadcastConf.value.value
  val destPath = new Path(broadcastDest.value)
  val toFs = destPath.getFileSystem(conf)

  files.foreach { file =>
    val fromPath = new Path(file)
    val fromFs = fromPath.getFileSystem(conf)

    // deleteSource = false, so the source files are left in place.
    FileUtil.copy(fromFs, fromPath, toFs, destPath, false, conf)
  }
}
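Once the job finishes, you can verify the result from the driver with DBUtils. A simple sanity check, assuming a flat source directory as in this example, might be:

```scala
// Run on the driver after the copy job completes:
// compare file names in the destination against the source.
val copied   = dbutils.fs.ls(dest).map(_.name).toSet
val expected = dbutils.fs.ls(source).map(_.name).toSet

assert(expected.subsetOf(copied), s"Missing files: ${expected.diff(copied)}")
```

For a stronger check, you could also compare file sizes from the FileInfo objects that dbutils.fs.ls returns.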