1. Create an IDEA project for the AsyncHbaseEventSerializer
Add the dependency to pom.xml:

<dependency>
    <groupId>org.apache.flume.flume-ng-sinks</groupId>
    <artifactId>flume-ng-hbase-sink</artifactId>
    <version>1.6.0</version>
</dependency>
Implement AsyncHbaseEventSerializer according to your business logic:
package ifre.flume.hbase;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by root on 12-5-17.
 */
public class SplittingSerializer implements AsyncHbaseEventSerializer {
    private byte[] table;
    private byte[] colFam;
    private Event currentEvent;
    private byte[][] columnNames;
    private byte[] currentRowKey;
    private final byte[] eventCountCol = "eventCount".getBytes();
    private final List<PutRequest> puts = new ArrayList<PutRequest>();
    private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();

    @Override
    public void initialize(byte[] table, byte[] cf) {
        this.table = table;
        this.colFam = cf;
        // Could not get the columns from the context in configure(); had to hard-code them here.
        columnNames = new byte[3][];
        columnNames[0] = "name".getBytes();
        columnNames[1] = "id".getBytes();
        columnNames[2] = "phone".getBytes();
    }

    @Override
    public void setEvent(Event event) {
        this.currentEvent = event;
        /*
        // Don't know how to set the key in the event header.
        String rowKeyStr = currentEvent.getHeaders().get("rowKey");
        if (rowKeyStr == null) {
            throw new FlumeException("No row key found in headers!");
        }
        currentRowKey = rowKeyStr.getBytes();
        */
    }

    @Override
    public List<PutRequest> getActions() {
        // Split the event body and get the values for the columns.
        String eventStr = new String(currentEvent.getBody());
        String[] cols = eventStr.split(",");
        long currTime = System.currentTimeMillis();
        // Reverse timestamp so newer rows sort first in HBase's lexicographic row order.
        long revTs = Long.MAX_VALUE - currTime;
        currentRowKey = (Long.toString(revTs) + cols[0]).getBytes();
        puts.clear();
        for (int i = 0; i < cols.length; i++) {
            // Generate a PutRequest for each column.
            PutRequest req = new PutRequest(table, currentRowKey, colFam,
                    columnNames[i], cols[i].getBytes());
            puts.add(req);
        }
        return puts;
    }

    @Override
    public List<AtomicIncrementRequest> getIncrements() {
        incs.clear();
        // Increment the number of events received, in row "totalEvents".
        incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));
        return incs;
    }

    @Override
    public void cleanUp() {
        table = null;
        colFam = null;
        currentEvent = null;
        columnNames = null;
        currentRowKey = null;
    }

    @Override
    public void configure(Context context) {
        // Get the column names from the configuration.
        // Did not work; note the local array below shadows the columnNames field,
        // so this assignment has no effect (hence the hard-coding in initialize()).
        String cols = context.getString("columns");
        String[] names = cols.split(",");
        byte[][] columnNames = new byte[names.length][];
        int i = 0;
        System.out.println("getting columnNames");
        for (String name : names) {
            columnNames[i++] = name.getBytes();
        }
    }

    @Override
    public void configure(ComponentConfiguration componentConfiguration) {
    }
}

Build the jar file (Build --> Build Artifacts in IDEA) and deploy it.
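Before wiring it into Flume, you can sanity-check the serializer off-cluster. A minimal sketch (the demo class and its expectations are mine, not part of the original setup; it assumes flume-ng-core, which provides EventBuilder, and asynchbase are on the classpath):

package ifre.flume.hbase;

import org.apache.flume.event.EventBuilder;
import org.hbase.async.PutRequest;

import java.util.List;

public class SplittingSerializerDemo {  // hypothetical helper, not from the article
    public static void main(String[] args) {
        SplittingSerializer ser = new SplittingSerializer();
        // Same table and column family as the HBase sink configuration below.
        ser.initialize("law".getBytes(), "lawfile".getBytes());
        ser.setEvent(EventBuilder.withBody("zhangsan,10110198806054561,13812345678".getBytes()));
        List<PutRequest> actions = ser.getActions();
        // Expect one PutRequest per comma-separated column: name, id, phone.
        System.out.println(actions.size() + " PutRequests generated");
        ser.cleanUp();
    }
}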
Copy the jar to Flume's lib directory. Here I use scp to upload it to the Flume installation on another host.
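For example (the jar name and remote Flume path are illustrative):

scp SplittingSerializer.jar root@ifrebigsearch2:/path/to/apache-flume-1.6.0-bin/lib/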
2. Configure Flume

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 sink2
a1.sources.r1.selector.type = replicating

# NetCat TCP source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1 c2

# channel
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000

# HBase sink
a1.sinks.sink2.type = org.apache.flume.sink.hbase.AsyncHBaseSink
a1.sinks.sink2.channel = c2
a1.sinks.sink2.table = law
a1.sinks.sink2.columnFamily = lawfile
a1.sinks.sink2.batchSize = 5000
# The serializer to use
a1.sinks.sink2.serializer = ifre.flume.hbase.SplittingSerializer
# List of columns each event writes to.
a1.sinks.sink2.serializer.columns = name,id,phone
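Note that channel c1 and sink k1 are declared above but not configured here; they presumably live elsewhere in crawler-hdfs-conf.properties (the file name suggests an HDFS sink). A purely illustrative placeholder so a standalone agent can start:

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1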
3. Create the HBase table

# hbase shell
create 'law', 'lawfile'
4. Run the Flume agent

[root@ifrebigsearch2 apache-flume-1.6.0-bin]# bin/flume-ng agent --conf conf --conf-file conf/crawler-hdfs-conf.properties --name a1 -Dflume.root.logger=INFO,console
5. Run nc

[root@ifrebigsearch0 dkh]# nc ifrebigsearch2 6666
zhangsan,10110198806054561,13812345678
OK
6. Result

hbase(main):002:0> scan 'law'
ROW                          COLUMN+CELL
 9223370524386395508zhangsan column=lawfile:id, timestamp=1512468380362, value=10110198806054561
 9223370524386395508zhangsan column=lawfile:name, timestamp=1512468380361, value=zhangsan
 9223370524386395508zhangsan column=lawfile:phone, timestamp=1512468380363, value=13812345678
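Because getIncrements() bumps a counter for every event received, you can also read back the running event count; this uses the standard HBase shell counter command (not shown in the original article):

get_counter 'law', 'totalEvents', 'lawfile:eventCount'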