Saturday, February 21, 2015

Experimenting with the Apache Spark Dataframe API

The goal of this post is to experiment with a newly introduced Apache Spark API: the DataFrame. It makes it easier for data scientists to manipulate data. This feature will be part of Apache Spark 1.3, due to be released in early March; in the meantime, one can download the development branch and build it.

I will be experimenting with a Chicago crime data set from www.socrata.com; the file is chicagoCrimeData.csv.


The data set includes a header and contains 10 fields:

Case#             String
Date              String
Block             String
Primary Desc      String
Secondary Desc    String
Location Desc     String
Arrest            String
Domestic          String
Latitude          Double
Longitude         Double

The second field will be split into two: date and time. Following is the case class that will be used to load the data:

case class crimeData(caseNum: String, date: String, time: String, block: String,
                     primaryDesc: String, secondaryDesc: String, location: String, arrest: String,
                     domestic: String, lat: String, lon: String)

Prior to parsing the data, we need to set up filters to remove the header and to take care of missing data. Lines with a missing field, which show up as ",," in the CSV, will be excluded:

val input = sc.textFile(filename)
val data = input.filter(line => !line.contains(",,"))   // drop lines with missing fields
val header = data.take(1)(0)                            // the first line is the header
val goodData = data.filter(line => line != header)      // drop the header line

We can define a method for parsing the different fields:

def parseCrimeData(str: String): crimeData = {
  val fields = str.split(",")
  val dateTime = fields(1).split(" ")   // the second field holds both date and time
  crimeData(fields(0), dateTime(0), dateTime(1), fields(2), fields(3), fields(4),
    fields(5), fields(6), fields(7), fields(8), fields(9))
}

We then load the data:

val crime = goodData.map(parseCrimeData)

Once the data is loaded, we can print it out to make sure that our parsing method worked properly
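A quick way to do this is to pull a few records back to the driver, for example:

crime.take(5).foreach(println)   // print the first few parsed records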




Now that we've got our crime RDD, we can use the method toDF() (brought into scope by import sqlContext.implicits._) to convert our RDD into a data frame, crimeDF:


val crimeDF = crime.toDF()
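We can also inspect the schema that Spark inferred from the case class by calling printSchema() on the data frame:

crimeDF.printSchema()   // prints each column name and its type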

The data frame allows us to perform data analysis in a flexible way. For example, let's get a count of the crime occurrences by crime type. This is easily done by:


crimeDF.select("caseNum", "primaryDesc").groupBy("primaryDesc").count().orderBy("primaryDesc").foreach(println)
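The same fluent style can be used for filtering. As a hypothetical example, assuming the arrest field holds the string "true" when an arrest was made, we could count arrests with:

crimeDF.filter(crimeDF("arrest") === "true").count()   // assumes arrest is stored as "true"/"false"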



One of the nice features of a Spark data frame is that it allows running regular SQL against it after the DF has been registered as a temporary table (crimeData):


crimeDF.registerTempTable("crimeData")

Let's run the same aggregation, this time as a regular SQL query:


sqlContext.sql(
  """
    |SELECT count(distinct caseNum), primaryDesc
    |FROM crimeData
    |GROUP BY primaryDesc
    |ORDER BY primaryDesc
  """.stripMargin)
  .collect()
  .foreach(println)

We get a tabulation of crimes by crime type.


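Since the table is now registered, any query of this form will work. As a sketch, here is a query listing the ten most frequent crime types (the alias total is introduced here just for readability):

sqlContext.sql(
  """
    |SELECT primaryDesc, count(*) AS total
    |FROM crimeData
    |GROUP BY primaryDesc
    |ORDER BY total DESC
    |LIMIT 10
  """.stripMargin)
  .collect()
  .foreach(println)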


Following is the full code listing:


import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

case class crimeData(caseNum: String, date: String, time: String, block: String,
                     primaryDesc: String, secondaryDesc: String, location: String, arrest: String,
                     domestic: String, lat: String, lon: String)

object sparkDF {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("sparkDF").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val filename = "chicagoCrimeData.csv"

    // Split a CSV line into fields; the second field holds both date and time
    def parseCrimeData(str: String): crimeData = {
      val fields = str.split(",")
      val dateTime = fields(1).split(" ")
      crimeData(fields(0), dateTime(0), dateTime(1), fields(2), fields(3), fields(4),
        fields(5), fields(6), fields(7), fields(8), fields(9))
    }

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._   // brings toDF() into scope

    val input = sc.textFile(filename)
    val data = input.filter(line => !line.contains(",,"))   // drop lines with missing fields
    val header = data.take(1)(0)
    val goodData = data.filter(line => line != header)      // drop the header line

    val crime = goodData.map(parseCrimeData)
    val crimeDF = crime.toDF().cache()

    // Crime occurrences by crime type, via the DataFrame API
    crimeDF.select("caseNum", "primaryDesc").groupBy("primaryDesc").count()
      .orderBy("primaryDesc").foreach(println)

    // The same aggregation as a regular SQL query against a temporary table
    crimeDF.registerTempTable("crimeData")
    sqlContext.sql(
      """
        |SELECT count(distinct caseNum), primaryDesc
        |FROM crimeData
        |GROUP BY primaryDesc
        |ORDER BY primaryDesc
      """.stripMargin)
      .collect()
      .foreach(println)
  }
}

Monday, February 16, 2015

Building Apache Spark 1.3 development branch

In this short post, I am documenting how to download and build the Apache Spark 1.3 development branch on Linux Mint. Apache Spark 1.3 provides a new feature, the DataFrame API, similar to R's data frames and to a database table. The official version 1.3 is due to be released in early March; in the meantime, one can download and evaluate the development version.

To download and build Apache Spark 1.3, do the following:

* clone the 1.3 branch by running the following at the shell prompt:

git clone -b branch-1.3 https://github.com/apache/spark.git



Once this step is done, there should be a folder called "spark". cd into the folder, then run the build command using sbt:


sudo sbt/sbt assembly


After a while, if the build is successful, sbt will report success.




The Spark shell is then started by running the following command:


 bin/spark-shell 


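Once the shell comes up, a quick sanity check is to print the Spark version from the Scala prompt (sc.version is a standard SparkContext field; the exact string depends on the branch you built):

scala> sc.version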