
The process of building data warehouse when Flume is connected to Hive

Updated: 2025-01-19. From: SLTechnology News&Howtos (shulou)


This article walks through the process of building a data warehouse ingestion path when Flume is connected to Hive.

Most large companies now stream data into their data warehouses in real time. Flume has shipped the Taildir source since version 1.7, and it is widely used because of the following features:

It uses regular expressions to match file names in a directory.

As soon as data is written to a monitored file, Flume forwards it to the configured sink.

It is reliable: read offsets are recorded in a position file, so tailing resumes from the last position after a restart and no data is lost.

It does not touch the tailed files: they are not modified, renamed or deleted.

It does not support Windows and cannot read binary files. Text files are read line by line.
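
The file-name matching in the first point can be pictured with a small Java sketch. It is not Flume code: `FileGroupMatch` and `matching` are hypothetical names, and the sketch assumes that the pattern is applied to the file name only (not the directory part) and must match the whole name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flume: selects the file names
// that a filegroup pattern such as ".*log" would pick up.
final class FileGroupMatch {

    // Returns the names matched by the regex as a whole (full match is an assumption).
    static List<String> matching(String fileNameRegex, List<String> fileNames) {
        Pattern pattern = Pattern.compile(fileNameRegex);
        List<String> out = new ArrayList<>();
        for (String name : fileNames) {
            if (pattern.matcher(name).matches()) {
                out.add(name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("start.log", "event.log", "start.log.1", "notes.txt");
        // prints [start.log, event.log]
        System.out.println(matching(".*log", names));
    }
}
```

Under that assumption a rotated file such as start.log.1 is not selected by `.*log`, because the name does not end in "log".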

This article uses open-source Flume as an example to show how to land the stream on HDFS, so that the ODS-layer external tables can then be created on top of it.

1.1 taildir source configuration

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log

1.2 hdfs sink configuration

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# File rolling policy: roll by size only (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events written before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# Use local time
a1.sinks.k1.hdfs.useLocalTimeStamp = true
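
The numbers in the sink configuration can be checked with a little arithmetic. The sketch below is plain Java, not Flume code. It shows that 33554432 bytes is exactly 32 MiB, and what a `%Y-%m-%d` directory would look like for a given instant. `SinkNumbers`, `mib` and `dayDirectory` are hypothetical names, and mapping `%Y-%m-%d` to the Java pattern `yyyy-MM-dd` is only an illustration of what the escape sequence means.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for checking the sink settings by hand.
final class SinkNumbers {

    // Converts mebibytes to bytes.
    static long mib(long n) {
        return n * 1024L * 1024L;
    }

    // Builds the dated directory for one instant, the way %Y-%m-%d is meant to expand.
    static String dayDirectory(String base, long epochMillis, ZoneId zone) {
        DateTimeFormatter day = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(zone);
        return base + day.format(Instant.ofEpochMilli(epochMillis)) + "/";
    }

    public static void main(String[] args) {
        // prints 33554432
        System.out.println(mib(32));
        // Uses the local clock and zone, as useLocalTimeStamp = true would.
        System.out.println(dayDirectory("/user/data/logs/start/logs/start/",
                System.currentTimeMillis(), ZoneId.systemDefault()));
    }
}
```

With rollCount and rollInterval both set to 0, size is the only rolling trigger left, so each HDFS file closes at roughly 32 MiB.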

1.3 configuration of Agent

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# File rolling policy: roll by size only (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events written before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 1000
# Use local time
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Save this configuration as /opt/hoult/servers/conf/flume-log2hdfs.conf.
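
The agent file combines three size settings that have to fit together: the sink's `hdfs.batchSize` should not exceed the channel's `transactionCapacity`, which in turn cannot exceed the channel's `capacity`. As a rough sketch, the values can be read with `java.util.Properties` and compared. This is standard-library Java rather than a Flume API, and `AgentConfigCheck` and `sizesAreConsistent` are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical pre-flight check for an agent configuration, not a Flume API.
final class AgentConfigCheck {

    // True when batchSize <= transactionCapacity <= capacity.
    // Throws NullPointerException if one of the three keys is missing.
    static boolean sizesAreConsistent(String conf, String agent, String channel, String sink) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(conf));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String ch = agent + ".channels." + channel + ".";
        long capacity = Long.parseLong(props.getProperty(ch + "capacity").trim());
        long txCapacity = Long.parseLong(props.getProperty(ch + "transactionCapacity").trim());
        long batchSize = Long.parseLong(
                props.getProperty(agent + ".sinks." + sink + ".hdfs.batchSize").trim());
        return batchSize <= txCapacity && txCapacity <= capacity;
    }

    public static void main(String[] args) {
        String conf = "a1.channels.c1.capacity = 100000\n"
                + "a1.channels.c1.transactionCapacity = 2000\n"
                + "a1.sinks.k1.hdfs.batchSize = 1000\n";
        // prints true
        System.out.println(sizesAreConsistent(conf, "a1", "c1", "k1"));
    }
}
```

The values used in this article (1000, 2000, 100000) satisfy both conditions.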

1.4 start

flume-ng agent --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf --name a1 -Dflume.root.logger=INFO,console

Add the following parameters to $FLUME_HOME/conf/flume-env.sh; without them the agent reports an error:

export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"

For flume-env.sh to take effect, the configuration directory must also be specified on the command line:

flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf --name a1 -Dflume.root.logger=INFO,console

1.5 Use a custom interceptor to replace the Flume agent's local time with the timestamp in the log

Use netcat source → logger sink to test

# a1 is the name of the agent. The source, channel and sink are named r1, c1 and k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux121
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The main code of the interceptor is as follows:

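package com.hoult.flume;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// JSON/JSONObject are assumed to come from fastjson, Charsets/Strings from Guava
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class CustomerInterceptor implements Interceptor {
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Get the content of the body
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        // Get the content of the header
        Map<String, String> headerMap = event.getHeaders();
        final String[] bodyArr = eventBody.split("\\s+");
        try {
            // The JSON payload is the 7th whitespace-separated field
            String jsonStr = bodyArr[6];
            if (Strings.isNullOrEmpty(jsonStr)) {
                return null;
            }
            // Convert the string to a JSON object
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getString("time");
            // Convert the millisecond timestamp to a date string (format: yyyyMMdd)
            long timeStamp = Long.parseLong(timestampStr);
            String date = formatter.format(
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault()));
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            // Any parsing failure marks the event instead of dropping it
            headerMap.put("logtime", "unknown");
            event.setHeaders(headerMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}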
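
The date conversion at the heart of the interceptor can be tried on its own, without Flume on the classpath. A minimal sketch follows. `LogTimeDemo` and `toLogDate` are hypothetical names, and the zone is passed in explicitly (the interceptor uses `ZoneId.systemDefault()`) so that the result does not depend on the machine it runs on.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical stand-alone version of the interceptor's timestamp handling.
final class LogTimeDemo {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Converts an epoch-millisecond string to yyyyMMdd, or "unknown" if it cannot be parsed.
    static String toLogDate(String timestampStr, ZoneId zone) {
        try {
            long timeStamp = Long.parseLong(timestampStr.trim());
            return FORMATTER.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), zone));
        } catch (RuntimeException e) {
            // Covers null input and non-numeric text, mirroring the interceptor's fallback
            return "unknown";
        }
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        System.out.println(toLogDate("0", utc));            // prints 19700101
        System.out.println(toLogDate("not-a-number", utc)); // prints unknown
    }
}
```

Once the `logtime` header is set, a sink path can refer to it through Flume's `%{logtime}` header escape, so the log's own timestamp rather than the agent's clock determines where an event lands.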

Start

flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-test.conf --name a1 -Dflume.root.logger=INFO,console

# Test
telnet linux121 9999
