I. Overview
The program's runtime environment is important. This test is based on:
hadoop-2.6.5
spark-1.6.2
hbase-1.2.4
zookeeper-3.4.6
jdk-1.8
Let's get straight to the requirement. The HBase table VIPUSER contains the following rows:
Andy column=baseINFO:age, value=21
Andy column=baseINFO:gender, value=0
Andy column=baseINFO:telphone_number, value=110110110
Tom column=baseINFO:age, value=18
Tom column=baseINFO:gender, value=1
Tom column=baseINFO:telphone_number, value=120120120
As shown in the table above, the goal is to group the rows by row key with Spark and produce:
[Andy,(21,0,110110110)]
[Tom,(18,1,120120120)]
The requirement itself is fairly simple; the main point is to become familiar with how the program runs.
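For completeness, here is a minimal sketch (not part of the original program) of how the sample rows could be inserted with the HBase 1.x Java client. It assumes the VIPUSER table with the baseINFO column family has already been created (for example in the HBase shell) and that ZooKeeper runs on master:2181, matching the configuration used in section II. The values are written as strings because the Spark code below reads them back with Bytes.toString.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class LoadSampleData {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("VIPUSER"))) {
            // one Put per row key, with the three baseINFO columns used by the Spark job
            Put andy = new Put(Bytes.toBytes("Andy"));
            andy.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"), Bytes.toBytes("21"));
            andy.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"), Bytes.toBytes("0"));
            andy.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"), Bytes.toBytes("110110110"));

            Put tom = new Put(Bytes.toBytes("Tom"));
            tom.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"), Bytes.toBytes("18"));
            tom.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"), Bytes.toBytes("1"));
            tom.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"), Bytes.toBytes("120120120"));

            table.put(andy);
            table.put(tom);
        }
    }
}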
II. Specific Code
package com.union.bigdata.spark.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ReadHbase {

    private static String appName = "ReadTable";

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf();
        // we could also run this locally with "local[3]" (the 3 means 3 threads)
        sparkConf.setMaster("spark://master:7077").setAppName(appName);
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("baseINFO"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));

        String scanToString = "";
        try {
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            scanToString = Base64.encodeBytes(proto.toByteArray());
        } catch (IOException io) {
            System.out.println(io);
        }

        // try at most twice and break as soon as a result comes back
        for (int i = 0; i < 2; i++) {
            try {
                String tableName = "VIPUSER";
                conf.set(TableInputFormat.INPUT_TABLE, tableName);
                conf.set(TableInputFormat.SCAN, scanToString);

                // read the query result from the HBase table
                JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(
                        conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

                // group by row key, e.g. [(Andy,(110110110,21,0)), (Tom,(120120120,18,1))]
                JavaPairRDD<String, List<Integer>> art_scores = hBaseRDD.mapToPair(
                        new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, List<Integer>>() {
                            @Override
                            public Tuple2<String, List<Integer>> call(Tuple2<ImmutableBytesWritable, Result> results) {
                                List<Integer> list = new ArrayList<Integer>();

                                byte[] telphone_number = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));
                                byte[] age = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));
                                byte[] gender = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));

                                // HBase stores everything as byte arrays, so convert back to normal types such as Integer or String
                                list.add(Integer.parseInt(Bytes.toString(telphone_number)));
                                list.add(Integer.parseInt(Bytes.toString(age)));
                                list.add(Integer.parseInt(Bytes.toString(gender)));

                                return new Tuple2<String, List<Integer>>(Bytes.toString(results._1().get()), list);
                            }
                        }
                );

                // build the Cartesian product of the pair RDD with itself
                JavaPairRDD<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> cart =
                        art_scores.cartesian(art_scores);

                // use the row key to drop the duplicates produced by the Cartesian product
                JavaPairRDD<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> cart2 = cart.filter(
                        new Function<Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>>, Boolean>() {
                            public Boolean call(Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> pair) throws Exception {
                                return pair._1()._1().compareTo(pair._2()._1()) < 0;
                            }
                        }
                );

                System.out.println("Create the List 'collect'...");
                // collect() is the action that actually triggers the job and returns the result we need
                List<Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>>> collect = cart2.collect();
                System.out.println("Done..");
                System.out.println(collect.size() > i ? collect.get(i) : "STOP");
                if (collect.size() > i) break;
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}
III. Analysis of program operation process
1. Spark self-check and the startup of the Driver and Executors
A SparkContext is instantiated (under Spark 2.x a SparkSession object would be initialized here instead). At this point the SecurityManager starts and checks user permissions; once that passes, the sparkDriver endpoint is created. Spark's remote communication module (backed by the Akka framework in this version) starts up and listens on the sparkDriver. After that, the SparkEnv object registers the BlockManagerMaster, whose implementation object monitors the running resources.
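As a minimal sketch of that entry point, the snippet below shows the Spark 1.x JavaSparkContext used in this article; the Spark 2.x SparkSession equivalent is only shown as a comment, since this project builds against spark-1.6.2. The master URL and app name are the same placeholders as in section II.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class EntryPointSketch {
    public static void main(String[] args) {
        // Spark 1.x (used in section II): build a SparkConf, then instantiate the context;
        // this is the moment the Driver-side services described above start up
        SparkConf conf = new SparkConf().setMaster("spark://master:7077").setAppName("ReadTable");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Spark 2.x equivalent (not available in spark-1.6.2):
        // SparkSession spark = SparkSession.builder()
        //         .master("spark://master:7077")
        //         .appName("ReadTable")
        //         .getOrCreate();

        jsc.stop();
    }
}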
2. Self-check and startup of ZooKeeper and HBase
Once the first step succeeds, the program driven by the SparkContext starts to access the HBase entry point. This triggers ZooKeeper to run through a series of self-checks covering user permissions, the operating system, the data directory, and so on. When everything is OK, the client connection object is initialized, and the HBase client's ClientCnxn object then establishes a complete connection with the HMaster.
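To illustrate that connection step on its own, here is a small standalone connectivity check (not part of the original program) that uses the same ZooKeeper settings as the job in section II and only standard HBase 1.x client calls:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class HbaseConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // the HBase client only needs the ZooKeeper quorum; it discovers the HMaster from there
        conf.set("hbase.zookeeper.quorum", "master");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            System.out.println("VIPUSER exists: " + admin.tableExists(TableName.valueOf("VIPUSER")));
        }
    }
}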
3. Execution of the Spark job
The program then calls one of Spark's action methods; the call to collect, for example, triggers the execution of a job. This process is documented in detail elsewhere online: essentially the DAGScheduler does most of the work, accompanied by many other components such as TaskSetManager and TaskScheduler, until the job completes and the result set is returned.
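To show that only an action such as collect actually submits a job, here is a tiny standalone sketch (unrelated to HBase; "local[2]" and the sample numbers are arbitrary choices for illustration):

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class LazyEvaluationSketch {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setMaster("local[2]").setAppName("LazyEvaluationSketch"));

        JavaRDD<Integer> numbers = jsc.parallelize(Arrays.asList(1, 2, 3, 4));

        // a transformation only builds the DAG; no job is scheduled yet
        JavaRDD<Integer> doubled = numbers.map(new Function<Integer, Integer>() {
            public Integer call(Integer x) {
                return x * 2;
            }
        });

        // the action submits the job: the DAGScheduler splits it into stages,
        // TaskScheduler/TaskSetManager ship the tasks, and the results come back to the Driver
        List<Integer> result = doubled.collect();
        System.out.println(result);

        jsc.stop();
    }
}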
4. End of the program
After the result set is returned correctly, SparkContext's stop() method is invoked (Spark also registers it with the ShutdownHookManager so it runs even if the application does not call it explicitly). This triggers a series of shutdown operations, mainly involving the BlockManager and ShutdownHookManager, followed by the release of the actor system and related resources. Finally, temporary data and directories are deleted and all resources are released.
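The program in section II relies on this automatic cleanup; a slightly more defensive pattern is to call stop() explicitly in a finally block, as in this sketch:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ExplicitStopSketch {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setMaster("local[2]").setAppName("ExplicitStopSketch"));
        try {
            // ... build RDDs and call actions here ...
        } finally {
            // stops the Driver-side services (BlockManager etc.) and releases cluster resources
            jsc.stop();
        }
    }
}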