The new version of Apache Spark (1.3) introduces a new API: the DataFrame. As mentioned in an earlier post, this API makes it easier for data scientists and people with a SQL background to perform analyses with Spark.
The goal of this post is to experiment with the JDBC feature of Apache Spark 1.3. We will load tables from an Oracle database (12c) and generate a result set by joining two tables.
In order to connect to the database using JDBC, a JAR file has to be added to our CLASSPATH. The ojdbc JAR is available at
http://www.oracle.com/technetwork/database/features/jdbc/jdbc-drivers-12c-download-1958347.html
We’ll be using the file ojdbc7.jar.
First, download the latest Apache Spark 1.3 snapshot from https://spark.apache.org/downloads.html
Create a new project in IntelliJ IDEA, then add the Spark 1.3 JAR and the ojdbc7.jar file to the project as dependencies under “Project Structure”, “Modules”.
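If you would rather manage dependencies with sbt than wire JARs by hand in IntelliJ, a minimal build.sbt along these lines should work. The Spark coordinates are real Maven artifacts; Oracle does not publish ojdbc7.jar to Maven Central, so it goes under the project's lib/ directory as an unmanaged JAR (that location is an assumption of this sketch):
name := "spark-oracle-jdbc"
// Spark 1.3 was built against Scala 2.10
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0",
  "org.apache.spark" %% "spark-sql"  % "1.3.0"
)
// ojdbc7.jar is not on Maven Central; drop it into lib/ as an unmanaged JAR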
Oracle databases come with several sample schemas, such as HR; we’ll pull two tables from that schema: EMPLOYEES and DEPARTMENTS.
To connect to the Oracle database, the JDBC thin driver will be used. Given the following:
- Database server: localhost
- Database service name: pdborcl
- Port: 1521
- Userid/password: hr/hr
the JDBC URL is
jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl
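Before involving Spark, it can be worth confirming that the URL and credentials are valid with a plain JDBC connection. A minimal sketch, assuming ojdbc7.jar is already on the classpath:
import java.sql.DriverManager

// Quick connectivity check against the same URL Spark will use
val conn = DriverManager.getConnection("jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl")
try {
  val rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM hr.employees")
  while (rs.next()) println(s"employee count: ${rs.getInt(1)}")
} finally {
  conn.close()
}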
Let’s create a DataFrame (employees) from the “EMPLOYEES” table of the Oracle database:
val employees = sqlContext.load("jdbc",
Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
"dbtable" -> "hr.employees"))
and a second DataFrame (departments) from the “DEPARTMENTS” table:
val departments = sqlContext.load("jdbc",
Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
"dbtable" -> "hr.departments"))
Let’s join EMPLOYEES and DEPARTMENTS and print out the results:
val empDepartments = employees.join(departments,
  employees.col("DEPARTMENT_ID") === departments("DEPARTMENT_ID"))
The new DataFrame API has a handy method, printSchema, that prints the DataFrame’s schema in a tree format.
empDepartments.printSchema()
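To eyeball a few joined rows rather than the schema, the 1.3 DataFrame API also offers show(); the column names below assume the standard HR sample schema:
empDepartments.select("LAST_NAME", "DEPARTMENT_NAME").show()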
Here is the full code listing:
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

object sparkJDBC {
  def main(args: Array[String]): Unit = {
    // Run locally with 4 worker threads
    val conf = new SparkConf().setAppName("ORCL db")
      .setMaster("local[4]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Load the EMPLOYEES table through the JDBC data source
    val employees = sqlContext.load("jdbc",
      Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
          "dbtable" -> "hr.employees"))

    // Load the DEPARTMENTS table the same way
    val departments = sqlContext.load("jdbc",
      Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
          "dbtable" -> "hr.departments"))

    // Join the two DataFrames on DEPARTMENT_ID
    val empDepartments = employees.join(departments,
      employees.col("DEPARTMENT_ID") === departments("DEPARTMENT_ID"))

    // Print each joined row, then the schema as a tree
    empDepartments.foreach(println)
    empDepartments.printSchema()
  }
}
There you have it: a quick introduction to using the new Apache Spark DataFrame JDBC feature to pull data from an Oracle database. This new API will no doubt make it easier to run analyses over data stored in relational databases.
Hi Keita,
Thanks for posting this code. I’m also working with Apache Spark and Oracle and I’m having performance problems; queries take hours to finish. Have you faced any similar performance issue? Have you tried querying data from remote databases?
Thanks,
Regards!
May I ask if it is possible, with sqlContext.load, to run a SQL query and get the result set into a DataFrame instead of loading the entire table?
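The JDBC source does accept a parenthesized subquery wherever a table name is expected, so filtering and projection run inside Oracle rather than in Spark. A minimal sketch (the query itself is just an example; note the alias after the closing parenthesis has no AS keyword, which Oracle rejects for table aliases):
val topEarners = sqlContext.load("jdbc",
  Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
      // The subquery executes in Oracle; only its result set reaches Spark
      "dbtable" -> "(SELECT employee_id, last_name, salary FROM hr.employees WHERE salary > 10000) emp"))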
I am getting "SQLException: No suitable driver". What do I need to set?
Set:
val driver = "oracle.jdbc.driver.OracleDriver"
Class.forName(driver)
After adding this code I am still facing the same error. Can you please help here?
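A note for anyone hitting this: "No suitable driver" usually means ojdbc7.jar is missing from the runtime classpath (e.g. pass it with --driver-class-path / --jars when launching spark-shell or spark-submit). You can also name the driver class through the JDBC source’s driver option, as the Java snippet further down this thread does, instead of calling Class.forName yourself; a sketch:
val employees = sqlContext.load("jdbc",
  Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
      "dbtable" -> "hr.employees",
      // Tell the data source which driver class to register
      "driver" -> "oracle.jdbc.driver.OracleDriver"))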
What if I want to fetch only selected columns? In your example you are making a DataFrame of the full table.
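Besides loading from a subquery (see the sketch a few comments above), the DataFrame API can project columns after the load; the column names here assume the HR sample schema:
// Keep only two columns of the employees DataFrame
val names = employees.select("FIRST_NAME", "LAST_NAME")
names.show()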
HashMap<String, String> data = new HashMap<>();
data.put("url", url);
data.put("user", "user");
data.put("driver", "oracle.jdbc.driver.OracleDriver");
data.put("password", "pw");
data.put("dbtable", "(select ID_SOURCE_SYSTEM from EXT_COUNTRY.DWH_BDC_TDS_MTH) as D");
DataFrame df1 = sqlContext.read().format("org.apache.spark.sql.jdbc").options(data).load();
df1.show();
I am getting "Exception in thread "main" java.sql.SQLSyntaxErrorException: ORA-00933: SQL command not properly ended".
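For what it’s worth, ORA-00933 here almost certainly comes from the AS keyword before the inline-view alias: Oracle accepts table aliases only without AS, so the dbtable value should read "(select ID_SOURCE_SYSTEM from EXT_COUNTRY.DWH_BDC_TDS_MTH) D".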
When I load data from Oracle into a Spark DataFrame, will it be distributed over the cluster nodes? Correct me if I am asking this the wrong way; I am new to Spark and even Hadoop.
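Only if you ask for it: by default the JDBC source reads the table through a single connection into a single partition, which is also the first thing to check for the performance question above. The 1.3 JDBC options include partitionColumn, lowerBound, upperBound and numPartitions to split the scan across executors; a sketch reusing the post’s table (the EMPLOYEE_ID bounds are assumptions about the data):
val employeesPart = sqlContext.load("jdbc",
  Map("url" -> "jdbc:oracle:thin:hr/hr@//localhost:1521/pdborcl",
      "dbtable" -> "hr.employees",
      // Split the read into 4 parallel range scans on EMPLOYEE_ID
      "partitionColumn" -> "EMPLOYEE_ID",
      "lowerBound" -> "100",
      "upperBound" -> "207",
      "numPartitions" -> "4"))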
ReplyDeleteCaesars Entertainment & Gambling Company Profile - Jeopardy - Ktm
ReplyDelete› ccaesars-gaming-gambling › ccaesars-gaming-gambling Caesars Entertainment, one of the largest U.S. real estate companies, The largest U.S. real estate company, The largest U.S. real estate company, The largest U.S. real 광주광역 출장마사지 estate 포천 출장마사지 company, The largest U.S. real estate company, The largest U.S. real estate company, 목포 출장안마 The largest U.S. real estate company, 충청남도 출장마사지 The largest U.S. real estate company, The largest U.S. real estate company, The largest U.S. 성남 출장안마 real estate company, The largest U.S. real estate company, The largest U.S. real estate company, The largest U.S. real estate company