How to handle blob data contained in an XML file
If an EC2 log records events in XML format, then each XML event records EC2-related information as a base64 string. To run analytics on this data using Apache Spark, you need to use the spark-xml library and the BASE64Decoder API to transform the data for analysis.
Problem
You need to analyze base64-encoded strings from an XML-formatted log file using Spark. For example, the following file, input.xml, shows this type of format:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log [<!ENTITY % log SYSTEM "aws_instance">%log;]>
<log systemID="MF2018" timeZone="UTC" timeStamp="Mon Mar 25 16:00:01 2018">
<message source="ec2.log" time="Mon Mar 25 16:00:01 2018" type="sysMSG"><text/>
<detail>
<blob>aW5zdGFuY2VJZCxzdGFydFRpbWUsZGVsZXRlVGltZSxob3Vycw0KaS0wMjdmYTdjY2RhMjEwYjRmNCwyLzE3LzE3VDIwOjIxLDIvMTcvMTdUMjE6MTEsNQ0KaS0wN2NkNzEwMGUzZjU0YmY2YSwyLzE3LzE3VDIwOjE5LDIvMTcvMTdUMjE6MTEsNA0KaS0wYTJjNGFkYmYwZGMyNTUxYywyLzE3LzE3VDIwOjE5LDIvMTcvMTdUMjE6MTEsMg0KaS0wYjQwYjE2MjM2Mzg4OTczZiwyLzE3LzE3VDIwOjE4LDIvMTcvMTdUMjE6MTEsNg0KaS0wY2ZkODgwNzIyZTE1ZjE5ZSwyLzE3LzE3VDIwOjE4LDIvMTcvMTdUMjE6MTEsMg0KaS0wY2YwYzczZWZlZWExNGY3NCwyLzE3LzE3VDE2OjIxLDIvMTcvMTdUMTc6MTEsMQ0KaS0wNTA1ZTk1YmZlYmVjZDZlNiwyLzE3LzE3VDE2OjIxLDIvMTcvMTdUMTc6MTEsOA==
</blob>
</detail>
</message>
</log>
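To see what such a blob actually holds, you can decode it directly in a Scala REPL before involving Spark. The following is a minimal standalone sketch (not part of the solution code below) that decodes only the first 52 characters of the sample payload, which encode the CSV header; java.util.Base64's MIME decoder is used because it tolerates whitespace and line breaks embedded by XML pretty-printing:

import java.util.Base64
import java.nio.charset.StandardCharsets

// First line of the sample <blob> payload; paste the full blob contents in practice
val blob = "aW5zdGFuY2VJZCxzdGFydFRpbWUsZGVsZXRlVGltZSxob3Vycw0K"

// The MIME decoder ignores any whitespace embedded in the XML element
val decoded = new String(Base64.getMimeDecoder.decode(blob), StandardCharsets.UTF_8)
println(decoded) // prints the CSV header: instanceId,startTime,deleteTime,hours

The decoded payload is plain CSV with a header row, which is what the solution below parses into a DataFrame.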
Solution
To parse the XML file:
- Load the XML data.
- Use the spark-xml library to create a raw DataFrame.
- Apply a base64 decoder on the blob column using the BASE64Decoder API.
- Save the decoded data in a text file (optional).
- Load the text file as a Spark DataFrame and parse it.
- Create the DataFrame as a Spark SQL table (see the sketch after the output below).
The following Scala code processes the file:
val xmlfile = "/mnt/vgiri/input.xml"

// Load the XML with spark-xml, treating each <message> element as a row
val readxml = spark.read.format("com.databricks.spark.xml").option("rowTag", "message").load(xmlfile)

// Pull the message attributes and the nested blob into flat columns
val decoded = readxml.selectExpr("_source as source", "_time as time", "_type as type", "detail.blob")
decoded.show() // Displays the raw blob data

// Apply a base64 decoder on every piece of blob data
val decodethisxmlblob = decoded.rdd
  .map(row => row(3).toString)
  .map(b64 => new String(new sun.misc.BASE64Decoder().decodeBuffer(b64)))

// Store the decoded CSV text in a file temporarily
decodethisxmlblob.saveAsTextFile("/mnt/vgiri/ec2blobtotxt")

// Parse the text file as required using a Spark DataFrame
import spark.implicits._ // needed for the RDD-to-DataFrame conversion (toDF)
val readAsDF = spark.sparkContext.textFile("/mnt/vgiri/ec2blobtotxt")

// Drop the CSV header row before converting to a DataFrame
val header = readAsDF.first()
val finalTextFile = readAsDF.filter(row => row != header)

val finalDF = finalTextFile.toDF()
  .selectExpr(
    "split(value, ',')[0] as instanceId",
    "split(value, ',')[1] as startTime",
    "split(value, ',')[2] as deleteTime",
    "split(value, ',')[3] as hours"
  )
finalDF.show()
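Note that sun.misc.BASE64Decoder is a JDK-internal class that was removed in Java 9, so the decode step above only compiles on Java 8. On newer JDKs, a drop-in sketch of the same step using the supported java.util.Base64 MIME decoder (which likewise skips embedded line breaks) would look like this:

import java.util.Base64
import java.nio.charset.StandardCharsets

// Same transformation as above, using the supported java.util.Base64 API
val decodethisxmlblob = decoded.rdd
  .map(row => row(3).toString)
  .map(b64 => new String(Base64.getMimeDecoder.decode(b64), StandardCharsets.UTF_8))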
Either decoder produces the same result. The Spark code generates the following output:
18/03/24 22:54:31 INFO DAGScheduler: ResultStage 4 (show at SparkXMLBlob.scala:42) finished in 0.016 s
18/03/24 22:54:31 INFO DAGScheduler: Job 4 finished: show at SparkXMLBlob.scala:42, took 0.019120 s
18/03/24 22:54:31 INFO SparkContext: Invoking stop() from shutdown hook
+-------------------+-------------+-------------+-----+
| instanceId | startTime | deleteTime |hours|
+-------------------+-------------+-------------+-----+
|i-027fa7ccda210b4f4|2/17/17T20:21|2/17/17T21:11| 5|
|i-07cd7100e3f54bf6a|2/17/17T20:19|2/17/17T21:11| 4|
|i-0a2c4adbf0dc2551c|2/17/17T20:19|2/17/17T21:11| 2|
|i-0b40b16236388973f|2/17/17T20:18|2/17/17T21:11| 6|
|i-0cfd880722e15f19e|2/17/17T20:18|2/17/17T21:11| 2|
|i-0cf0c73efeea14f74|2/17/17T16:21|2/17/17T17:11| 1|
|i-0505e95bfebecd6e6|2/17/17T16:21|2/17/17T17:11| 8|
+-------------------+-------------+-------------+-----+
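The final step of the solution, exposing the DataFrame as a Spark SQL table, is not shown in the code above. A minimal sketch using a temporary view (the view name ec2_usage is an arbitrary choice for this example):

// Register the parsed DataFrame as a temporary view for Spark SQL queries
finalDF.createOrReplaceTempView("ec2_usage")

// Example query: total usage hours per instance
spark.sql("SELECT instanceId, SUM(CAST(hours AS INT)) AS totalHours FROM ec2_usage GROUP BY instanceId").show()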