Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

General MapReduce programs copy HBase data

2025-02-22 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >

Share

Shulou(Shulou.com)06/01 Report--

Write a MR program to make it suitable for most of the HBase table data to be imported into HBase table data. These include the number of versions that can be set, the column import settings for the input table (select some of these columns), and the column export settings for the output table (select some of these columns).

The original table test1 data is as follows:

There are two versions of data for each row key. Only data with a row key of 1 is shown here.

Create a datasheet in hbase shell:

Create 'test2', {NAME = >' cf1',VERSIONS = > 10} / / Save data with no version, no column import settings, no column export settings create 'test3', {NAME = >' cf1',VERSIONS = > 10} / / Save no version, no column import settings, data with column export settings create 'test4', {NAME = >' cf1',VERSIONS = > 10} / / data create 'test5' with column import settings, no column export settings {NAME = > 'cf1',VERSIONS = > 10} / / data with saved version, no column import settings, no column export settings create' test6', {NAME = > 'cf1',VERSIONS = > 10} / / saved version, no column import settings, data with column export settings create' test7', {NAME = > 'cf1',VERSIONS = > 10} / / data create' test8' with saved version, column import settings, column export settings {NAME = > 'cf1',VERSIONS = > 10} / / data with version, column import settings and column export settings are saved

Main function entry:

Package GeneralHBaseToHBase;import org.apache.hadoop.util.ToolRunner Public class DriverTest {public static void main (String [] args) throws Exception {/ / No version settings, no column import settings, no column export settings String [] myArgs1= new String [] {"test1", / / input table "test2", / / output table "0", / / version size, if the value is 0, the default is to export the latest data from the input table to the output table "- 1", / / column import settings If-1, the column import "- 1" / / column export setting is not set, if it is-1, the column export} is not set. ToolRunner.run (HBaseDriver.getConfiguration (), new HBaseDriver (), myArgs1); / / No version settings, column import settings, no column export settings String [] myArgs2= new String [] {"test1", "test3", "0", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "- 1"}; ToolRunner.run (HBaseDriver.getConfiguration (), new HBaseDriver (), myArgs2) / / No version settings, no column import settings, column export settings String [] myArgs3= new String [] {"test1", "test4", "0", "- 1", "cf1:c1,cf1:c10,cf1:c14"}; ToolRunner.run (HBaseDriver.getConfiguration (), new HBaseDriver (), myArgs3) / have version settings, no column import settings, no column export settings String [] myArgs4= new String [] {"test1", "test5", "2", "- 1", "- 1"}; ToolRunner.run (HBaseDriver.getConfiguration (), new HBaseDriver (), myArgs4) / / have version settings, column import settings, no column export settings String [] myArgs5= new String [] {"test1", "test6", "2", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "- 1"}; ToolRunner.run (HBaseDriver.getConfiguration (), new HBaseDriver (), myArgs5) / / have version settings, no column import settings, column export settings String [] myArgs6= new String [] {"test1", "test7", "2", "- 1", "cf1:c1,cf1:c10,cf1:c14"}; ToolRunner.run (HBaseDriver.getConfiguration (), new HBaseDriver (), myArgs6) / / have version settings, column import settings, column export settings String [] myArgs7= new String [] {"test1", "test8", "2", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "cf1:c1,cf1:c10,cf1:c14"}; ToolRunner.run (HBaseDriver.getConfiguration (), new HBaseDriver (), myArgs7);}}

Driver:

Package GeneralHBaseToHBase;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.util.Tool;import util.JarUtil; public class HBaseDriver extends Configured implements Tool {public static String FROMTABLE= "" / / Import table public static String TOTABLE= "; / / Export table public static String SETVERSION="; / / whether or not to set the version / / args = > {FromTable,ToTable,SetVersion,ColumnFromTable,ColumnToTable} @ Override public int run (String [] args) throws Exception {if (args.import args 5) {System.err.println ("Usage:\ n demo.job.HBaseDriver" + "+")

< versions >

"+" like or "+" like or "); return-1;} Configuration conf = getConf (); FROMTABLE = args [0]; TOTABLE = args [1]; SETVERSION = args [2]; conf.set (" SETVERSION ", SETVERSION); if (! args [3] .equals ("-1 ")) {conf.set (" COLUMNFROMTABLE ", args [3]);} if (! args [4] .equals ("-1 ")) {conf.set (" COLUMNTOTABLE ", args [4]) } String jobName = "From table" + FROMTABLE+ ", Import to" + TOTABLE; Job job = Job.getInstance (conf, jobName); job.setJarByClass (HBaseDriver.class); Scan scan = new Scan (); / / determine whether the version if (SETVERSION! = "0" | | SETVERSION! = "1") {scan.setMaxVersions (Integer.parseInt (SETVERSION)) is required. } / / set HBase table input: table name, scan, Mapper class, mapper output key type, mapper output value type TableMapReduceUtil.initTableMapperJob (FROMTABLE, scan, HBaseToHBaseMapper.class, ImmutableBytesWritable.class, Put.class, job); / / set HBase table output: table name, reducer class TableMapReduceUtil.initTableReducerJob (TOTABLE, null, job); / / write directly to output file job.setNumReduceTasks (0) without reducers; return job.waitForCompletion (true)? 0: 1 } private static Configuration configuration; public static Configuration getConfiguration () {if (configuration==null) {/ * TODO learn how to submit code directly from Windows to Hadoop cluster * and modify the configuration to the actual configuration * / configuration= new Configuration (); configuration.setBoolean ("mapreduce.app-submission.cross-platform", true); / / configure to use cross-platform submission task configuration.set ("fs.defaultFS", "hdfs://master:8020") / / specify namenode configuration.set ("mapreduce.framework.name", "yarn"); / / specify the use of yarn framework configuration.set ("yarn.resourcemanager.address", "master:8032"); / / specify resourcemanager configuration.set ("yarn.resourcemanager.scheduler.address", "master:8030"); / / specify resource allocator configuration.set ("mapreduce.jobhistory.address", "master:10020") / / specify historyserver configuration.set ("hbase.master", "master:16000"); configuration.set ("hbase.rootdir", "hdfs://master:8020/hbase"); configuration.set ("hbase.zookeeper.quorum", "slave1,slave2,slave3"); configuration.set ("hbase.zookeeper.property.clientPort", "2181"); / / TODO requires export- > jar file Set the correct jar package location configuration.set ("mapreduce.job.jar", JarUtil.jar (HBaseDriver.class)); / / set the jar package path} return configuration;}}

Mapper:

Package GeneralHBaseToHBase;import java.io.IOException;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.Map.Entry;import java.util.NavigableMap;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.Cell;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapper Import org.apache.hadoop.hbase.util.Bytes;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class HBaseToHBaseMapper extends TableMapper {Logger log = LoggerFactory.getLogger (HBaseToHBaseMapper.class); private static int versionNum = 0; private static String [] columnFromTable = null; private static String [] columnToTable = null; private static String column1 = null; private static String column2 = null; @ Override protected void setup (Context context) throws IOException, InterruptedException {Configuration conf = context.getConfiguration (); versionNum = Integer.parseInt (conf.get ("SETVERSION", "0")) Column1 = conf.get ("COLUMNFROMTABLE", null); if (! (column1 = = null)) {columnFromTable = column1.split (",");} column2 = conf.get ("COLUMNTOTABLE", null); if (! (column2 = = null)) {columnToTable = column2.split (",");} @ Override protected void map (ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {context.write (key, resultToPut (key,value)) } / * convert key,value to Put * @ param key * @ param value * @ throws IOException * / private Put resultToPut (ImmutableBytesWritable key, Result value) throws IOException {HashMap fTableMap = new HashMap (); HashMap tTableMap = new HashMap (); Put put = new Put (key.get ()); if (! (columnFromTable = = null | | columnFromTable.length = = 0)) {fTableMap = getFamilyAndColumn (columnFromTable);} if (! (columnToTable = = null | | columnToTable.length = = 0) {tTableMap = getFamilyAndColumn (columnToTable);} if (versionNum==0) {if (fTableMap.size () = = 0) {if (tTableMap.size () = = 0) {for (Cell kv: value.rawCells ()) {put.add (kv); / / No version, no column import, no column export} return put;} else {return getPut (put, value, tTableMap) / / No version, no column import, column export} else {if (tTableMap.size () = = 0) {return getPut (put, value, fTableMap); / / No version, column import, no column export} else {return getPut (put, value, tTableMap); / / No version, column import, column export} else {if (fTableMap.size () = 0) {if (tTableMap.size () = 0) {return getPut1 (put, value) / / have version, no column import, no column export} else {return getPut2 (put,value,tTableMap); / / have version, no column import, column export} else {if (tTableMap.size () = = 0) {return getPut2 (put,value,fTableMap); / / have version, column import, no column export} else {return getPut2 (put,value,tTableMap) / / if there is a version, column import, column export} / * without version settings, for column import or column export * @ param put * @ param tableMap * @ return * @ throws IOException * / private Put getPut (Put put,Result value,HashMap tableMap) throws IOException {for (Cell kv: value.rawCells ()) {byte [] family = kv.getFamily () If (tableMap.containsKey (new String (family) {String columnStr = tableMap.get (new String (family)); ArrayList columnBy = toByte (columnStr); if (columnBy.contains (new String (kv.getQualifier () {put.add (kv); / / No version is set, no column import is set, and there is a setting column export} return put } / * (with version, column import, column export) or (version, column import, column export) * @ param put * @ param value * @ return * / private Put getPut2 (Put put,Result value,HashMap tableMap) {NavigableMap map=value.getMap (); for (byte [] family:map.keySet ()) {if (tableMap.containsKey (new String (family) {String columnStr = tableMap.get (new String (family)) Log.info ("@" + new String (family) + "" + columnStr); ArrayList columnBy = toByte (columnStr); NavigableMap familyMap = map.get (family); / / column cluster as key to obtain column-related data for (byte [] column:familyMap.keySet ()) {/ / Bad log.info ("!" + new String (column) according to column name) If (columnBy.contains (new String (column) {NavigableMap valuesMap = familyMap.get (column); for (Entry s:valuesMap.entrySet ()) {/ / get the data of different versions corresponding to the column. By default, the latest System.out.println ("* *:" + new String (family) + "+ new String (column) +" + s.getKey () + "+ new String (s.getValue () Put.addColumn (family, column, s.getKey (), s.getValue ());} return put;} / * have version, no column import, no column export * @ param put * @ param value * @ return * / private Put getPut1 (Put put,Result value) {NavigableMap map=value.getMap (); for (byte [] family:map.keySet ()) {NavigableMap familyMap = map.get (family) / / column cluster as key to obtain column-related data for (byte [] column:familyMap.keySet ()) {/ / according to column name bad NavigableMap valuesMap = familyMap.get (column); for (Entry s:valuesMap.entrySet ()) {/ / get different versions of data corresponding to the column. Default is the latest put.addColumn (family, column, s.getKey (), s.getValue ());}} return put } / / str = > {"cf1:c1", "cf1:c2", "cf1:c10", "cf1:c11", "cf1:c14"} / * get the map * @ param str = > {"cf1:c1", "cf1:c2", "cf1:c10", "cf1:c11", "cf1:c14"} * @ return map = > {"cf1" = > "C1 cf1" in the form v of column cluster and column names. C14 "} * / private static HashMap getFamilyAndColumn (String [] str) {HashMap map = new HashMap () HashSet set = new HashSet (); for (String s: str) {set.add (s.split (":") [0]);} Object [] ob = set.toArray (); for (int iTuno; I

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Database

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report