Parallelize filesystem operations

Parallelize Apache Spark filesystem operations with DBUtils and Hadoop FileUtil; emulate DistCp.

Written by sandeep.chandran

Last published at: December 9th, 2022

When you need to speed up copy and move operations, parallelizing them is usually a good option. You can use Apache Spark to parallelize operations on executors. On Databricks you can use the DBUtils APIs; however, these API calls are meant for use on driver nodes and shouldn’t be used in Spark jobs running on executors.

In this article, we show you how to use the Apache Hadoop FileUtil class along with DBUtils to parallelize a copy operation with Spark.

You can use this example as a basis for other filesystem operations.

Note

The example copy operation may look familiar as we are using DBUtils and Hadoop FileUtil to emulate the functions of the Hadoop DistCp tool.

Import required libraries

Import the Hadoop filesystem classes, define your source and destination locations, and create the destination directory.

%scala

import org.apache.hadoop.fs._

val source = "<source dir>"
val dest = "<destination dir>"

dbutils.fs.mkdirs(dest)

Broadcast information from the driver to executors

Broadcast the Hadoop configuration, wrapped so that it can be serialized, and the destination path so that every executor can read them.

%scala

val conf = new org.apache.spark.util.SerializableConfiguration(sc.hadoopConfiguration)
val broadcastConf = sc.broadcast(conf)
val broadcastDest = sc.broadcast(dest)
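
The wrapper is used because Hadoop's Configuration class does not implement java.io.Serializable, so Spark cannot ship it to executors as-is. The following is a minimal sketch that checks this outside a Spark job by round-tripping the wrapper through plain Java serialization. The key example.key and its value are hypothetical, and SerializableConfiguration is assumed to be publicly accessible, as it is in Spark 3.x.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.util.SerializableConfiguration

val hadoopConf = new Configuration()
hadoopConf.set("example.key", "example-value") // hypothetical setting, for illustration only

// A plain Hadoop Configuration is not Java-serializable.
val plainIsSerializable = hadoopConf.isInstanceOf[java.io.Serializable]

// The wrapper is serializable, so it survives a write/read round trip.
val wrapped = new SerializableConfiguration(hadoopConf)
val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buffer)
out.writeObject(wrapped)
out.close()

val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
val restored = in.readObject().asInstanceOf[SerializableConfiguration]
in.close()

// The restored wrapper still carries the original settings.
val restoredValue = restored.value.get("example.key")
```

This is also why the executor code reads broadcastConf.value.value: the first value unwraps the broadcast variable, the second unwraps the SerializableConfiguration.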

Copy paths to a sequence

%scala

val filesToCopy = dbutils.fs.ls(source).map(_.path)
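
Note that this listing is not recursive: it returns only the direct children of the source directory, and a subdirectory appears as a single entry. As a rough illustration of the same behavior without DBUtils, the sketch below lists a directory through the Hadoop FileSystem API. The helper name listTopLevel is hypothetical and not part of the original example.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper: returns the direct children of `dir`, files and
// directories alike, as fully qualified path strings.
def listTopLevel(dir: String, conf: Configuration): Seq[String] = {
  val dirPath = new Path(dir)
  val fs = dirPath.getFileSystem(conf)
  // listStatus does not descend into subdirectories.
  fs.listStatus(dirPath).map(_.getPath.toString).toSeq
}
```

Because each entry becomes one unit of work, a subdirectory passed to FileUtil.copy is copied with all of its contents inside a single task. The copy is therefore parallel across top-level entries, not across every file in the tree.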

Parallelize the sequence and divide the workload

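Inside each partition, we first read the Hadoop configuration and destination path from the broadcast variables. Then we create the Path objects and resolve their filesystems, before finally calling FileUtil.copy. The false argument is the deleteSource flag, so the source files are left in place.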

%scala

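spark.sparkContext.parallelize(filesToCopy).foreachPartition { rows =>
  // Read the broadcast values once per partition rather than once per file.
  val conf = broadcastConf.value.value
  val destPathBroadcasted = broadcastDest.value

  rows.foreach { file =>
    val fromPath = new Path(file)
    val toPath = new Path(destPathBroadcasted)
    // Resolve each filesystem from its own path, so source and destination may use different schemes.
    val fromFs = fromPath.getFileSystem(conf)
    val toFs = toPath.getFileSystem(conf)

    // deleteSource = false: copy the file and keep the source.
    FileUtil.copy(fromFs, fromPath, toFs, toPath, false, conf)
  }
}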
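
As noted earlier, this example can serve as a basis for other filesystem operations. One possible adaptation, sketched below, wraps the per-file logic in a helper with a deleteSource parameter, so the same code can either copy or move a path. The helper name transferPath is hypothetical; only FileUtil.copy and the Path and FileSystem calls come from Hadoop.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

// Hypothetical helper: copies one path into destDir, and deletes the source
// afterwards when deleteSource is true (which turns the copy into a move).
// Returns the Boolean result reported by FileUtil.copy.
def transferPath(file: String, destDir: String, conf: Configuration, deleteSource: Boolean): Boolean = {
  val fromPath = new Path(file)
  val toPath = new Path(destDir)
  val fromFs = fromPath.getFileSystem(conf)
  val toFs = toPath.getFileSystem(conf)
  FileUtil.copy(fromFs, fromPath, toFs, toPath, deleteSource, conf)
}
```

With the variables used in the copy job above, the call inside the partition loop would become transferPath(file, destPathBroadcasted, conf, deleteSource = true). Because a move removes the source, it is worth trying it on a small test directory first.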