If you log events in XML format, each XML event is recorded as a base64-encoded string. To run analytics on this data using Apache Spark, you need the spark-xml library to parse the XML and the BASE64Decoder API to transform the data into a form you can analyze.
Problem
You need to analyze base64-encoded strings from an XML-formatted log file using Spark. For example, the following file input.xml shows this type of format:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log [<!ENTITY % log SYSTEM "instance">%log;]>
<log systemID="MF2018" timeZone="UTC" timeStamp="Mon Mar 25 16:00:01 2018">
  <message source="message.log" time="Mon Mar 25 16:00:01 2018" type="sysMSG">
    <text/>
    <detail>
      <blob>aW5zdGFuY2VJZCxzdGFydFRpbWUsZGVsZXRlVGltZSxob3Vycw0KaS0wMjdmYTdjY2RhMjEwYjRmNCwyLzE3LzE3VDIwOjIxLDIvMTcvMTdUMjE6MTEsNQ0KaS0wN2NkNzEwMGUzZjU0YmY2YSwyLzE3LzE3VDIwOjE5LDIvMTcvMTdUMjE6MTEsNA0KaS0wYTJjNGFkYmYwZGMyNTUxYywyLzE3LzE3VDIwOjE5LDIvMTcvMTdUMjE6MTEsMg0KaS0wYjQwYjE2MjM2Mzg4OTczZiwyLzE3LzE3VDIwOjE4LDIvMTcvMTdUMjE6MTEsNg0KaS0wY2ZkODgwNzIyZTE1ZjE5ZSwyLzE3LzE3VDIwOjE4LDIvMTcvMTdUMjE6MTEsMg0KaS0wY2YwYzczZWZlZWExNGY3NCwyLzE3LzE3VDE2OjIxLDIvMTcvMTdUMTc6MTEsMQ0KaS0wNTA1ZTk1YmZlYmVjZDZlNiwyLzE3LzE3VDE2OjIxLDIvMTcvMTdUMTc6MTEsOA==</blob>
    </detail>
  </message>
</log>
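The <blob> element holds comma-separated records encoded as base64. As a quick sanity check before involving Spark, you can decode part of the blob with the JDK's standard java.util.Base64 API. The following is a minimal sketch: it decodes only the first 52 characters of the blob shown above, which hold the CSV header row, and uses java.util.Base64 as an alternative to the sun.misc.BASE64Decoder API used in the solution below.

// Minimal sketch: decode the header portion of the blob with the standard JDK API.
import java.util.Base64
import java.nio.charset.StandardCharsets

// First 52 characters of the <blob> value above (the encoded CSV header row).
val headerChunk = "aW5zdGFuY2VJZCxzdGFydFRpbWUsZGVsZXRlVGltZSxob3Vycw0K"
val headerLine = new String(Base64.getDecoder.decode(headerChunk), StandardCharsets.UTF_8)
println(headerLine) // prints: instanceId,startTime,deleteTime,hours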
Solution
To parse the XML file:
- Load the XML data.
- Use the spark-xml library to create a raw DataFrame.
- Apply a base64 decoder on the blob column using the BASE64Decoder API.
- Save the decoded data in a text file (optional).
- Load the text file back in and parse it into a Spark DataFrame.
- Register the DataFrame as a Spark SQL table (see the sketch after the code below).
The following Scala code processes the file:
val xmlfile = "/mnt/<path>/input.xml"

// spark-xml prefixes XML attributes with "_" when it parses them into columns
val readxml = spark.read.format("com.databricks.spark.xml")
  .option("rowTag", "message")
  .load(xmlfile)

val decoded = readxml.selectExpr("_source as source", "_time as time", "_type as type", "detail.blob")
decoded.show() // Displays the raw blob data

// Apply the base64 decoder on every piece of blob data as shown below
val decodethisxmlblob = decoded.rdd
  .map(str => str(3).toString)
  .map(str1 => new String(new sun.misc.BASE64Decoder().decodeBuffer(str1)))

// Store it in a text file temporarily
decodethisxmlblob.saveAsTextFile("/mnt/vgiri/ec2blobtotxt")

// Parse the text file as required using a Spark DataFrame
import spark.implicits._ // required for toDF() on an RDD

val readAsDF = spark.sparkContext.textFile("/mnt/vgiri/ec2blobtotxt")
val header = readAsDF.first()
val finalTextFile = readAsDF.filter(row => row != header)
val finalDF = finalTextFile.toDF()
  .selectExpr(
    "split(value, ',')[0] as instanceId",
    "split(value, ',')[1] as startTime",
    "split(value, ',')[2] as deleteTime",
    "split(value, ',')[3] as hours"
  )
finalDF.show()
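The last step in the list above, registering the result as a Spark SQL table, is not shown in the code. The following is a minimal sketch, assuming the finalDF from above; the view name ec2_usage is an arbitrary example.

// Register the DataFrame so it can be queried with Spark SQL.
// The view name "ec2_usage" is a placeholder.
finalDF.createOrReplaceTempView("ec2_usage")
spark.sql("SELECT instanceId, hours FROM ec2_usage").show()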
The finalDF.show() call at the end of the main code generates the following output:
18/03/24 22:54:31 INFO DAGScheduler: ResultStage 4 (show at SparkXMLBlob.scala:42) finished in 0.016 s
18/03/24 22:54:31 INFO DAGScheduler: Job 4 finished: show at SparkXMLBlob.scala:42, took 0.019120 s
18/03/24 22:54:31 INFO SparkContext: Invoking stop() from shutdown hook
+-------------------+-------------+-------------+-----+
|         instanceId|    startTime|   deleteTime|hours|
+-------------------+-------------+-------------+-----+
|i-027fa7ccda210b4f4|2/17/17T20:21|2/17/17T21:11|    5|
|i-07cd7100e3f54bf6a|2/17/17T20:19|2/17/17T21:11|    4|
|i-0a2c4adbf0dc2551c|2/17/17T20:19|2/17/17T21:11|    2|
|i-0b40b16236388973f|2/17/17T20:18|2/17/17T21:11|    6|
|i-0cfd880722e15f19e|2/17/17T20:18|2/17/17T21:11|    2|
|i-0cf0c73efeea14f74|2/17/17T16:21|2/17/17T17:11|    1|
|i-0505e95bfebecd6e6|2/17/17T16:21|2/17/17T17:11|    8|
+-------------------+-------------+-------------+-----+
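Note that sun.misc.BASE64Decoder is a JDK-internal class and is not available on Java 9 and later. If your cluster runs a newer JDK, the decode step can be rewritten with the standard java.util.Base64 API. The following is a minimal sketch of a drop-in replacement for the decodethisxmlblob step above; getMimeDecoder is used because it ignores line breaks inside the encoded blob.

// Alternative decode step for JDKs without sun.misc.BASE64Decoder (Java 9+).
import java.util.Base64
import java.nio.charset.StandardCharsets

val decodethisxmlblob = decoded.rdd
  .map(row => row(3).toString)
  .map(blob => new String(Base64.getMimeDecoder.decode(blob.trim), StandardCharsets.UTF_8))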