Integrate Kafka, Spark Streaming and HBase (Kafka -> Spark Streaming -> HBase)

By Anusha Jallipalli

Apache Kafka is a publish-subscribe messaging system. It is a distributed, partitioned, replicated commit log service.

Spark Streaming is a sub-project of Apache Spark. Spark itself is a batch processing platform similar to Apache Hadoop. Spark Streaming runs on top of the Spark engine and processes live data in small batches, which gives near-real-time results.

Create a POJO class that models one input record.
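The POJO might look like the following minimal sketch. The class name Person is inferred from the personRDD and the "person" table used later in this post. The fields id, name and age are hypothetical placeholders, so replace them with the keys of your own JSON records. The class implements Serializable because Spark may need to serialize records when it moves them between JVMs.

```java
import java.io.Serializable;

// Hypothetical record type: the field names are assumptions, not taken from the original program.
class Person implements Serializable {
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private int age;

    // No-argument constructor, convenient for JSON binding libraries.
    public Person() { }

    public Person(String id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        return "Person{id=" + id + ", name=" + name + ", age=" + age + "}";
    }
}
```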

The StreamingToHbase program takes four input parameters: <zkQuorum> <group> <topics> <numThreads>

zkQuorum: a list of one or more ZooKeeper servers that make up the quorum

group: the name of the Kafka consumer group

topics: a list of one or more Kafka topics to consume from

numThreads: the number of threads the Kafka consumer should use
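As a rough illustration, the four parameters could be read and validated as in the sketch below. StreamingArgs and its parse() method are hypothetical helpers introduced only for this example. The original program may simply read args[0] to args[3] inside main().

```java
// Hypothetical holder for the four command-line parameters described above.
final class StreamingArgs {
    final String zkQuorum;
    final String group;
    final String topics;
    final int numThreads;

    private StreamingArgs(String zkQuorum, String group, String topics, int numThreads) {
        this.zkQuorum = zkQuorum;
        this.group = group;
        this.topics = topics;
        this.numThreads = numThreads;
    }

    // Fails fast with a usage message when fewer than four arguments are given.
    static StreamingArgs parse(String[] args) {
        if (args.length < 4) {
            throw new IllegalArgumentException(
                "Usage: StreamingToHbase <zkQuorum> <group> <topics> <numThreads>");
        }
        // Integer.parseInt throws NumberFormatException if numThreads is not a number.
        return new StreamingArgs(args[0], args[1], args[2], Integer.parseInt(args[3]));
    }
}
```

For instance, the arguments localhost:2181 my-group topicA,topicB 2 (example values only) would yield numThreads = 2 and topics = "topicA,topicB".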

First, we create a JavaStreamingContext object, which is the main entry point for all streaming functionality. We create a local streaming context with two execution threads (local[2]) and a batch interval of 5 seconds.

SparkConf sparkConf = new SparkConf().setAppName("spark streaming to HBase").setMaster("local[2]");

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000));

final JavaSparkContext context = jssc.sparkContext();

When you create an HBaseConfiguration, it reads the settings from hbase-site.xml and hbase-default.xml, as long as these files can be found on the CLASSPATH.

Configuration config = HBaseConfiguration.create();

JavaHBaseContext hBaseContext = new JavaHBaseContext(context, config);

If Spark Streaming receives data from multiple Kafka topics, pass the topic names as a single delimited string and split it on the delimiter. Here we use "," as the delimiter.
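The split can be sketched as follows. This assumes the receiver-based Kafka integration, where the consumer is given a map from each topic name to the number of threads that should read it. TopicMapSketch and buildTopicMap() are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class TopicMapSketch {
    // Splits a string such as "topicA,topicB" on "," and maps every
    // non-empty topic name to the same number of consumer threads.
    static Map<String, Integer> buildTopicMap(String topics, int numThreads) {
        Map<String, Integer> topicMap = new HashMap<>();
        for (String topic : topics.split(",")) {
            String trimmed = topic.trim();
            if (!trimmed.isEmpty()) {
                topicMap.put(trimmed, numThreads);
            }
        }
        return topicMap;
    }
}
```

Trimming each name and skipping empty entries is a defensive choice in this sketch, so that input such as "topicA, topicB" or a trailing comma does not produce a bogus topic.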

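Import KafkaUtils and create an input DStream by calling KafkaUtils.createStream() with the streaming context, the ZooKeeper quorum, the consumer group, and a map of topic names to thread counts.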

This lines DStream represents the stream of data that will be received from Kafka. Each record in this stream is a line of text, and here each line is a JSON string. Convert each JSON record to an instance of the POJO class using Gson's fromJson() method.

For each batch, take the RDD of parsed records and push it into HBase. Here we created pushRawDataToHBase(), which receives hbaseContext and personRDD as arguments. Before running the program, create the table "person" in HBase with the column family "details".

Records now get inserted into the HBase table. You can view the data by running scan 'person' from the HBase shell.