Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to customize Event Serializer serialization classes by Flume

2025-04-05 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly introduces "Flume how to customize the Event Serializer serialization class". In the daily operation, I believe that many people have doubts about how to customize the Event Serializer serialization class in Flume. The editor consulted all kinds of information and sorted out a simple and easy-to-use method of operation. I hope it will be helpful to answer the doubts of "Flume how to customize the Event Serializer serialization class". Next, please follow the editor to study!

Type the log from flume to hbase, but our logs are all in Json format because they are saved in MongoDb, so it is no good to use the SimpleHbaseEventSerializer and RegexHbaseEventSerializer that comes with flume, so you start to look at the source code painfully and write your own serialized classes (note here that if you write the code under the hbasesink package of flume, the License information must be added. It is relatively simple, after writing the class, compile and package it, transfer it to the lib directory of flume, and then specify the Serializer class as the written class when configuring agent. Here is the code (class comments are not posted, excuse me):

Public class PRTMSAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {private byte [] table;//hbase table private byte [] cf;// column cluster private byte [] [] payload;// column collection private byte [] [] payloadColumn;// column value private byte [] incrementColumn; private String rowSuffix;//roykey suffix private String rowPrefix;//rowkey prefix private byte [] incrementRow; private KeyType keyType / / rowkey suffix type private static final Logger logger = LoggerFactory.getLogger (PRTMSAsyncHbaseEventSerializer.class); @ Override public void configure (Context context) {/ / TODO Auto-generated method stub / / sets the primary key suffix type, here using the timestamp keyType = KeyType.TS If (iCol! = null & &! iCol.isEmpty ()) {incrementColumn = iCol.getBytes (Charsets.UTF_8);} incrementRow = context.getString ("incrementRow", "incRow") .getBytes (Charsets.UTF_8) } @ Override public void configure (ComponentConfiguration conf) {/ / TODO Auto-generated method stub} @ Override public void initialize (byte [] table, byte [] cf) {/ / TODO Auto-generated method stub this.table = table; this.cf = cf } / * * @ Title: setEvent * @ Description: get log information And parse out the HBase column and the value value of the column * @ param event * @ throws * @ see org.apache.flume.sink.hbase.AsyncHbaseEventSerializer#setEvent (org.apache.flume.Event) * / @ Override public void setEvent (Event event) {/ / TODO Auto-generated method stub / / to get the log letter String log = new String (event.getBody ()) StandardCharsets.UTF_8) / / headers contains the item number and host information in the log Map headers = event.getHeaders (); JsonReader jsonReader = new JsonReader (new StringReader (log)); String name = "; String value ="; String path ="; Map kv = new HashMap () Try {/ / the key-value pairs in the parsing log are cached in map jsonReader.beginObject (); while (jsonReader.hasNext ()) {name = jsonReader.nextName (); value = jsonReader.nextString () If (name.equals ("uri")) path = value.split ("") [1]; kv.put (name, value);} jsonReader.endObject () } catch (IOException e) {/ / TODO Auto-generated catch block e.printStackTrace () } / / parse project id and service host in headers, path if (path.contains ("?")) {path = path.substring (0, path.indexOf ("?"));} String pcode = headers.get ("pcode") String host = headers.get ("host"); / / add project number and server host to map kv.put ("pcode", pcode); kv.put ("host", host); / / initialization column and value array this.payloadColumn = new byte [kv.keySet (). Size ()] [] This.payload = new byte [kv.keySet (). Size ()] []; int I = 0; / / assign for (String key: kv.keySet ()) {this.payloadColumn [I] = key.getBytes () to the column and value of hbase This.payload [I] = kv.get (key). GetBytes (); iTunes;} / sets the prefix format of rowkey as project number + path this.rowSuffix = new StringBuilder (pcode). Reverse (). ToString () + ":" + path+ ":" + kv.get ("time") } @ Override public List getActions () {/ / TODO Auto-generated method stub List actions = new ArrayList (); if (payloadColumn! = null) {byte [] rowKey; try {rowKey = rowSuffix.getBytes () / / for loop that submits all columns and put requests for data. For (int I = 0; I < this.payload.length; iTunes +) {PutRequest putRequest = new PutRequest (table, rowKey, cf, payloadColumn [I], payload [I]); actions.add (putRequest) }} catch (Exception e) {throw new FlumeException ("Could not get row key!", e);}} return actions } @ Override public List getIncrements () {/ / TODO Auto-generated method stub List actions = new ArrayList (); if (incrementColumn! = null) {AtomicIncrementRequest inc = new AtomicIncrementRequest (table, incrementRow, cf, incrementColumn); actions.add (inc) } return actions;} @ Override public void cleanUp () {/ / TODO Auto-generated method stub}} at this point, the study on "how to customize Event Serializer serialization classes in Flume" is over. I hope to be able to solve everyone's doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report