In the previous post we loaded data into Spark via a Scala case class. We're continuing our experiment; this time the data will be loaded using Spark SQL. Spark SQL is a component of Spark that provides a SQL-like interface.
Continuing with the code from the previous post, let's create a SQLContext, which is an entry point for running queries in Spark:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
Let's create a schema RDD programmatically; according to the Spark SQL documentation, this method is recommended when a case class cannot be defined ahead of time.
There are three steps, as detailed in the documentation:
- Create an RDD of Rows from the original RDD.
- Create the schema, represented by a StructType, matching the structure of the Rows in the RDD created in step 1.
- Apply the schema to the RDD of Rows via the applySchema method provided by SQLContext.
The schema is represented by a StructType matching the structure of the data being read:
// StructType, StructField and the column types come from org.apache.spark.sql
import org.apache.spark.sql._

val schema = StructType(Array(
  StructField("year", IntegerType, true),
  StructField("month", IntegerType, true),
  StructField("tAverage", DoubleType, true),
  StructField("tMaximum", DoubleType, true),
  StructField("tMinimum", DoubleType, true)))
Next, we convert each line of the weather data into a Row:
val rowRDD = source
  .map(_.split(","))
  .map(p => org.apache.spark.sql.Row(
    p(1).toInt,    // year
    p(2).toInt,    // month
    p(4).toDouble, // average temperature
    p(5).toDouble, // maximum temperature
    p(6).toDouble  // minimum temperature
  ))
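This mapping assumes that every line splits into at least seven comma-separated fields and that the numeric columns always parse. If the file might contain a header row or malformed records, a slightly more defensive variant (just a sketch; the field-count filter is an assumption about this particular file layout) could drop such lines before building the Rows:

// Sketch: skip lines that do not have enough fields before building Rows
val safeRowRDD = source
  .map(_.split(","))
  .filter(_.length >= 7)
  .map(p => org.apache.spark.sql.Row(p(1).toInt, p(2).toInt, p(4).toDouble, p(5).toDouble, p(6).toDouble))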
Next, we apply the schema
val tempDataRDD = sqlContext.applySchema(rowRDD, schema)
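To check that the schema was applied as expected, the resulting schema RDD can print its structure to the console (SchemaRDD provides a printSchema method):

// Print the structure of the schema RDD (column names, types, nullability)
tempDataRDD.printSchema()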
Finally, we register the schema RDD as a temporary table:
tempDataRDD.registerTempTable("temperatureData")
Now that the table has been registered, we can run a SQL query
sql("select year, month, AVG(tAverage) from temperatureData group by year order by year").foreach(println)
to compute the average temperature for each year. The results are printed to the console, interleaved with Spark's log output.
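Note that foreach(println) runs on the workers; in local mode the output still reaches the driver console, but on a cluster it would end up in the executor logs. A small alternative sketch, assuming the aggregated result is small enough to fit on the driver, is to collect the rows first and print them locally:

// Collect the aggregated rows to the driver, then print them there
val yearlyAverages = sql("select year, AVG(tAverage) from temperatureData group by year order by year")
yearlyAverages.collect().foreach(println)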
Note that while the application is running, you can get all the job details by browsing to the Spark UI; the port is typically 4040 (the correct port is displayed on the console).