Analyze Big data using Apache Spark SQL

By  Rajashekar Yedla

Apache Spark SQL is a powerful data processing engine and in-memory computing framework to perform processing quickly and analyze large vloume of data . We fetch the elements of a RDD into a Spark SQL table, and query on that table. We can write only SELECT queries on the Spark SQL table and no other SQL operations are possible. Select query on Spark SQL returns RDD only. It has Rich API’s supporting in 3 different languages (Java, Scala and Python).

We use Spark SQL extensively to perform ETL on Big Data where we find it convenient to dispense with writing complex code using Spark.

Working with Spark SQL:

As in Spark, to start with Spark SQL first we have to get the data into RDD(Resilient Distributed Data sets). Once the RDD is available, we create a Spark SQL table with desired RDD elements as table records. This we achieve using SparkSqlContext. Now we implement business logic writing appropriate SELECT queries on Spark SQL tables . The output of the query will be another RDD and the elements of output RDD will be  saved as a Text file, or as an Object file as we need..

Sample of  Apache Spark SQL:

Since Spark SQL has API’s designed in Java language, let us start with some sample example using Java.

To try and test the below code, ensure following are ready in your machine.

For the development:

Download Spark 1.2 and Spark SQL 1.2 jars

Eclipse IDE

To run the code:

Cloudera CDH 5.3.*

No need for any additional configuration since Cloudera provides CDH with the full set up of Spark SQL

We can run the Spark SQL programs on our development IDE’s also, changing the file path. While developing I would suggest to adapt this approach only. Once the development is totally completed , to run the program with large size data (big  data), run in cluster mode.

SparkSqlExample.java:

package com.bimarian.sparksql.spark_proj;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.api.java.JavaSQLContext;

import org.apache.spark.sql.api.java.JavaSchemaRDD;

import org.apache.spark.sql.api.java.Row;

 

public class SparkSqlExample {

public static void main(String[] args) {

SparkConf sparkConf = new  SparkConf().setAppName(“Spark_Sql “).setMaster(“local”);

JavaSparkContext sc = new JavaSparkContext(sparkConf);

JavaSQLContext sqlContext = new JavaSQLContext(sc);

String inputPath = args[0];

 

JavaRDD<Person> rdd = sc.textFile(inputPath).map(new

Function<String, Person>() {

public Person call(String line) throws Exception {

String[] parts = line.split(“,”);

Person person = new Person();

person.setName(parts[0]);

person.setAge(Integer.parseInt(parts[1]));

return person;

}

});

//Registering the RDD as a table using Schema RDD

JavaSchemaRDD schemaRDD = sqlContext.applySchema(rdd, Person.class);

schemaRDD.registerTempTable(“Person_Data”);

//Querying the “Person_Data” table using sqlContext.sql() method

String sql = “SELECT name, age FROM Person_Data “;

JavaSchemaRDD output = sqlContext.sql(sql);

/* sql() method returns a SchemaRDD which contains Row objects. Each Row represents one record. Here we have to get the elements from Row objects and convert them as Person Object */

JavaRDD<Person> finalOutput = output.map(new Function<Row, Person>() {

public Person call(Row row) throws Exception {

Person bean = new Person();

bean.setName(row.getString(0));

bean.setAge(row.getInt(1));

return bean;

}

});

System.out.println(finalOutput.collect());

finalOutput.saveAsTextFile(“SparkSQL_wordcount.csv”);

}

}

Now  generate the jar file and execute the jar , passing input file. Command to run the above program:

Spark -submit –class <Class_name> –master <master_IP> <jar_name> <input_file> 2

Spark -submit –class com.bimarian.sparksql.spark_proj.SparkSqlExample –master local Spark

SQLwordcount-0.0.1-SNAPSHOT.jar input.txt 2

Output:

Sysout statement:

[james,31, ronald,28, stella,22, david,38]

We can observe the same output as Hadoop part files in SparkSQL_wordcount.csv folder, since we have called counts.saveAsTextFile(“Spark SQL_wordcount.csv”) in the above code; the output will be written to Spark SQL_wordcount.csv folder.