Problem
Writing to an S3 bucket with the RDD API fails: the driver node can write, but writes from the worker (executor) nodes return an access denied error. Writing with the DataFrame API, however, works fine.
For example, let’s say you run the following code:
%scala
import java.io.File
import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import java.net.URI
import scala.collection.mutable
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream

val ssc = new StreamingContext(sc, Seconds(10))

val rdd1 = sc.parallelize(Seq(1,2))
val rdd2 = sc.parallelize(Seq(3,4))
val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1, rdd2))
val result = inputStream.map(x => x*x)

val count = result.foreachRDD { rdd =>
  val config = new Configuration(sc.hadoopConfiguration) with Serializable
  rdd.mapPartitions {
    _.map { entry =>
      val fs = FileSystem.get(URI.create("s3://dx.lz.company.fldr.dev/part_0000000-123"), config)
      val path = new Path("s3://dx.lz.company.fldr.dev/part_0000000-123")
      val file = fs.create(path)
      file.write("foobar".getBytes)
      file.close()
    }
  }.count()
}

println(s"Count is $count")
ssc.start()
The following error is returned:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 7, 10.205.244.228, executor 0): java.rmi.RemoteException: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied; Request ID: F81ADFACBCDFE626, Extended Request ID: 1DNcBUHsmUFFI9a1lz0yGt4dnRjdY5V3C+J/DiEeg8Z4tMOLphZwW2U+sdxmr8fluQZ1R/3BCep,
Cause
When you write from a worker (executor) node using the RDD API, the IAM policy denies access when the Hadoop configuration is passed to the executors as a serializable configuration, as in val config = new Configuration(sc.hadoopConfiguration) with Serializable.
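Spark ships a broadcast-friendly wrapper, org.apache.spark.util.SerializableConfiguration, which serializes the Hadoop configuration explicitly, key by key, instead of relying on default Java serialization. The sketch below is an illustrative, simplified version of that idea (it is not Spark's actual source, and the class name HadoopConfWrapper is hypothetical); it shows why a wrapper that delegates to Configuration's own Writable methods reaches the executors with its settings intact.

%scala
// Illustrative sketch only, not Spark's source: serialize a Hadoop Configuration
// by delegating to its own Writable methods so every property is shipped explicitly.
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration

class HadoopConfWrapper(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)        // Configuration implements Writable: writes each key/value pair
  }
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)    // rebuilds the configuration on the executor
  }
}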
Solution
There are two ways to solve this problem:
Option 1: Use DataFrames
%scala
dbutils.fs.put("s3a://dx.lz.company.fldr.dev/test-gopi/test0.txt", "foobar")

val df = spark.read.text("s3a://dx.lz.company.fldr.dev/test-gopi/test0.txt")
df.write.text("s3a://dx.lz.company.fldr.dev/test-gopi/text1.txt")
val df1 = spark.read.text("s3a://dx.lz.company.fldr.dev/test-gopi/text1.txt")
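If the writes happen inside a streaming job like the one in the Problem section, the same idea applies within foreachRDD: convert each micro-batch RDD to a DataFrame and let the DataFrame writer handle S3. The following is a minimal sketch under that assumption; it reuses the result DStream from the example above, and the output path and append mode are illustrative choices, not part of the original example.

%scala
// Minimal sketch: write each micro-batch through the DataFrame API instead of
// creating a FileSystem on the executors. The output path is a placeholder.
import spark.implicits._

result.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    rdd.map(_.toString).toDF("value")
      .write
      .mode("append")
      .text("s3a://dx.lz.company.fldr.dev/test-gopi/stream-output/")
  }
}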
Option 2: Use SerializableConfiguration
If you want to keep using RDDs, broadcast the Hadoop configuration with SerializableConfiguration:
%scala
import org.apache.spark.util.SerializableConfiguration

val config = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
For example:
%scala
import java.io.File
import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import java.net.URI
import scala.collection.mutable
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.util.SerializableConfiguration

val ssc = new StreamingContext(sc, Seconds(10))

val rdd1 = sc.parallelize(Seq(1,2))
val rdd2 = sc.parallelize(Seq(3,4))
val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1, rdd2))
val result = inputStream.map(x => x*x)

val count = result.foreachRDD { rdd =>
  // val config = new Configuration(sc.hadoopConfiguration) with Serializable
  val config = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
  rdd.mapPartitions {
    _.map { entry =>
      val fs = FileSystem.get(URI.create("s3://path/part_0000000-123"), config.value.value)
      val path = new Path("s3://path/part_0000000-123")
      val file = fs.create(path)
      file.write("foobar".getBytes)
      file.close()
    }
  }.count()
}

println(s"Count is $count")
ssc.start()
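On each executor, config.value returns the broadcast SerializableConfiguration and config.value.value returns the deserialized Hadoop Configuration, so FileSystem.get sees the same S3 settings as the driver. Broadcasting the wrapper also means the configuration is shipped to each executor once rather than serialized with every task.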