Friday, March 13, 2015

Reading Oracle data using the Apache Spark DataFrame API

The new version of Apache Spark (1.3) introduces a new API, the DataFrame. As mentioned in an earlier post, the new API will make it easier for data scientists and people with a SQL background to perform analyses with Spark.

The goal of this post is to experiment with the JDBC data source introduced in Apache Spark 1.3. We will load tables from an Oracle 12c database and generate a result set by joining two tables.

In order to connect to the database using JDBC, the Oracle driver JAR has to be added to our CLASSPATH. The ojdbc JAR is available at

http://www.oracle.com/technetwork/database/features/jdbc/jdbc-drivers-12c-download-1958347.html

We’ll be using the file ojdbc7.jar.

First, download the latest Apache Spark 1.3 snapshot from https://spark.apache.org/downloads.html


We create a new project in IntelliJ IDEA, then add the Spark 1.3 JAR and ojdbc7.jar to the project as dependencies under "Project Structure", "Modules".


Oracle databases ship with several seed schemas, such as HR; we'll pull two tables from that schema: EMPLOYEES and DEPARTMENTS.


To connect to the Oracle database, the JDBC thin driver will be used. Given the following:

    • Database server: localhost
    • Database service name: pdborcl
    • Port:  1521
    • Userid/password: hr/hr

the JDBC URL is

jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl
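As a quick sanity check, the URL above can be assembled from its parts. The following is a minimal sketch; the `JdbcUrl` object and `thinUrl` helper are illustrative names, and the host, port, service name, and credentials are the example values listed above:

```scala
// Build an Oracle thin-driver JDBC URL from its components.
// Substitute your own host, port, service name, and credentials.
object JdbcUrl {
  def thinUrl(user: String, password: String,
              host: String, port: Int, service: String): String =
    s"jdbc:oracle:thin:$user/$password@//$host:$port/$service"

  def main(args: Array[String]): Unit =
    // Prints: jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl
    println(thinUrl("hr", "hr", "localhost", 1521, "pdborcl"))
}
```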

Let’s create a DataFrame (employees) from the “EMPLOYEES” table of the Oracle database:


val employees = sqlContext.load("jdbc",
  Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
      "dbtable" -> "hr.employees"))

and a second DataFrame from the "DEPARTMENTS" table:


val departments = sqlContext.load("jdbc",
  Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
      "dbtable" -> "hr.departments"))
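A side note: instead of pulling a whole table, Spark's JDBC source also accepts a parenthesized subquery as the "dbtable" value, which pushes the query down to Oracle. Beware that Oracle does not allow the AS keyword for a table alias, so write `(...) emp`, not `(...) AS emp`. Below is a minimal sketch of building such an options map; the `JdbcOptions` object and `options` helper are illustrative names:

```scala
// Sketch: wrap a SQL query so it can be used as the "dbtable" option.
// The resulting Map would be passed to sqlContext.load("jdbc", opts).
object JdbcOptions {
  def options(url: String, query: String): Map[String, String] =
    Map("url" -> url,
        "dbtable" -> s"($query) emp") // Oracle alias: no AS keyword

  def main(args: Array[String]): Unit = {
    val opts = options(
      "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
      "SELECT employee_id, last_name FROM hr.employees")
    opts.toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k=$v") }
  }
}
```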

Let’s join EMPLOYEES and DEPARTMENTS and print out the results:


val empDepartments = employees.join(departments,
  employees.col("DEPARTMENT_ID") === departments.col("DEPARTMENT_ID"))
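For readers new to DataFrames, this is an inner equi-join on DEPARTMENT_ID. As an illustration only (plain Scala collections, not the Spark API), the same semantics can be sketched with a couple of made-up rows in the spirit of the HR schema:

```scala
// Illustration of inner equi-join semantics on plain collections.
case class Employee(name: String, departmentId: Int)
case class Department(id: Int, name: String)

object JoinSketch {
  // Pair every employee with the department whose id matches.
  def join(es: Seq[Employee], ds: Seq[Department]): Seq[(String, String)] =
    for { e <- es; d <- ds; if e.departmentId == d.id } yield (e.name, d.name)

  def main(args: Array[String]): Unit = {
    val employees   = Seq(Employee("King", 90), Employee("Abel", 80))
    val departments = Seq(Department(90, "Executive"), Department(80, "Sales"))
    join(employees, departments).foreach(println)
    // Prints: (King,Executive) then (Abel,Sales)
  }
}
```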



The new DataFrame API has a handy function, printSchema, that prints out the DataFrame schema in a tree format.


 empDepartments.printSchema()



Following is the complete code listing:


import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

object SparkJDBC {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ORCL db").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Load the EMPLOYEES and DEPARTMENTS tables over JDBC
    val employees = sqlContext.load("jdbc",
      Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
          "dbtable" -> "hr.employees"))
    val departments = sqlContext.load("jdbc",
      Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
          "dbtable" -> "hr.departments"))

    // Inner join on DEPARTMENT_ID, then print the rows and the schema
    val empDepartments = employees.join(departments,
      employees.col("DEPARTMENT_ID") === departments.col("DEPARTMENT_ID"))
    empDepartments.collect().foreach(println)
    empDepartments.printSchema()
  }
}

 


There you have it: a quick introduction to using the new Apache Spark DataFrame JDBC feature to pull data from an Oracle database. This new API will no doubt make it easier to run analyses on data stored in relational databases.

11 comments:

  1. Hi Keita,

    Thanks for posting this code. I'm also working with Apache Spark and Oracle and I'm having performance problems: queries take hours to finish. Have you faced any similar performance issue? Have you tried querying data from remote databases?

    Thanks,

    Regards!

    1. Hi Michael,

      I am a newbie to Spark and just wanted to check with you whether you were able to build a performant application using Oracle SQL and Spark?

      Regards,
      Nikhil Misra

  2. May I ask if it is possible, during sqlContext.load, to run a SQL query and get the result set into a DataFrame instead of loading the entire table?

  3. I am getting SQLException: No suitable driver. What do I need to set?

    1. Set:
      val driver = "oracle.jdbc.driver.OracleDriver"
      Class.forName(driver)

    2. After this code I am still facing the same error. Can you please help here?

  4. What if I want to fetch selective columns only? In your example you are making a DataFrame of the full table.

  5. HashMap<String, String> data = new HashMap<>();
    data.put("url", url);
    data.put("user", "user");
    data.put("driver", "oracle.jdbc.driver.OracleDriver");
    data.put("password", "pw");
    data.put("dbtable", "(select ID_SOURCE_SYSTEM from EXT_COUNTRY.DWH_BDC_TDS_MTH) as D");

    DataFrame df1 = sqlContext.read().format("org.apache.spark.sql.jdbc").options(data).load();
    df1.show();


    i am getting "Exception in thread "main" java.sql.SQLSyntaxErrorException: ORA-00933: SQL command not properly ended"

  6. When I load data from Oracle into a Spark DataFrame, will it distribute the data over the cluster nodes? Correct me if I am asking the wrong way; I am new to Spark and even Hadoop.
