Sunday, December 28, 2014

Loading data using Spark SQL (cont.)

In the previous post we loaded data into Spark via a Scala case class, were continuing our experiment, this time the data will be loaded using Spark SQL. Spark SQL is component of Spark and it provides a SQL like interface.

Continuing with the code from the previous post, let’s create a SQLContext which is an entry point for running queries in Spark

 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

Let’s create a schema RDD programmatically, this method is recommended when a case class cannot be defined ahead of time according to the Spark SQL documentation.


There are three steps as detailed in the documentation:



  1. Create an RDD of Rows from the original RDD;
  2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
  3. Apply the schema to the RDD of Rows via applySchema method provided by SQLContext.

The schema RDD is represented by by a StructType matching the structure of the data that is being read.


val schema = StructType(Array(StructField("year",IntegerType,true),
StructField("month",IntegerType,true), 
StructField("tAverage",DoubleType,true),
StructField("tMaximum",DoubleType,true),
StructField("tMinimum",DoubleType,true)))

Next, we convert the rows of the weather data into Rows


val rowRDD = source.map(_.split(","))
  .map(p => org.apache.spark.sql.Row(p(1)
  .toInt,p(2).toInt,p(4).toDouble,p(5)
  .toDouble, p(6).toDouble ))

Next, we apply the schema


val tempDataRDD = sqlContext.applySchema(rowRDD, schema)

Finally we register the schema as a table


tempDataRDD.registerTempTable("temperatureData")

Now that the schema has been registered, we can run a SQL query


sql("select year, month, AVG(tAverage) 
  from temperatureData group by year order by 
  year").foreach(println)

to compute the average temperature by year. The results are displayed interleaved with log entries


image


Note that while the application is running, you can get all the job details by logging into sparkUI, typically the port is 4040 (the correct port is displayed on the console)


image


image

No comments:

Post a Comment