Example analysis of running Flume + Kafka + Storm

Updated 2025-01-31, from SLTechnology News&Howtos (shulou)

This article walks through a worked example of running Flume, Kafka, and Storm together. It is fairly detailed and should be a useful reference.

Overview

Many applications built on the Hadoop platform need both offline and real-time analysis of their data. Offline statistical analysis is straightforward with Hive or MapReduce, but neither is suited to real-time requirements. For real-time scenarios we can use Storm, a real-time processing system that provides a computing model for stream-processing applications and is easy to program against. To unify offline and real-time computing, we generally want a single collection point that feeds the same source data to both the real-time system and the offline analysis system. One way to do this is to connect the data sources (for example, logs collected with Flume) directly to a message middleware such as Kafka. With Flume and Kafka integrated, Flume acts as the message Producer and publishes the data it produces (log data, business request data, and so on) to Kafka. A Storm Topology then subscribes as the message Consumer, and the Storm cluster handles the following two scenarios:

Use a Storm Topology directly to analyze and process the data in real time.

Integrate Storm with HDFS and write the messages to HDFS for offline analysis and processing.
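Both scenarios read the same published stream independently. The following is a minimal sketch of that idea in plain Java, with no Kafka involved. The class `TopicSketch` and the group names `realtime` and `hdfs` are hypothetical names chosen for illustration; they are not part of the Kafka API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a topic; NOT the Kafka API. */
final class TopicSketch {
    private final List<String> log = new ArrayList<>();
    // Each consumer group tracks its own read position in the shared log.
    private final Map<String, Integer> offsets = new HashMap<>();

    /** Producer side: append one message (the role Flume plays in the article). */
    void publish(String message) {
        log.add(message);
    }

    /** Consumer side: return what this group has not read yet and advance its offset. */
    List<String> poll(String group) {
        int from = offsets.getOrDefault(group, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(group, log.size());
        return batch;
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        topic.publish("log line 1");
        topic.publish("log line 2");
        System.out.println("realtime: " + topic.poll("realtime"));
        topic.publish("log line 3");
        // The offline consumer starts from the beginning, unaffected by the real-time one.
        System.out.println("hdfs:     " + topic.poll("hdfs"));
        System.out.println("realtime: " + topic.poll("realtime"));
    }
}
```

Because each group keeps its own offset, adding the offline consumer does not change what the real-time consumer sees, which is the property the unified collection point relies on.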

When Flume, Kafka, and Storm are combined, Flume collects the data as the data source, Kafka serves as the message queue and buffer, and Storm pulls data from Kafka for analysis and processing. Anyone who develops software knows the idea of modularization, and there are two reasons for this design:

First, the design is modular and the division of responsibilities is clear: data acquisition, data access, stream computing, and data output/storage.

1) Data acquisition

Collects data in real time from each node. Cloudera's Flume is used here.

2) Data access

Because data is not necessarily collected at the same rate as it is processed, a message middleware is added as a buffer. Apache Kafka is used here.

3) Stream computing

Analyzes the collected data in real time. Apache Storm is used here.

4) Data output

Persists the analysis results. MySQL is used tentatively.

Second, once the system is modular, data acquisition and data access keep running even if Storm goes down, so no data is lost, and Storm can resume stream computing when it comes back up.
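The buffering argument can be checked with a small simulation. This is only a sketch under simplifying assumptions: time advances in discrete ticks, the buffer is unbounded, and the names `OutageSketch` and `run` are hypothetical. It models the role Kafka plays, not Kafka itself.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical simulation of a buffer between a steady producer and a consumer that goes down. */
final class OutageSketch {
    /**
     * Runs the given number of ticks. Each tick the producer appends produceRate
     * messages; the consumer drains up to consumeRate messages unless the tick
     * lies in [downFrom, downTo]. Returns the messages in processing order.
     */
    static List<Integer> run(int ticks, int produceRate, int consumeRate, int downFrom, int downTo) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<Integer> processed = new ArrayList<>();
        int next = 0;
        for (int tick = 0; tick < ticks; tick++) {
            // Acquisition keeps running whether or not the consumer is up.
            for (int i = 0; i < produceRate; i++) {
                buffer.addLast(next++);
            }
            boolean consumerUp = tick < downFrom || tick > downTo;
            for (int i = 0; consumerUp && i < consumeRate && !buffer.isEmpty(); i++) {
                processed.add(buffer.pollFirst());
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // Consumer is down during ticks 1 to 3, then catches up at a higher rate.
        List<Integer> processed = run(8, 2, 4, 1, 3);
        System.out.println("produced " + (8 * 2) + ", processed " + processed.size());
        System.out.println(processed);
    }
}
```

Messages produced during the outage stay in the buffer and are processed in their original order once the consumer returns, provided the consumer is faster than the producer for long enough to drain the backlog.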

Data source: Flume

The data that Kafka receives is produced by a Flume Sink: the Agents of the Flume cluster collect logs and deliver them to Kafka. Choose the source type to suit your situation; here I use an exec source, a memory channel, and, naturally, a Kafka sink. The configuration is kept in flume-test.conf, the file passed to flume-ng in the test below.

Flume to Kafka

[Figure: transfer process from Flume to Kafka]

Kafka uses the same configuration as the cluster that was set up previously.

Testing Flume to Kafka

Once Flume and Kafka are configured, start Flume first. Here it runs in the background:

flume-ng agent -n agent -c /usr/local/apache-flume-1.6.0-bin/conf -f /usr/local/apache-flume-1.6.0-bin/conf/flume-test.conf -Dflume.root.logger=DEBUG,console &

Then start ZooKeeper:

./zkServer.sh start

Then start the Kafka cluster, also in the background:

./kafka-server-start.sh ../config/server.properties &

Then append data to the monitored file:

echo 'hello world' >> topic-test.txt

Then start a consumer on the Kafka cluster to check that Flume is connected to Kafka. You can also check with a Kafka monitoring tool.

You can create the topic in advance, or let Kafka create it automatically by setting the auto.create.topics.enable property to true (true is the default).

./kafka-console-consumer.sh --zookeeper master:2181 --from-beginning --topic topic1

If 'hello world' appears in the output, the connection from Flume to Kafka works.

Reading and analyzing Kafka data with Storm

First, set up the Storm cluster and start nimbus, supervisor, and ui.

Then write the topology. Here is a small example in Java:

Main class

    package com.kafka_storm;

    import java.util.HashMap;
    import java.util.Map;

    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.ZkHosts;
    import storm.kafka.bolt.KafkaBolt;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.utils.Utils;

    public class StormKafkaTopo {
        public static void main(String[] args) throws Exception {
            // Configure the ZooKeeper address
            BrokerHosts brokerHosts = new ZkHosts("master:2181");
            // Configure the topic that the spout subscribes to, plus the directory and
            // name of its data node in ZooKeeper. Note that the spout uses the last two
            // parameters to create a ZooKeeper node that holds the read offset of each
            // Kafka partition, for example /zkroot/topo/partition_0.
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/zkkafkaspout", "kafkaspout");
            // Configure kafka.broker.properties for KafkaBolt (see Kafka Java programming)
            Config conf = new Config();
            Map<String, String> map = new HashMap<String, String>();
            // Configure the Kafka broker address
            map.put("metadata.broker.list", "master:9092");
            // serializer.class is the serialization class of the message
            map.put("serializer.class", "kafka.serializer.StringEncoder");
            conf.put("kafka.broker.properties", map);
            // Configure the topic that KafkaBolt writes to
            conf.put("topic", "topic2");
            // By default the spout emits raw binary data in a field named "bytes";
            // set a scheme to change this.
            spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new KafkaSpout(spoutConfig));
            builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");
            builder.setBolt("kafkabolt", new KafkaBolt()).shuffleGrouping("bolt");

            if (args != null && args.length > 0) {
                // A topology name was given: submit to the cluster
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                // No arguments: run in local mode for about 100 seconds, then shut down
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("Topo", conf, builder.createTopology());
                Utils.sleep(100000);
                cluster.killTopology("Topo");
                cluster.shutdown();
            }
        }
    }
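The comment on SpoutConfig gives the offset node pattern as /zkroot/topo/partition_0. As a rough sketch, assuming the path is simply the ZooKeeper root, the spout id, and a partition_N suffix joined with slashes (an assumption drawn from that comment, not verified against the storm-kafka source), the node for this topology could be computed as below. `OffsetPathSketch` and `offsetPath` are hypothetical names.

```java
/** Hypothetical helper mirroring the path pattern quoted in the SpoutConfig comment. */
final class OffsetPathSketch {
    /** Joins the ZooKeeper root, spout id, and partition number as zkRoot/id/partition_N. */
    static String offsetPath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Values taken from the SpoutConfig in the main class.
        System.out.println(offsetPath("/zkkafkaspout", "kafkaspout", 0));
    }
}
```

With the values from the main class the sketch yields /zkkafkaspout/kafkaspout/partition_0. Knowing where the offsets are likely to live is useful when a topology needs to re-read a topic from the start, since the stored offset is what lets the spout resume where it left off.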

Message processing

    package com.kafka_storm;

    import java.io.UnsupportedEncodingException;
    import java.util.List;

    import backtype.storm.spout.Scheme;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    /**
     * When using KafkaSpout, a class must implement the Scheme interface.
     * It is mainly responsible for parsing the required data out of the message stream.
     * @author lenovo
     */
    public class MessageScheme implements Scheme {

        /* (non-Javadoc)
         * @see backtype.storm.spout.Scheme#deserialize(byte[])
         */
        public List<Object> deserialize(byte[] ser) {
            try {
                // Decode the raw Kafka message bytes as a UTF-8 string
                String msg = new String(ser, "UTF-8");
                return new Values(msg);
            } catch (UnsupportedEncodingException e) {
                // Ignored; falls through to return null
            }
            return null;
        }

        /* (non-Javadoc)
         * @see backtype.storm.spout.Scheme#getOutputFields()
         */
        public Fields getOutputFields() {
            return new Fields("msg");
        }
    }
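The decoding step can be tried on its own, without Storm on the classpath. The sketch below is a possible variant rather than the article's code: it uses `StandardCharsets.UTF_8`, whose `String` constructor declares no checked exception, so the empty catch block and the `return null` path are not needed. `DecodeSketch` is a hypothetical name, and the method only imitates the shape of `Scheme#deserialize`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-alone version of the decoding done in MessageScheme. */
final class DecodeSketch {
    /** Decodes the message bytes as UTF-8 and wraps the text in a one-element list. */
    static List<Object> deserialize(byte[] ser) {
        String msg = new String(ser, StandardCharsets.UTF_8);
        return Collections.<Object>singletonList(msg);
    }

    public static void main(String[] args) {
        byte[] raw = "hello world".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(raw));
    }
}
```

In the real class the same constructor could replace `new String(ser, "UTF-8")`, with the result still wrapped in `Values`.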

Bolt

    package com.kafka_storm;

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class SenqueceBolt extends BaseBasicBolt {

        /* (non-Javadoc)
         * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
         */
        public void execute(Tuple input, BasicOutputCollector collector) {
            // The first field of the tuple is the decoded message text
            String word = (String) input.getValue(0);
            String out = "Ichimm" + word + "!";
            System.out.println("out=" + out);
            // Emit the result for the downstream KafkaBolt
            collector.emit(new Values(out));
        }

        /* (non-Javadoc)
         * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("message"));
        }
    }
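Before deploying, the bolt's string handling can be exercised locally. The following is a minimal sketch that copies only the transformation from `execute` into a plain static method; `LocalChainSketch`, `transform`, and `runAll` are hypothetical names, and nothing here touches the Storm API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical local stand-in for the per-tuple work done by SenqueceBolt. */
final class LocalChainSketch {
    /** Same string transformation as in SenqueceBolt.execute. */
    static String transform(String word) {
        return "Ichimm" + word + "!";
    }

    /** Applies the transformation to each input line, as the bolt does per tuple. */
    static List<String> runAll(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            out.add(transform(line));
        }
        return out;
    }

    public static void main(String[] args) {
        for (String result : runAll(Arrays.asList("hello world", "second line"))) {
            // Mirrors the println in the bolt, so the text matches what the worker log shows.
            System.out.println("out=" + result);
        }
    }
}
```

Keeping the transformation in a method that does not depend on `Tuple` or `BasicOutputCollector` is one way to make it testable without starting a local cluster.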

Run on a cluster

All third-party jars that the topology depends on, including the Kafka and ZooKeeper jars, must be placed in Storm's lib directory. Otherwise the topology fails with a missing-jar error.

storm jar StormKafkaDemo.jar com.kafka_storm.StormKafkaTopo StormKafkaDemo

Start the overall test:

Append data to the file monitored by Flume, then check the output in Storm's log. Because the result is also written to Kafka, it can be viewed there too, under the topic topic2.

For the input 'hello world', the println in the bolt writes a log line containing out=Ichimmhello world!
