Spark reads Hbase table data and implements something like groupByKe

2025-01-17 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/01 Report--

I. Overview

The runtime environment matters for this program. This test is based on:

hadoop-2.6.5

spark-1.6.2

hbase-1.2.4

zookeeper-3.4.6

jdk-1.8

Let's skip the preamble and go straight to the requirement. The HBase table contains the following rows:

Andy column=baseINFO:age, value=21

Andy column=baseINFO:gender, value=0

Andy column=baseINFO:telphone_number, value=110110110

Tom column=baseINFO:age, value=18

Tom column=baseINFO:gender, value=1

Tom column=baseINFO:telphone_number, value=120120120

Given the table above, use Spark to group the column values by row key and produce this result:

[Andy,(21,0,110110110)]

[Tom,(18,1,120120120)]

The requirement is simple; the main goal is to get familiar with how the program runs.
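To make the target shape concrete, here is a minimal plain-Java sketch of the grouping, with no Spark or HBase involved. The class name, method names, and the {rowKey, qualifier, value} array layout are hypothetical and chosen only for illustration; the sample values are copied from the table above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the desired grouping; not part of the Spark program.
class GroupByRowKeySketch {

    // Groups {rowKey, qualifier, value} cells into rowKey -> values, keeping input order.
    static Map<String, List<Integer>> groupByRowKey(List<String[]> cells) {
        Map<String, List<Integer>> grouped = new LinkedHashMap<>();
        for (String[] cell : cells) {
            grouped.computeIfAbsent(cell[0], key -> new ArrayList<>())
                   .add(Integer.parseInt(cell[2]));
        }
        return grouped;
    }

    // The six cells from the sample table.
    static List<String[]> sampleCells() {
        List<String[]> cells = new ArrayList<>();
        cells.add(new String[] {"Andy", "age", "21"});
        cells.add(new String[] {"Andy", "gender", "0"});
        cells.add(new String[] {"Andy", "telphone_number", "110110110"});
        cells.add(new String[] {"Tom", "age", "18"});
        cells.add(new String[] {"Tom", "gender", "1"});
        cells.add(new String[] {"Tom", "telphone_number", "120120120"});
        return cells;
    }

    public static void main(String[] args) {
        // prints {Andy=[21, 0, 110110110], Tom=[18, 1, 120120120]}
        System.out.println(groupByRowKey(sampleCells()));
    }
}
```

In the Spark program, HBase already returns one Result per row key, so the per-row list can be built directly from each Result rather than from individual cells.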

II. Specific Code

package com.union.bigdata.spark.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ReadHbase {

    private static String appName = "ReadTable";

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf();
        // To run locally, use "local[3]" as the master; 3 means three threads
        sparkConf.setMaster("spark://master:7077").setAppName(appName);
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("baseINFO"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));

        // TableInputFormat reads the Scan from the configuration as a Base64 string
        String scanToString = "";
        try {
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            scanToString = Base64.encodeBytes(proto.toByteArray());
        } catch (IOException io) {
            System.out.println(io);
        }

        for (int i = 0; i < 2; i++) {
            try {
                String tableName = "VIPUSER";
                conf.set(TableInputFormat.INPUT_TABLE, tableName);
                conf.set(TableInputFormat.SCAN, scanToString);

                // Get the query result from the HBase table, one Result per row key
                JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(
                        conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

                // Key each row by its row key: [(Andy,[21,0,110110110]),(Tom,[18,1,120120120])]
                JavaPairRDD<String, List<Integer>> art_scores = hBaseRDD.mapToPair(
                        new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, List<Integer>>() {
                            @Override
                            public Tuple2<String, List<Integer>> call(Tuple2<ImmutableBytesWritable, Result> results) {
                                List<Integer> list = new ArrayList<Integer>();
                                byte[] telphone_number = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));
                                byte[] age = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));
                                byte[] gender = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));
                                // HBase stores values as byte arrays, so convert them to normal types (Int, String, ...)
                                // Added in the order of the target output: age, gender, telphone_number
                                list.add(Integer.parseInt(Bytes.toString(age)));
                                list.add(Integer.parseInt(Bytes.toString(gender)));
                                list.add(Integer.parseInt(Bytes.toString(telphone_number)));
                                return new Tuple2<String, List<Integer>>(Bytes.toString(results._1().get()), list);
                            }
                        }
                );

                // Build the Cartesian product of the rows with themselves
                JavaPairRDD<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> cart =
                        art_scores.cartesian(art_scores);

                // Use the row key to drop the duplicate pairs produced by the Cartesian product
                JavaPairRDD<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> cart2 = cart.filter(
                        new Function<Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>>, Boolean>() {
                            @Override
                            public Boolean call(Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> pair) throws Exception {
                                return pair._1()._1().compareTo(pair._2()._1()) < 0;
                            }
                        }
                );

                System.out.println("Create the List 'collect'...");
                // Get the result we need
                List<Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>>> collect = cart2.collect();
                System.out.println("Done..");
                System.out.println(collect.size() > i ? collect.get(i) : "STOP");
                if (collect.size() > i) break;
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}
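The Cartesian product and filter steps in the code above can be hard to picture, so here is a minimal plain-Java sketch of the same idea on an ordinary list of row keys. The class and method names are hypothetical, and the "|" separator is only for display; the comparison itself mirrors the `compareTo(...) < 0` test used in the filter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of "cartesian, then filter by key order" on a plain list.
class CartesianFilterSketch {

    // Pairs every key with every key, then keeps only pairs whose left key sorts before the right key.
    static List<String> orderedPairs(List<String> keys) {
        List<String> kept = new ArrayList<>();
        for (String left : keys) {
            for (String right : keys) {
                if (left.compareTo(right) < 0) {
                    kept.add(left + "|" + right);
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Two keys give four pairs; only one survives the filter.
        // prints [Andy|Tom]
        System.out.println(orderedPairs(Arrays.asList("Andy", "Tom")));
    }
}
```

With the two sample rows, exactly one pair remains, so the collected list in the program has a single element and the loop exits on its first pass. For n distinct row keys the filter keeps n(n-1)/2 pairs, which is worth keeping in mind before applying this pattern to a large table.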

III. Analysis of the program's execution

1. Spark self-check and startup of the Driver and Executor

The program first instantiates a SparkContext (under Spark 2.x, a SparkSession object would be initialized here instead). At this point the SecurityManager is started to check user permissions. Once that passes, the sparkDriver service is created, and Spark's remote communication module (implemented on the Akka framework) starts and listens for the sparkDriver. After that, the SparkEnv object registers the BlockManagerMaster, whose implementation monitors the resources in use.

2. Self-check and startup of ZooKeeper and HBase

After the first step completes, the program reaches the code that accesses HBase. This triggers a series of ZooKeeper self-checks covering user permissions, the operating system, the data directory, and so on. Once everything passes, the client connection object is initialized, and the ZooKeeper client's ClientCnxn object then establishes a full connection with master.

3. Running the Spark job

The program then calls a Spark action. For example, the call to collect triggers execution of the job. This process is documented in detail elsewhere: most of the work is done by the DAGScheduler, together with components such as the TaskSetManager and TaskScheduler. When the job finishes, the result set is returned to the driver.
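The point that nothing runs until an action is called can be illustrated without a cluster. The sketch below is only an analogy using java.util.stream, not Spark itself: the stream's map step plays the role of a transformation and collect plays the role of the action. The class name and the call counter are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: a lazy java.util.stream pipeline standing in for a Spark job.
class LazyPipelineSketch {

    // Counts how many times the mapping function has actually run.
    static final AtomicInteger mapCalls = new AtomicInteger();

    // Declares the pipeline; no element is parsed at this point.
    static Stream<Integer> buildPipeline(List<String> raw) {
        return raw.stream().map(value -> {
            mapCalls.incrementAndGet();
            return Integer.parseInt(value);
        });
    }

    public static void main(String[] args) {
        Stream<Integer> pipeline = buildPipeline(Arrays.asList("21", "18"));
        System.out.println("map calls before collect: " + mapCalls.get());

        // The terminal operation triggers the work, much as collect() triggers a Spark job.
        List<Integer> result = pipeline.collect(Collectors.toList());
        System.out.println("map calls after collect: " + mapCalls.get() + ", result: " + result);
    }
}
```

In the HBase program the same holds for mapToPair, cartesian, and filter: they only describe the computation, and the table scan happens when collect is called.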

4. Program shutdown

After the result set is returned correctly, the SparkContext's stop() method is invoked (here from the JVM shutdown hook, since the program never calls it explicitly), which triggers a series of stop operations. The main components involved are the BlockManager and the ShutdownHookManager, followed by the release of the actor system and related resources. Finally, temporary data and directories are deleted and all resources are released.
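The shutdown sequence described above follows a common JVM pattern: register a hook that calls a stop method, and make that method safe to call more than once. The following is a minimal sketch of that pattern in plain Java, not Spark's actual implementation; the class name, the hook thread name, and the cleanup counter are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of an idempotent stop() wired to a JVM shutdown hook.
class ShutdownHookSketch {

    private final AtomicBoolean stopped = new AtomicBoolean(false);

    // Number of times cleanup has actually run; should never exceed 1.
    final AtomicInteger cleanups = new AtomicInteger();

    // Asks the JVM to call stop() when the process exits.
    void registerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::stop, "hypothetical-stop-hook"));
    }

    // Releases resources once, even if called explicitly and again from the hook.
    void stop() {
        if (stopped.compareAndSet(false, true)) {
            cleanups.incrementAndGet();
            System.out.println("Releasing resources and deleting temporary directories...");
        }
    }

    public static void main(String[] args) {
        ShutdownHookSketch context = new ShutdownHookSketch();
        context.registerShutdownHook();
        System.out.println("Job finished; the hook runs stop() as the JVM exits.");
    }
}
```

In a Spark program, calling jsc.stop() explicitly at the end of main is the more predictable option, and the guard shown here is what makes a later call from the hook harmless.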
