Access denied when writing to an S3 bucket using RDD
Problem
Writing to an S3 bucket with the RDD API fails. The driver node can write, but a worker (executor) node returns an access denied error. Writing with the DataFrame API, however, works fine.
For example, let’s say you run the following code:
import java.io.File
import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import java.net.URI
import scala.collection.mutable
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
val ssc = new StreamingContext(sc, Seconds(10))
val rdd1 = sc.parallelize(Seq(1,2))
val rdd2 = sc.parallelize(Seq(3,4))
val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1,rdd2))
val result = inputStream.map(x => x*x)
result.foreachRDD { rdd =>
  // Serializing the Hadoop configuration this way is what triggers the access denied error on the executors
  val config = new Configuration(sc.hadoopConfiguration) with Serializable
  val count = rdd.mapPartitions {
    _.map { entry =>
      val fs = FileSystem.get(URI.create("s3://dx.lz.company.fldr.dev/part_0000000-123"), config)
      val path = new Path("s3://dx.lz.company.fldr.dev/part_0000000-123")
      val file = fs.create(path)
      file.write("foobar".getBytes)
      file.close()
    }
  }.count()
  println(s"Count is $count")
}
ssc.start()
The following error is returned:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure:
Lost task 3.3 in stage 0.0 (TID 7, 10.205.244.228, executor 0): java.rmi.RemoteException: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied; Request ID: F81ADFACBCDFE626,
Extended Request ID: 1DNcBUHsmUFFI9a1lz0yGt4dnRjdY5V3C+J/DiEeg8Z4tMOLphZwW2U+sdxmr8fluQZ1R/3BCep,
Cause
When you write to S3 from a worker node using the RDD API, the IAM policy denies access if you serialize the Hadoop configuration with Serializable, as in val config = new Configuration(sc.hadoopConfiguration) with Serializable.
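For reference, a minimal sketch of the difference, using only identifiers that already appear in this article (the variable name configBroadcast is just a renamed placeholder so both lines can coexist):
// Fails on the executors: anonymous Configuration subclass mixed in with Serializable
val config = new Configuration(sc.hadoopConfiguration) with Serializable

// Works: wrap the configuration in Spark's SerializableConfiguration and broadcast it
val configBroadcast = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))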
Solution
There are two ways to solve this problem:
Option 1: Use DataFrames
// Write a test file to the bucket with dbutils, then read it and write it back with the DataFrame API
dbutils.fs.put("s3a://dx.lz.company.fldr.dev/test-gopi/test0.txt", "foobar")
val df = spark.read.text("s3a://dx.lz.company.fldr.dev/test-gopi/test0.txt")
df.write.text("s3a://dx.lz.company.fldr.dev/test-gopi/text1.txt")
val df1 = spark.read.text("s3a://dx.lz.company.fldr.dev/test-gopi/text1.txt")
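If the goal is to persist computed values (as in the RDD example above) rather than to round-trip a test file, a minimal sketch could look like the following. The squares output path is hypothetical, and import spark.implicits._ is assumed to be available as in a standard notebook session:
import spark.implicits._

// Compute the squared values as strings and write them directly with the DataFrame API
// (hypothetical output path under the same test prefix; adjust to your bucket)
val squares = Seq(1, 2, 3, 4).map(x => (x * x).toString).toDF("value")
squares.write.mode("overwrite").text("s3a://dx.lz.company.fldr.dev/test-gopi/squares")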
Option 2: Use SerializableConfiguration
If you want to use RDDs, use:
val config = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
For example:
import java.io.File
import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import java.net.URI
import scala.collection.mutable
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.util.SerializableConfiguration
val ssc = new StreamingContext(sc, Seconds(10))
val rdd1 = sc.parallelize(Seq(1,2))
val rdd2 = sc.parallelize(Seq(3,4))
val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1,rdd2))
val result = inputStream.map(x => x*x)
result.foreachRDD { rdd =>
  // val config = new Configuration(sc.hadoopConfiguration) with Serializable  // fails with access denied
  // Broadcast Spark's SerializableConfiguration wrapper instead
  val config = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
  val count = rdd.mapPartitions {
    _.map { entry =>
      val fs = FileSystem.get(URI.create("s3://path/part_0000000-123"), config.value.value)
      val path = new Path("s3://path/part_0000000-123")
      val file = fs.create(path)
      file.write("foobar".getBytes)
      file.close()
    }
  }.count()
  println(s"Count is $count")
}
ssc.start()
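The snippets above only start the StreamingContext. If you run this in a notebook, you will probably also want to stop the stream once the queued RDDs have been processed; a minimal sketch (the 30-second timeout is arbitrary):
// Let the micro-batches run for a while, then stop the stream but keep the shared SparkContext alive
ssc.awaitTerminationOrTimeout(30 * 1000)
ssc.stop(stopSparkContext = false, stopGracefully = true)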