How to list and delete files faster in Databricks

Learn how to list and delete files faster in Databricks.

Written by Adam Pavlacka

Last published at: May 31st, 2022

Scenario

Suppose you need to delete a table that is partitioned by year, month, date, region, and service. However, the table is huge, with around 1,000 part files per partition. You can list all the files in each partition and then delete them using an Apache Spark job.

For example, suppose you have a table that is partitioned by a, b, and c:

%scala

Seq((1,2,3,4,5),
  (2,3,4,5,6),
  (3,4,5,6,7),
  (4,5,6,7,8))
  .toDF("a", "b", "c", "d", "e")
  .write.mode("overwrite")
  .partitionBy("a", "b", "c")
  .parquet("/mnt/path/table")

List files

You can list all the part files using this function:

%scala

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import java.net.URI

def listFiles(basep: String, globp: String): Seq[String] = {
  val conf = new Configuration(sc.hadoopConfiguration)
  val fs = FileSystem.get(new URI(basep), conf)

  // Make sure the path starts with "/" so Path.mergePaths concatenates cleanly.
  def validated(path: String): Path = {
    if (path startsWith "/") new Path(path)
    else new Path("/" + path)
  }

  // Note: if you are using Databricks Runtime 6.x or below, remove the
  // areRootPaths=true parameter from the bulkListLeafFiles call.
  val fileCatalog = InMemoryFileIndex.bulkListLeafFiles(
    paths = SparkHadoopUtil.get.globPath(fs, Path.mergePaths(validated(basep), validated(globp))),
    hadoopConf = conf,
    filter = null,
    sparkSession = spark, areRootPaths=true)

  fileCatalog.flatMap(_._2.map(_.path))
}

val root = "/mnt/path/table"
val globp = "[^_]*" // glob pattern, e.g. "service=webapp/date=2019-03-31/*log4j*"

val files = listFiles(root, globp)
files.toDF("path").show()
+------------------------------------------------------------------------------------------------------------------------------+
|path                                                                                                                          |
+------------------------------------------------------------------------------------------------------------------------------+
|dbfs:/mnt/path/table/a=1/b=2/c=3/part-00000-tid-5046671251912249212-afa32967-b7db-444e-b895-d12d68c05500-5.c000.snappy.parquet|
|dbfs:/mnt/path/table/a=2/b=3/c=4/part-00001-tid-5046671251912249212-afa32967-b7db-444e-b895-d12d68c05500-6.c000.snappy.parquet|
|dbfs:/mnt/path/table/a=3/b=4/c=5/part-00002-tid-5046671251912249212-afa32967-b7db-444e-b895-d12d68c05500-7.c000.snappy.parquet|
|dbfs:/mnt/path/table/a=4/b=5/c=6/part-00003-tid-5046671251912249212-afa32967-b7db-444e-b895-d12d68c05500-8.c000.snappy.parquet|
+------------------------------------------------------------------------------------------------------------------------------+

The listFiles function takes a base path and a glob path as arguments. It scans the base path for files that match the glob pattern and returns all matching leaf files as a sequence of strings.

The function also uses the globPath utility function from SparkHadoopUtil. globPath lists all the paths in a directory that match the specified prefix, but does not go further and list the leaf children (files). That list of paths is then passed into the InMemoryFileIndex.bulkListLeafFiles method, a Spark internal API for distributed file listing.

Neither of these listing utility functions works well alone. By combining them, you first get the list of top-level directories you want with globPath, which runs on the driver, and then distribute the listing of all their leaf children across the Spark workers with bulkListLeafFiles.

The listing can be around 20-50x faster, in line with Amdahl's law, because you can tailor the glob path to the table's physical file layout and control the parallelism of InMemoryFileIndex through spark.sql.sources.parallelPartitionDiscovery.parallelism.
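
For example, you could tune the listing parallelism before calling listFiles. The value below is only an illustration; the right setting depends on your cluster size and directory layout.

%scala

// Illustrative value: caps the number of tasks Spark uses for the
// distributed leaf-file listing in InMemoryFileIndex.
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "100")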

Delete files

When you delete files or partitions from an unmanaged table, you can use the Databricks utility function dbutils.fs.rm. This function leverages the native cloud storage file system API, which is optimized for all file operations. However, you can't delete a gigantic table directly using dbutils.fs.rm("path/to/the/table"): the recursive delete happens from the driver and can take a very long time on a table of this size.

You can list files efficiently using the script above. For smaller tables, the collected paths of the files to delete fit into the driver memory, so you can use a Spark job to distribute the file deletion task.
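
As a rough sketch of that smaller-table case, assuming the listFiles helper defined above and a file list that comfortably fits on the driver, the deletes themselves can be pushed out to the workers:

%scala

// Collect the matching leaf files on the driver (assumes the list fits in memory).
val filesToDelete = listFiles("/mnt/path/table", "[^_]*")

// Distribute the deletes; each Spark task removes one file.
filesToDelete.toDF("path").foreach { row =>
  dbutils.fs.rm(row.getString(0))
}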

For gigantic tables, even for a single top-level partition, the string representations of the file paths cannot fit into the driver memory. The easiest way to solve this problem is to walk the inner partitions recursively, and to list and delete the files in each one in parallel.

%scala

import scala.util.{Try, Success, Failure}

// Deletes every file directly under path p, distributing the rm calls across the cluster.
def delete(p: String): Unit = {
  dbutils.fs.ls(p).map(_.path).toDF.foreach { file =>
    dbutils.fs.rm(file(0).toString, true)
    println(s"deleted file: $file")
  }
}

// Walks the partition tree down to the given level before deleting, so that each
// delete call only ever has to handle a small enough partition.
final def walkDelete(root: String)(level: Int): Unit = {
  dbutils.fs.ls(root).map(_.path).foreach { p =>
    println(s"Deleting: $p, on level: ${level}")
    val deleting = Try {
      if (level == 0) delete(p)
      // Recurse only the configured number of levels; directory paths
      // returned by dbutils.fs.ls end with "/".
      else if (p endsWith "/") walkDelete(p)(level - 1)
      else delete(p)
    }
    deleting match {
      case Success(v) =>
        println(s"Successfully deleted $p")
        // Remove the now-empty directory (or remaining file) itself.
        dbutils.fs.rm(p, true)
      case Failure(e) => println(e.getMessage)
    }
  }
}

The code deletes inner partitions while making sure that the partition being deleted is small enough. It does this by walking the partitions recursively, level by level, and only starts deleting when it reaches the level you set. For instance, if you want to start by deleting the top-level partitions, use walkDelete(root)(0). Spark will delete all the files under dbfs:/mnt/path/table/a=1/, then delete .../a=2/, and so on until the pattern is exhausted.
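
For example, with illustrative paths and levels:

%scala

val root = "dbfs:/mnt/path/table/"

// Start deleting at the top-level a=... partitions.
walkDelete(root)(0)

// Or recurse one level deeper first, so each delete call
// works on the smaller b=... partitions:
// walkDelete(root)(1)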

The Spark job distributes the deletion task using the delete function shown above, listing the files with dbutils.fs.ls, on the assumption that the number of child partitions at this level is small. You can also be more efficient by replacing dbutils.fs.ls with the listFiles function shown above, with only a slight modification, as sketched below.
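
One possible shape of that modification, using a hypothetical deleteWithListing helper in place of delete inside walkDelete, and a glob pattern you would adjust to your own layout:

%scala

// Sketch only: plays the same role as delete(p), but the leaf-file listing
// is distributed through the listFiles function instead of dbutils.fs.ls.
def deleteWithListing(p: String): Unit = {
  val base = p.stripPrefix("dbfs:")   // listFiles expects a path without the scheme
  listFiles(base, "[^_]*").toDF("path").foreach { row =>
    dbutils.fs.rm(row.getString(0), true)
  }
}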

Summary

These two approaches highlight methods for listing and deleting gigantic tables. They use some Spark utility functions and functions specific to the Databricks environment. Even if you cannot use them directly, you can create your own utility functions to solve the problem in an analogous way.
