The goal of this post is to experiment with a newly introduced Apache Spark API: the DataFrame. It makes it easier for data scientists to manipulate data. This feature will be part of Apache Spark 1.3, due to be released in early March; in the meantime, one can download the development branch and build it.
I will be experimenting with a Chicago crime data set from www.socrata.com. The file can be downloaded from chicagoCrimeData.csv
The data set includes a header and contains 10 fields:
Case# | Date | Block | Primary Desc | Secondary Desc | Location Desc | Arrest | Domestic | Latitude | Longitude |
String | String | String | String | String | String | String | String | Double | Double |
The second field will be split into two: date and time. Following is the case class that will be used to load the data
case class crimeData(caseNum: String, date: String, time: String, block: String,
primaryDesc: String, secondaryDesc: String, location: String, arrest: String,
domestic: String, lat: String, lon: String)
Prior to parsing the data, we need to set up filters to remove the header and to handle missing data. Lines with a missing value for a given field, which shows up as “,,”, will be excluded:
val input = sc.textFile(filename)
val data = input.filter(line => !line.contains(",,"))
val header = data.take(1)(0)
val goodData = data.filter(line=> line != header)
We can define a method for parsing the different fields
def parseCrimeData(str: String): crimeData = {
val fields = str.split(",")
crimeData(fields(0), fields(1).split(" ")(0), fields(1).split(" ")(1), fields(2), fields(3), fields(4), fields(5),
fields(6), fields(7), fields(8), fields(9))
}
val crime = goodData.map(parseCrimeData)
Once the data is loaded, we can print it out to make sure that our parsing method worked properly
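For example, we can print the first few parsed records (the number shown here is arbitrary):
crime.take(5).foreach(println)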
Now that we’ve got our crime RDD, we can use the method toDF() (available once sqlContext.implicits._ is imported, as in the full listing below) to convert the RDD into a data frame, crimeDF:
val crimeDF = crime.toDF()
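A quick way to check the result is to print the data frame's schema; since every field in the case class is a String, all columns come out as strings:
crimeDF.printSchema()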
The data frame will allow us to perform data analysis in a flexible way. Let’s get a count of crime occurrences by crime type, which is easily done with:
crimeDF.select("caseNum", "primaryDesc").distinct.groupBy("primaryDesc").count().orderBy("primaryDesc").collect().foreach(println)
One of the nice features of a Spark data frame is that it allows running regular SQL against it once the DF has been registered as a temporary table (crimeData):
crimeDF.registerTempTable("crimeData")
Let’s run the same aggregation, this time using a regular SQL query:
sqlContext.sql(
"""
|SELECT count( distinct caseNum), primaryDesc
| FROM crimeData group by primaryDesc
| order by primaryDesc
""".stripMargin)
.collect()
.foreach(println)
We get a tabulation of crimes by crime type.
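Any other SQL aggregation can be run against the same temporary table in exactly the same way; for example, tallying distinct cases by location description (the location column from our case class):
sqlContext.sql(
"""
|SELECT count(distinct caseNum), location
| FROM crimeData group by location
| order by location
""".stripMargin)
.collect()
.foreach(println)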
Following is the full code listing
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
case class crimeData(caseNum: String, date: String, time: String, block: String,
primaryDesc: String, secondaryDesc: String, location: String, arrest: String,
domestic: String, lat: String, lon: String)
object sparkDF {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("sparkDF").setMaster("local[4]")
val sc = new SparkContext(conf)
val filename = "chicagoCrimeData.csv"
def parseCrimeData(str: String): crimeData = {
val fields = str.split(",")
crimeData(fields(0), fields(1).split(" ")(0), fields(1).split(" ")(1), fields(2), fields(3), fields(4), fields(5),
fields(6), fields(7), fields(8), fields(9))
}
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val input = sc.textFile(filename)
val data = input.filter(line => !line.contains(",,"))
val header = data.take(1)(0)
val goodData = data.filter(line => line != header)
val crime = goodData.map(parseCrimeData)
val crimeDF = crime.toDF().cache()
crimeDF.select("caseNum", "primaryDesc").distinct.groupBy("primaryDesc").count().orderBy("primaryDesc").collect().foreach(println)
crimeDF.registerTempTable("crimeData")
sqlContext.sql(
"""
|SELECT count( distinct caseNum), primaryDesc
| FROM crimeData group by primaryDesc
| order by primaryDesc
""".stripMargin)
.collect()
.foreach(println)
}
}