
Hands-on: Streaming Data into HBase Using Flume

2025-03-05 Update From: SLTechnology News&Howtos


1. Create an IDEA project for the AsyncHbaseEventSerializer

Add the dependency to pom.xml:

    <dependency>
        <groupId>org.apache.flume.flume-ng-sinks</groupId>
        <artifactId>flume-ng-hbase-sink</artifactId>
        <version>1.6.0</version>
    </dependency>

Implement AsyncHbaseEventSerializer according to the business logic:

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.conf.ComponentConfiguration;
    import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
    import org.hbase.async.AtomicIncrementRequest;
    import org.hbase.async.PutRequest;

    import java.util.ArrayList;
    import java.util.List;

    /**
     * Created by root on 12-5-17.
     */
    public class SplittingSerializer implements AsyncHbaseEventSerializer {
        private byte[] table;
        private byte[] colFam;
        private Event currentEvent;
        private byte[][] columnNames;
        private byte[] currentRowKey;
        private final byte[] eventCountCol = "eventCount".getBytes();
        private final List<PutRequest> puts = new ArrayList<PutRequest>();
        private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();

        public void initialize(byte[] table, byte[] cf) {
            this.table = table;
            this.colFam = cf;
            // Could not get the columns from the context in configure(); had to hard-code them here.
            columnNames = new byte[3][];
            columnNames[0] = "name".getBytes();
            columnNames[1] = "id".getBytes();
            columnNames[2] = "phone".getBytes();
        }

        public void setEvent(Event event) {
            // Set the event and verify that the rowKey is not present.
            this.currentEvent = event;
            /*
            // Don't know how to set the key of the event header.
            String rowKeyStr = currentEvent.getHeaders().get("rowKey");
            if (rowKeyStr == null) {
                throw new FlumeException("No row key found in headers!");
            }
            currentRowKey = rowKeyStr.getBytes();
            */
        }

        public List<PutRequest> getActions() {
            // Split the event body and get the values for the columns.
            String eventStr = new String(currentEvent.getBody());
            String[] cols = eventStr.split(",");
            long currTime = System.currentTimeMillis();
            long revTs = Long.MAX_VALUE - currTime;
            currentRowKey = (Long.toString(revTs) + cols[0]).getBytes();
            puts.clear();
            for (int i = 0; i < cols.length; i++) {
                // Generate a PutRequest for each column.
                PutRequest req = new PutRequest(table, currentRowKey, colFam,
                        columnNames[i], cols[i].getBytes());
                puts.add(req);
            }
            return puts;
        }

        public List<AtomicIncrementRequest> getIncrements() {
            incs.clear();
            // Increment the number of events received.
            incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));
            return incs;
        }

        public void cleanUp() {
            table = null;
            colFam = null;
            currentEvent = null;
            columnNames = null;
            currentRowKey = null;
        }

        public void configure(Context context) {
            // Get the column names from the configuration.
            // Did not work; don't know how to use it.
            String cols = new String(context.getString("columns"));
            String[] names = cols.split(",");
            byte[][] columnNames = new byte[names.length][];
            int i = 0;
            System.out.println("getting columnNames");
            for (String name : names) {
                columnNames[i++] = name.getBytes();
            }
        }

        public void configure(ComponentConfiguration componentConfiguration) {
        }
    }
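getActions() keys each row with Long.MAX_VALUE minus the current time, so newer events get lexicographically smaller keys and come back first in a plain HBase scan. A standalone sketch of that property (the class and method names here are mine, not part of the serializer):

```java
public class ReverseTimestampKey {
    // Same key construction as the serializer: reversed timestamp + first field.
    static String rowKey(long tsMillis, String firstCol) {
        return Long.toString(Long.MAX_VALUE - tsMillis) + firstCol;
    }

    public static void main(String[] args) {
        String older = rowKey(1512468380000L, "zhangsan");
        String newer = rowKey(1512468390000L, "zhangsan");
        // The newer event has the lexicographically smaller key,
        // so a scan returns the most recent rows first.
        // Note: Long.toString does not zero-pad, so this ordering only
        // holds while the reversed timestamps have the same digit count.
        System.out.println(newer.compareTo(older) < 0);
    }
}
```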

Build and deploy the jar file. In IDEA: Build -> Build Artifacts.

Copy the jar to Flume's lib directory. Here I use scp to upload it to the Flume installation on another host.
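The build-and-copy step can also be done from the command line; a minimal sketch, assuming a standard Maven layout and that Flume lives at /opt/apache-flume-1.6.0-bin on the agent host (the jar name and both paths are assumptions for this setup):

```shell
# Package the serializer jar (assumes a standard Maven project layout).
mvn -q clean package
# Upload it into Flume's lib directory on the agent host
# (hostname, jar name, and remote path are placeholders).
scp target/splitting-serializer-1.0.jar \
    root@ifrebigsearch2:/opt/apache-flume-1.6.0-bin/lib/
```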

2. Configure Flume

Flume configuration keys are case-sensitive, so the agent, source, channel, and sink names must be written consistently:

    a1.sources = r1
    a1.channels = c1 c2
    a1.sinks = k1 sink2
    a1.sources.r1.selector.type = replicating

    # NetCat TCP source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 6666
    a1.sources.r1.channels = c1 c2

    # channel
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 10000
    a1.channels.c2.transactionCapacity = 1000

    # HBase sink
    a1.sinks.sink2.type = org.apache.flume.sink.hbase.AsyncHBaseSink
    a1.sinks.sink2.channel = c2
    a1.sinks.sink2.table = law
    a1.sinks.sink2.columnFamily = lawfile
    a1.sinks.sink2.batchSize = 5000

    # The serializer to use
    a1.sinks.sink2.serializer = ifre.flume.hbase.SplittingSerializer
    # List of columns each event writes to.
    a1.sinks.sink2.serializer.columns = name,id,phone
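The serializer.columns list lines up positionally with the comma-separated event body: the first field goes to the first column, and so on. A standalone sketch of that mapping (the class name is mine; the column names and sample record mirror this setup):

```java
public class ColumnMapping {
    public static void main(String[] args) {
        String[] columns = "name,id,phone".split(",");           // serializer.columns
        String body = "zhangsan,10110198806054561,13812345678";  // event body
        String[] values = body.split(",");
        // Each body field is written to the column at the same index.
        for (int i = 0; i < columns.length; i++) {
            System.out.println(columns[i] + " -> " + values[i]);
        }
    }
}
```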

3. Create the HBase table

# hbase shell

create 'law', 'lawfile'

4. Run the Flume agent

[root@ifrebigsearch2 apache-flume-1.6.0-bin]# bin/flume-ng agent --conf conf --conf-file conf/crawler-hdfs-conf.properties --name a1 -Dflume.root.logger=INFO,console

5. Run nc

[root@ifrebigsearch0 dkh] # nc ifrebigsearch2 6666

zhangsan,10110198806054561,13812345678

OK

6. Result

    hbase(main):002:0> scan 'law'
    ROW                           COLUMN+CELL
     9223370524386395508zhangsan  column=lawfile:id, timestamp=1512468380362, value=10110198806054561
     9223370524386395508zhangsan  column=lawfile:name, timestamp=1512468380361, value=zhangsan
     9223370524386395508zhangsan  column=lawfile:phone, timestamp=1512468380363, value=13812345678
