Friday, March 13, 2015

Reading Oracle data using the Apache Spark DataFrame API

The new version of Apache Spark (1.3) introduces a new API, the DataFrame. As mentioned in an earlier post, the new API will make it easy for data scientists and people with a SQL background to perform analyses with Spark.

The goal of this post is to experiment with the JDBC support in Apache Spark 1.3. We will load tables from an Oracle database (12c) and generate a result set by joining two tables.

In order to connect to the database using JDBC, a JAR file has to be added to our CLASSPATH. The ojdbc JAR is available at

http://www.oracle.com/technetwork/database/features/jdbc/jdbc-drivers-12c-download-1958347.html

We’ll be using the file ojdbc7.jar.

First, download the latest Apache Spark 1.3 snapshot from https://spark.apache.org/downloads.html
We create a new project in IntelliJ IDEA, then add the Spark 1.3 JAR and the ojdbc7.jar file to the project as dependencies under “Project Structure”, “Modules”.
Oracle databases come with several seed schemas, such as HR; we’ll pull two tables from that schema: EMPLOYEES and DEPARTMENTS.
To connect to the Oracle database, the JDBC thin driver will be used. Given the following:

    • Database server: localhost
    • Database service name: pdborcl
    • Port:  1521
    • Userid/password: hr/hr

the JDBC URL is

jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl
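The URL can also be assembled from its parts, which keeps the connection details in one place if they change. A minimal sketch (the helper name `oracleThinUrl` is mine, not part of Spark or the Oracle driver):

```scala
// Assemble a thin-driver JDBC URL from its components.
// Host, port, and service name match the values listed above.
def oracleThinUrl(user: String, password: String,
                  host: String, port: Int, service: String): String =
  s"jdbc:oracle:thin:$user/$password@//$host:$port/$service"

val url = oracleThinUrl("hr", "hr", "localhost", 1521, "pdborcl")
println(url) // jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl
```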

Let’s create a DataFrame (employees) from the “EMPLOYEES” table of the Oracle database:

val employees = sqlContext.load("jdbc",
  Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
      "dbtable" -> "hr.employees"))

and a second DataFrame (departments) from the “DEPARTMENTS” table:

val departments = sqlContext.load("jdbc",
  Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
      "dbtable" -> "hr.departments"))

Let’s join EMPLOYEES and DEPARTMENTS and print out the results:

val empDepartments = employees.join(departments,
  employees.col("DEPARTMENT_ID") === departments("DEPARTMENT_ID"))
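Once joined, the DataFrame can be narrowed down before printing. A quick sketch, assuming the column names of the standard HR schema (LAST_NAME, DEPARTMENT_NAME):

```scala
// Keep only employees of the IT department, project two columns,
// and render the first rows as a text table on the console.
empDepartments
  .filter(empDepartments("DEPARTMENT_NAME") === "IT")
  .select("LAST_NAME", "DEPARTMENT_NAME")
  .show()
```

Filtering before the select keeps the expression simple: both columns are still resolvable against the joined DataFrame at that point.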

The new DataFrame API has a handy function, printSchema, that prints out the DataFrame schema in a readable tree format.


 empDepartments.printSchema()

Here is the complete code listing:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

object sparkJDBC {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ORCL db")
      .setMaster("local[4]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val employees = sqlContext.load("jdbc",
      Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
          "dbtable" -> "hr.employees"))
    val departments = sqlContext.load("jdbc",
      Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
          "dbtable" -> "hr.departments"))

    val empDepartments = employees.join(departments,
      employees.col("DEPARTMENT_ID") === departments("DEPARTMENT_ID"))

    // Collect to the driver before printing so the rows show up
    // on the console rather than in executor logs.
    empDepartments.collect().foreach(println)
    empDepartments.printSchema()

    sc.stop()
  }
}
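One variation worth knowing: the "dbtable" option accepts a parenthesized subquery as well as a table name, so the join above could instead run inside Oracle and only the result set would travel to Spark. A sketch of that approach (same HR tables; the alias emp_dept is my own choice):

```scala
// Push the join down to Oracle by handing the JDBC source a subquery.
// Spark wraps this string in a SELECT, so it must be a valid derived table.
val joinedInDb = sqlContext.load("jdbc",
  Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
      "dbtable" ->
        """(SELECT e.last_name, d.department_name
          |   FROM hr.employees e
          |   JOIN hr.departments d
          |     ON e.department_id = d.department_id) emp_dept""".stripMargin))

joinedInDb.printSchema()
```

This trades Spark-side flexibility for less data movement, which can matter when the tables are large and the result is small.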

There you have it: a quick introduction to using the new Apache Spark DataFrame JDBC feature to pull data from an Oracle database. The new API will no doubt offer more flexibility for performing analyses on data stored in a relational database.