In a previous blog post, we experimented with a dataset dealing with crime in Chicago. I’ve been reading a great book “Advanced Analytics with Spark”, I decided to experiment with the techniques that I learned in Chapter 8 in order to analyze the spatial features of the data set. Recall that the crime data set was released by the city of Chicago and made available on the Socrata website. The file has been exported as CSV and is about 4 Gb big; it contains about 4 million crimes covering the 2001-2014 period. The main question we will try to answer is : what is the spatial distribution of crimes committed?
Let’s start a new IntelliJ IDEA SBT project
The SBT build file is
name := "crimeData"
version := "1.0"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.3.0"
libraryDependencies += "com.github.nscala-time" %% "nscala-time" % "0.4.2"
libraryDependencies += "com.esri.geometry" % "esri-geometry-api" % "1.1.1"
libraryDependencies += "io.spray" %% "spray-json" % "1.3.1"
It includes extra dependencies that will be useful for our analyses:
- nscala-time makes it easy to deal with temporal data computations
- esri-geometry-api will help with geolocation analyses
- spray library for handling JSON data
Following the steps covered in previous posts, we’ll create a Resilient Distributed Dataset (RDD) from our CSV dataset. The data will pull be loaded via a Scala case class
case class crimeData(caseNum:String, id: String,
dateofOcc: DateTime,block: String,IUCR:String,primaryType: String, desc: String,
location: String, arrest: Boolean,domestic: Boolean, beat: String,
district: String, ward: String, community: String, fbiCode: String,xCoord: Long,
yCoord: Long, year: Int, updatedOn: DateTime,lat: Double, lon: Double, loc: String,crimeCoord: Point)
One noteworthy detail is the type of the last field crimeCoord: Point. This attribute from the ESRI geolocation API will store the GPS coordinates where the crime occurred, it is define as “a single location in space and is the building block for all other geometry types”. Using this field and additional operations provided by the ESRI API, we will be able to determine for a given Point in space, where it is located in the city of Chicago.
def point(longitude: String, latitude: String): Point = {
new Point(longitude.toDouble, latitude.toDouble)
}
The spatial data used for the boundaries of the different areas of Chicago is in a format called GeoJSON which is an open standard format for encoding collections of simple geographical features along with their non-spatial attributes using JSON. The main objects of GeoJSON are Point and feature made up of a geometry instance and a set of key-value pairs called properties.A geometry is a shape like a point, line, or polygon. A set of features is called a FeatureCollection. . The Chicago GeoJSON dataset used for this analysis is available here.
Let’s write our parsing function, we will use a slightly different approach this time by making use of Scala Either class which represents a value of one of two possible types Instances of Either are either an instance of scala.util.Left or scala.util.Right: Either[L,R] In the context of the CSV file parsing, the “left” output will represent data successfully parsed and the “right” output will be a tuple of the exception and the data that caused it. The original code for parsing the records is in the “Advanced Analytics with Spark” book, the listing is a follows:
def safe[S, T](f: S => T): S => Either[T, (S, Exception)] = {
new Function[S, Either[T, (S, Exception)]] with Serializable {
def apply(s: S): Either[T, (S, Exception)] = {
try {
Left(f(s))
} catch {
case e: Exception => Right((s, e))
}
}
}
}
We can now create a safe wrapper function called safeParse by passing our parse function (of type String=>crime) to the safe function, and then applying safeParse to the crimeRaw RDD. We then print the first 10 records to ensure that our parsing function is working correctly.
val safeParse = safe(parse)
val crimeParsed = crimeRaw.map(safeParse)
crimeParsed.cache()
crimeParsed.take(10).foreach(println)
The first record being the header falls into the “Right” category, the exception is “java.text.ParseException: Unparseable date: "Date" ” the remaining 9 records are valid.
We can apply map to only select the valid records:
crimeGoodData.values.map(zone).countByValue().foreach(println)
We’ve got our data read in ,cleaned up and read into an RDD, we can proceed with the geospatial analysis: recall that our main goal is to find get a distribution of crime occurrence in Chicago.
The first step is to load our geolocation file using regular Scala IO command
val geojson = scala.io.Source.fromFile("/home/spark/data/chicago.geojson").mkString
We will be using Spray and Esri into the Spark shell so that we can parse the geoJSON string into an instance of our FeatureCollection case class as illustrated in the book mentioned earlier, the code below is to be inserted into our chicagoCrime object. It contains all the objects needed for processing the geoJSON data and to perform the operations for assigning each crime occurrence into a neighborhood in Chicago.
class RichGeometry(val geometry: Geometry,
val spatialReference: SpatialReference =
SpatialReference.create(4326)) {
def area2D() = geometry.calculateArea2D()
def contains(other: Geometry): Boolean = {
GeometryEngine.contains(geometry, other, spatialReference)
}
def distance(other: Geometry): Double =
GeometryEngine.distance(geometry, other, spatialReference)
}
object RichGeometry {
implicit def wrapRichGeo(g: Geometry) = {
new RichGeometry(g)
}
}
case class Feature(
val id: Option[JsValue],
val properties: Map[String, JsValue],
val geometry: RichGeometry) {
def apply(property: String) = properties(property)
def get(property: String) = properties.get(property)
}
case class FeatureCollection(features: Array[Feature])
extends IndexedSeq[Feature] {
def apply(index: Int) = features(index)
def length = features.length
}
case class GeometryCollection(geometries: Array[RichGeometry])
extends IndexedSeq[RichGeometry] {
def apply(index: Int) = geometries(index)
def length = geometries.length
}
object GeoJsonProtocol extends DefaultJsonProtocol {
implicit object RichGeometryJsonFormat extends RootJsonFormat[RichGeometry] {
def write(g: RichGeometry) = {
GeometryEngine.geometryToGeoJson(g.spatialReference, g.geometry).parseJson
}
def read(value: JsValue) = {
val mg = GeometryEngine.geometryFromGeoJson(value.compactPrint, 0, Geometry.Type.Unknown)
new RichGeometry(mg.getGeometry, mg.getSpatialReference)
}
}
implicit object FeatureJsonFormat extends RootJsonFormat[Feature] {
def write(f: Feature) = {
val buf = scala.collection.mutable.ArrayBuffer(
"type" -> JsString("Feature"),
"properties" -> JsObject(f.properties),
"geometry" -> f.geometry.toJson)
f.id.foreach(v => { buf += "id" -> v})
JsObject(buf.toMap)
}
def read(value: JsValue) = {
val jso = value.asJsObject
val id = jso.fields.get("id")
val properties = jso.fields("properties").asJsObject.fields
val geometry = jso.fields("geometry").convertTo[RichGeometry]
Feature(id, properties, geometry)
}
}
implicit object FeatureCollectionJsonFormat extends RootJsonFormat[FeatureCollection] {
def write(fc: FeatureCollection) = {
JsObject(
"type" -> JsString("FeatureCollection"),
"features" -> JsArray(fc.features.map(_.toJson): _*)
)
}
def read(value: JsValue) = {
FeatureCollection(value.asJsObject.fields("features").convertTo[Array[Feature]])
}
}
implicit object GeometryCollectionJsonFormat extends RootJsonFormat[GeometryCollection] {
def write(gc: GeometryCollection) = {
JsObject(
"type" -> JsString("GeometryCollection"),
"geometries" -> JsArray(gc.geometries.map(_.toJson): _*))
}
def read(value: JsValue) = {
GeometryCollection(value.asJsObject.fields("geometries").convertTo[Array[RichGeometry]])
}
}
}
Now that all the requisite objects are defined for parsing geoSON, let’s process our geoJSON data file into an instance of our FeatureCollection case class.
val features = geojson.parseJson.convertTo[FeatureCollection]
FeatureCollection contains a function “find’ that will allows to find to what part of Chicago from the geoGSON file a given point where a crime occurred belongs to.
def zone(crime: crimeData): Option[String] = {
val feature: Option[Feature] = features.find(f => {
f.geometry.contains(crime.crimeCoord)
})
feature.map(f => {
f("name").convertTo[String]
})
}
Finally, we can process crimeDataGood to get a list of Chicago neighborhoods and the associated number of crimes ordered descending ; this is achieved by using sortBy(-_._2)
crimeGoodData.values.map(zone).countByValue()
.toList.sortBy(-_._2).foreach(println)
How about the number of crimes type “Homicide” per neighborhood?
crimeGoodData.values.filter{ p =>
p.primaryType=="HOMICIDE"
}.map(zone)
.countByValue()
.toList.
sortBy(-_._2)
.foreach(println)
The code is available here.