Handling Partition Column Values While Using an SQS Queue as a Streaming Source


If data in S3 is stored by partition, the partition column values are used to name folders in the source directory structure. However, if you use an SQS queue as a streaming source, the S3-SQS source cannot detect the partition column values.

For example, suppose you save the following DataFrame to S3 in JSON format, partitioned by the date column:

import org.apache.spark.sql.functions.current_date

val df = spark.range(10).withColumn("date", current_date())
df.write.partitionBy("date").json("s3a://bucket-name/json")

The resulting file structure will be:

s3a://bucket-name/json/date=2018-10-25/<individual JSON files>
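For comparison, a batch read of the same path does pick up the partition column, because Spark's file-based batch reader performs Hive-style partition discovery on the date=... directory names. A minimal sketch, using the bucket path from the example above:

```scala
// Batch reads of the same location infer the "date" partition column
// from the directory names, unlike the S3-SQS streaming source.
val batchDf = spark.read.json("s3a://bucket-name/json")

// The printed schema includes both "id" and the discovered "date" column.
batchDf.printSchema()
```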

Let’s say you have an S3-SQS input stream created from the queue configured for this S3 bucket. If you load the data directly from this stream using the following code:

import org.apache.spark.sql.types._

val schema = StructType(List(
  StructField("id", IntegerType, false),
  StructField("date", DateType, false)))

val df = spark.readStream
          .format("s3-sqs")
          .schema(schema)
          .option("fileFormat", "json")
          .option("queueUrl", "https://sqs.us-east-1.amazonaws.com/826763667205/sqs-queue")
          .option("sqsFetchInterval", "1m")
          .option("ignoreFileDeletion", true)
          .load()

In the resulting output, the date column values are not populated correctly: they appear as null, because the S3-SQS source does not derive partition column values from the directory names.
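The root cause is that with Hive-style partitioning, the date value is stored only in the directory name, not in the files themselves. An individual file under date=2018-10-25/ contains only the remaining columns, for example (illustrative contents):

```json
{"id": 0}
{"id": 1}
```

Since the schema declares a date column but the file contents have no such field, the column comes back empty.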


You can use a combination of the built-in input_file_name() and regexp_extract() functions to extract the date values from the file path, as in the following code snippet:

import org.apache.spark.sql.functions._

val df = spark.readStream
          .format("s3-sqs")
          .schema(schema)
          .option("fileFormat", "json")
          .option("queueUrl", "https://sqs.us-east-1.amazonaws.com/826763667205/sqs-queue")
          .option("sqsFetchInterval", "1m")
          .option("ignoreFileDeletion", true)
          .load()

display(df.withColumn("date", regexp_extract(input_file_name(), "/date=(\\d{4}-\\d{2}-\\d{2})/", 1)))
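The extraction pattern itself can be checked outside Spark. The following plain-Scala sketch applies the same regular expression to a hypothetical file path of the form shown earlier (the part file name is an assumption for illustration):

```scala
// Apply the same pattern regexp_extract uses to a sample partitioned path.
val path = "s3a://bucket-name/json/date=2018-10-25/part-00000.json"
val pattern = "/date=(\\d{4}-\\d{2}-\\d{2})/".r

// group(1) captures just the date portion of the partition directory.
val date = pattern.findFirstMatchIn(path).map(_.group(1))
// date == Some("2018-10-25")
```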

Now you can see the correct values for the date column, extracted from each file's path, in the output.