You are running a streaming job with Auto Loader (AWS | Azure | GCP) and want to get the last modification time for each file from the storage account.
Instructions
The Get the path of files consumed by Auto Loader article describes how to get the filenames and paths for all files consumed by Auto Loader. This article builds on that foundation and uses sample code to show you how to apply a custom UDF and extract the last modification time for each file.
- Start by defining your imports and variables. You need to define the <storage-base-path>, as well as the <input-dir> and <output-dir> you are using.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.sql.functions.{input_file_name, col, udf, from_unixtime}
import org.apache.spark.sql.types._

val basePath = "<storage-base-path>"
val inputLocation = basePath + "<input-dir>"
val outputLocation = basePath + "<output-dir>"
- For this example, we need to generate sample data and store it in a DataFrame. In a practical use case, you would be reading data from your storage bucket.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val sampleData = Seq(
  Row(1, "James", 10, "M", 1000),
  Row(1, "Michael", 20, "F", 2000),
  Row(2, "Robert", 30, "M", 3000),
  Row(2, "Maria", 40, "F", 4000),
  Row(3, "Jen", 50, "M", 5000)
)

val sampleSchema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("gender", StringType, true),
  StructField("salary", IntegerType, true)
))

val df = spark.createDataFrame(sc.parallelize(sampleData), sampleSchema)

df.coalesce(1).write.format("parquet").partitionBy("id", "age").mode("append").save(inputLocation)
spark.read.format("parquet").load(inputLocation).count()
- Create a custom UDF to list all files in the storage path and return the last modification time for each file.
val getModificationTimeUDF = udf((path: String) => {
  // Resolve the Hadoop FileSystem for the path and return the modification time in epoch milliseconds.
  val finalPath = new Path(path)
  val fs = finalPath.getFileSystem(new Configuration())
  if (fs.exists(finalPath)) {
    fs.listStatus(finalPath).head.getModificationTime
  } else {
    -1L // Or some other value based on business decision
  }
})
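Optionally, you can sanity check the UDF before wiring it into a job. The snippet below is a minimal sketch that applies the UDF to the input directory as a literal path; the lit import is the only addition beyond the setup step, and a result of -1 means the path could not be found.

import org.apache.spark.sql.functions.lit

// Apply the UDF to a literal path on a one-row DataFrame.
// -1 indicates the path does not exist.
spark.range(1)
  .select(getModificationTimeUDF(lit(inputLocation)).as("modificationTime"))
  .show(false)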
- Apply the UDF to the batch job. The UDF returns each file's last modification time in Unix epoch milliseconds. To convert this into a human-readable format, divide by 1000 and cast the result to a timestamp.
val df = spark.read.format("parquet").load(inputLocation)
  .withColumn("filePath", input_file_name())
  .withColumn("fileModificationTime", getModificationTimeUDF(col("filePath")))
  .withColumn("fileModificationTimestamp",
    from_unixtime($"fileModificationTime" / 1000, "yyyy-MM-dd HH:mm:ss").cast(TimestampType).as("timestamp"))
  .drop("fileModificationTime")

display(df)
- Apply the UDF to the Auto Loader streaming job.
// The cloudFiles.connectionString through cloudFiles.clientSecret options configure
// file notification mode on Azure; define these variables (or remove the options)
// to match your cloud and authentication setup.
val sdf = spark.readStream.format("cloudFiles")
  .schema(sampleSchema)
  .option("cloudFiles.format", "parquet")
  .option("cloudFiles.includeExistingFiles", "true")
  .option("cloudFiles.connectionString", connectionString)
  .option("cloudFiles.resourceGroup", resourceGroup)
  .option("cloudFiles.subscriptionId", subscriptionId)
  .option("cloudFiles.tenantId", tenantId)
  .option("cloudFiles.clientId", clientId)
  .option("cloudFiles.clientSecret", clientSecret)
  .option("cloudFiles.useNotifications", "true")
  .load(inputLocation)
  .withColumn("filePath", input_file_name())
  .withColumn("fileModificationTime", getModificationTimeUDF(col("filePath")))
  .withColumn("fileModificationTimestamp",
    from_unixtime($"fileModificationTime" / 1000, "yyyy-MM-dd HH:mm:ss").cast(TimestampType).as("timestamp"))
  .drop("fileModificationTime")

display(sdf)
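display(sdf) lets you inspect the stream interactively. If you also want to persist the enriched records to the <output-dir> defined earlier, a minimal sketch follows; it assumes a Delta sink and a hypothetical <checkpoint-dir> under the base path, so adjust the format and checkpoint location for your environment.

// Write the stream, with the file path and modification timestamp columns, to the output location.
// The checkpoint path is a hypothetical placeholder; any durable path you control will work.
val checkpointLocation = basePath + "<checkpoint-dir>"

sdf.writeStream
  .format("delta")
  .option("checkpointLocation", checkpointLocation)
  .outputMode("append")
  .start(outputLocation)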
To recap, input_file_name() is used to read the absolute file path, including the file name. We then created a custom UDF to list the files at that path and return each file's last modification time, which is reported in Unix epoch milliseconds. Convert it into a readable format by dividing by 1000 and casting the result to a timestamp, as shown in the snippet below.
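If you want to see the conversion step in isolation, here is a minimal sketch using an illustrative epoch value (1672531200000 milliseconds, which is 2023-01-01 00:00:00 in a UTC session time zone); the output column name simply mirrors the one used above.

import org.apache.spark.sql.functions.{from_unixtime, lit}
import org.apache.spark.sql.types.TimestampType

// Divide milliseconds by 1000 to get seconds, format the string, then cast to a timestamp.
spark.range(1)
  .select(
    from_unixtime(lit(1672531200000L) / 1000, "yyyy-MM-dd HH:mm:ss")
      .cast(TimestampType)
      .as("fileModificationTimestamp"))
  .show(false)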