Handling partition column values while using an SQS queue as a streaming source

Written by Adam Pavlacka

Last published at: May 18th, 2022

Problem

If data in S3 is stored by partition, the partition column values are used to name folders in the source directory structure. However, if you use an SQS queue as a streaming source, the S3-SQS source cannot detect the partition column values.

For example, if you save the following DataFrame to S3 in JSON format:

%scala

import org.apache.spark.sql.functions._

val df = spark.range(10).withColumn("date", current_date())
df.write.partitionBy("date").json("s3a://bucket-name/json")

The resulting directory structure is:

s3a://bucket-name/json/_SUCCESS
s3a://bucket-name/json/date=2018-10-25/<individual JSON files>
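
For comparison, a plain batch read of the same path does recover the partition column, because Spark's file sources perform Hive-style partition discovery on the directory names (a minimal sketch against the same illustrative bucket):

%scala

// Partition discovery parses `date=2018-10-25` out of the folder name,
// so `date` appears as a column even though it is not stored in the files.
val batchDf = spark.read.json("s3a://bucket-name/json")
batchDf.printSchema() // includes the inferred `date` partition column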

Suppose you create an S3-SQS input stream from the queue configured for this bucket and load the data directly with the following code:

%scala

import org.apache.spark.sql.types._

// spark.range() produces a LongType id column, so declare it as LongType here.
val schema = StructType(List(
  StructField("id", LongType, false),
  StructField("date", DateType, false)
))

display(spark.readStream
          .format("s3-sqs")
          .option("fileFormat", "json")
          .option("queueUrl", "https://sqs.us-east-1.amazonaws.com/826763667205/sqs-queue")
          .option("sqsFetchInterval", "1m")
          .option("ignoreFileDeletion", true)
          .schema(schema)
          .load())

The output will be:

(Screenshot: incorrect SQS stream results)

You can see the date column values are not populated correctly. This is because partitionBy() does not store the partition column inside the data files; the value exists only in the folder name, which the S3-SQS source does not parse.
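
You can confirm the cause by reading the data files as plain text (a quick sketch using the example path from above):

%scala

// Each file under date=2018-10-25/ contains only the `id` field,
// e.g. {"id":0} -- the date exists solely in the folder name.
spark.read.text("s3a://bucket-name/json/date=2018-10-25").show(3, false)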

Solution

You can use a combination of the built-in input_file_name() and regexp_extract() functions to extract the date values from the file path, as in the following code snippet:

%scala

import org.apache.spark.sql.functions._

// `schema` is the StructType defined in the previous snippet.
val df = spark.readStream
          .format("s3-sqs")
          .option("fileFormat", "json")
          .option("queueUrl", "https://sqs.us-east-1.amazonaws.com/826763667205/sqs-queue")
          .option("sqsFetchInterval", "1m")
          .option("ignoreFileDeletion", true)
          .schema(schema)
          .load()

// Extract the date from the file path (for example .../date=2018-10-25/...)
// and cast it back to the DateType declared in the schema.
display(df.withColumn("date",
  regexp_extract(input_file_name(), "/date=(\\d{4}-\\d{2}-\\d{2})/", 1).cast("date")))

Now you can see the correct values for the date column in the following output:

(Screenshot: correct SQS stream results)
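
The same pattern generalizes to any partition column. As a sketch, a hypothetical helper (the partitionValue name and its regex are illustrative, not part of the S3-SQS source) can pull any Hive-style key=value segment out of the file path:

%scala

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{input_file_name, regexp_extract}

// Hypothetical helper: extract the value of a Hive-style `key=value`
// segment from the path of the file currently being read.
def partitionValue(key: String): Column =
  regexp_extract(input_file_name(), s"/$key=([^/]+)/", 1)

// Usage sketch: df.withColumn("date", partitionValue("date").cast("date"))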

