Saturday, February 21, 2015

Experimenting with the Apache Spark Dataframe API

The goal of this post is to experiment with a newly introduced Apache Spark API: the DataFrame. It makes it easier for data scientists to manipulate data. This feature will be part of Apache Spark 1.3, due to be released in early March; in the meantime, one can download the development branch and build it.

I will be experimenting with a Chicago crime data set from www.socrata.com. The file can be downloaded as chicagoCrimeData.csv.


The data set includes a header and contains 10 fields:

Case# (String), Date (String), Block (String), Primary Desc (String), Secondary Desc (String), Location Desc (String), Arrest (String), Domestic (String), Latitude (Double), Longitude (Double)

The second field will be split into two: date and time. Following is the case class that will be used to load the data

case class crimeData(caseNum: String, date: String, time: String, block: String,
                     primaryDesc: String, secondaryDesc: String, location: String, arrest: String,
                     domestic: String, lat: String, lon: String)

Prior to parsing the data, we need to set up filters to remove the header as well as to take care of missing data. Lines with missing data for a given field, represented by “,,”, will be excluded:


val input = sc.textFile(filename)
val data = input.filter(line => !line.contains(",,"))
val header = data.take(1)(0)
val goodData = data.filter(line => line != header)
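
As a quick sanity check, we can compare how many lines each filter removed:

// optional check: compare line counts before and after filtering
println(s"raw lines:          ${input.count()}")
println(s"after ',,' filter:  ${data.count()}")
println(s"after header drop:  ${goodData.count()}")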

We can define a method for parsing the different fields


def parseCrimeData(str: String): crimeData = {
  val fields = str.split(",")
  crimeData(fields(0), fields(1).split(" ")(0), fields(1).split(" ")(1), fields(2), fields(3), fields(4), fields(5),
            fields(6), fields(7), fields(8), fields(9))
}

We then load the data:

val crime = goodData.map(parseCrimeData)

Once the data is loaded, we can print out a few records to make sure that our parsing method worked properly:

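For a quick look, one way is to take a handful of records and print them on the driver:

// print the first few parsed records
crime.take(5).foreach(println)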

[Screenshot: a sample of the parsed crimeData records]


Now that we’ve got our crime RDD, we can use the toDF() method (brought in by import sqlContext.implicits._, as shown in the full listing below) to convert the RDD into a data frame, crimeDF:


val crimeDF = crime.toDF()
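
To double-check the conversion, we can also print the schema that Spark inferred from the case class:

// all fields come out as strings except those we declare otherwise in the case class
crimeDF.printSchema()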

The data frame allows us to perform data analysis in a flexible way. For example, let’s get a count of crime occurrences by crime type. This is easily done by:


crimeDF.select("caseNum", "primaryDesc").groupBy("primaryDesc").count().orderBy("primaryDesc").collect().foreach(println)

[Screenshot: crime counts by primary description]

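For quick inspection, an alternative is to let the data frame render the result as a small table with show():

// show() prints the first rows in a tabular layout
crimeDF.groupBy("primaryDesc").count().orderBy("primaryDesc").show()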

One of the nice features of a Spark data frame is that it allows running regular SQL against it once the DF has been registered as a temporary table (crimeData):


crimeDF.registerTempTable("crimeData")
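
As a quick check that the table is now visible to the SQL context, a trivial query can be run against it:

// should return the total number of parsed records
sqlContext.sql("SELECT count(*) FROM crimeData").collect().foreach(println)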

Let’s run the same query as before, but this time using a regular SQL query:


sqlContext.sql(
  """
    |SELECT count(distinct caseNum), primaryDesc
    |FROM crimeData
    |GROUP BY primaryDesc
    |ORDER BY primaryDesc
  """.stripMargin)
  .collect()
  .foreach(println)

We get a tabulation of crimes by crime type:


[Screenshot: SQL query output, counts by crime type]


Following is the full code listing


import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

case class crimeData(caseNum: String, date: String, time: String, block: String,
                     primaryDesc: String, secondaryDesc: String, location: String, arrest: String,
                     domestic: String, lat: String, lon: String)

object sparkDF {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("sparkDF").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val filename = "chicagoCrimeData.csv"

    // Parse a CSV line into a crimeData record, splitting the second field into date and time
    def parseCrimeData(str: String): crimeData = {
      val fields = str.split(",")
      crimeData(fields(0), fields(1).split(" ")(0), fields(1).split(" ")(1), fields(2), fields(3), fields(4), fields(5),
                fields(6), fields(7), fields(8), fields(9))
    }

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Load the file, drop lines with missing fields (",,") and remove the header
    val input = sc.textFile(filename)
    val data = input.filter(line => !line.contains(",,"))
    val header = data.take(1)(0)
    val goodData = data.filter(line => line != header)

    // Parse into an RDD of crimeData and convert it to a data frame
    val crime = goodData.map(parseCrimeData)
    val crimeDF = crime.toDF().cache()

    // Count crime occurrences by crime type using the DataFrame API
    crimeDF.select("caseNum", "primaryDesc").groupBy("primaryDesc").count().orderBy("primaryDesc").collect().foreach(println)

    // Same query expressed in SQL against the registered temporary table
    crimeDF.registerTempTable("crimeData")
    sqlContext.sql(
      """
        |SELECT count(distinct caseNum), primaryDesc
        |FROM crimeData
        |GROUP BY primaryDesc
        |ORDER BY primaryDesc
      """.stripMargin)
      .collect()
      .foreach(println)
  }
}
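
For reference, a minimal build.sbt along these lines should be enough to compile the listing as a standalone application; the Spark version string is an assumption and has to match however the 1.3 development build was published locally:

name := "sparkDF"

version := "0.1"

scalaVersion := "2.10.4"

// Adjust the version to whatever your locally built Spark 1.3 snapshot is published as
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0-SNAPSHOT",
  "org.apache.spark" %% "spark-sql"  % "1.3.0-SNAPSHOT"
)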
