If data in S3 is stored by partition, the partition column values are used to name folders in the source directory structure. However, if you use an SQS queue as a streaming source, the S3-SQS source cannot detect the partition column values.
For example, if you save the following DataFrame to S3 in JSON format:
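%scala

import org.apache.spark.sql.functions.current_date

// Ten rows with an id column, each stamped with today's date as the partition column.
val df = spark.range(10).withColumn("date", current_date())
df.write.partitionBy("date").json("s3a://bucket-name/json")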
The file structure underneath will be:
s3a://bucket-name/json/_SUCCESS
s3a://bucket-name/json/date=2018-10-25/<individual JSON files>
Let’s say you have an S3-SQS input stream created from the queue configured for this S3 bucket. If you directly load the data from this S3-SQS input stream using the following code:
%scala

import org.apache.spark.sql.types._

val schema = StructType(List(
  StructField("id", IntegerType, false),
  StructField("date", DateType, false)
))

display(spark.readStream
  .format("s3-sqs")
  .option("fileFormat", "json")
  .option("queueUrl", "")
  .option("sqsFetchInterval", "1m")
  .option("ignoreFileDeletion", true)
  .schema(schema)
  .load())
In the output, the date column values are not populated correctly.
You can combine the built-in functions input_file_name() and regexp_extract() to extract the date values from each record's source file path, as in the following code snippet:
%scala

import org.apache.spark.sql.functions._

// Reuses the schema defined in the previous snippet.
val df = spark.readStream
  .format("s3-sqs")
  .option("fileFormat", "json")
  .option("queueUrl", "")
  .option("sqsFetchInterval", "1m")
  .option("ignoreFileDeletion", true)
  .schema(schema)
  .load()

// Overwrite the date column with the value parsed from the date=... folder in the file path.
display(df.withColumn("date", regexp_extract(input_file_name(), "/date=(\\d{4}-\\d{2}-\\d{2})/", 1)))
The date column now shows the correct values.
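To check what the pattern captures without starting a stream, the same regular expression can be tried in plain Scala. This is only a minimal sketch: the object name PartitionDateCheck, the helper extractDate, and the sample file name part-00000.json are hypothetical and used for illustration. The helper returns None when the path has no date=... folder, whereas regexp_extract() in Spark returns an empty string in that case.

```scala
// Plain Scala sketch (no Spark required): apply the partition-date pattern to sample paths.
object PartitionDateCheck {
  // Same pattern that is passed to regexp_extract in the streaming snippet.
  private val DatePattern = "/date=(\\d{4}-\\d{2}-\\d{2})/".r

  // Hypothetical helper: first captured date, or None if the path has no date=... folder.
  def extractDate(path: String): Option[String] =
    DatePattern.findFirstMatchIn(path).map(_.group(1))

  def main(args: Array[String]): Unit = {
    // Sample paths modeled on the S3 layout shown earlier; the file name is made up.
    val partitioned = "s3a://bucket-name/json/date=2018-10-25/part-00000.json"
    val unpartitioned = "s3a://bucket-name/json/_SUCCESS"

    println(extractDate(partitioned))   // Some(2018-10-25)
    println(extractDate(unpartitioned)) // None
  }
}
```

If your partition column uses a different name or value format, adjust the pattern accordingly before using it in the streaming query.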