This article shares an example analysis of integrating Kafka, Storm, and Elasticsearch to process real-time data. It is quite practical, so I hope you get something out of it after reading. Let's take a look.
A recent message-retrieval system involved, among its heavier requirements, the cleaning and persisting of message data. The data source is a web crawler (initially implemented with Python Scrapy), which sends MQ messages to the system's Kafka service through python-kafka. A Storm-based KafkaSpout then receives the messages, processes the data, and lands it uniformly in Elasticsearch (ES). The detailed process is described below.
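For context, here is a minimal sketch of the producer side, written in Java for consistency with the rest of this article (the actual crawler publishes with python-kafka). The broker address and topic name are the ones used by the topology later on; the class name and payload are purely illustrative.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class NewsProducer {
    public static void main(String[] args) {
        // Connection settings; the broker address matches the topology below
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.100.39:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // In the real system the payload is a crawled news item;
            // a hard-coded JSON string stands in for it here
            producer.send(new ProducerRecord<>("news-topic", "{\"title\":\"example\"}"));
        }
    }
}
```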
Environmental preparation
Because of limited resources, only a local environment is available, that is, all the services that would normally be cluster-deployed are tested in LOCAL mode. For actual cluster deployment, please refer to other materials; I am only doing code development here, and the final application is not affected.
Server: Ubuntu Server 17.10
JVM environment: JDK 1.8.0_91 (64-bit)
Service governance: ZooKeeper 3.4.9
Real-time computing: Apache Storm 1.2.2
Message queue: Kafka 2.11-2.0.0
Index storage: Elasticsearch 5.6.10

Copyright notice: this is an original article by CSDN blogger "tanwei_", under the CC 4.0 BY-SA copyright agreement; please attach the original source link and this statement when reprinting. Original link: https://blog.csdn.net/u012935820/article/details/82378609
Application and development
1. The project is built with Maven, which keeps dependency management convenient overall. The project structure is shown in the figure:
2. Project POM
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sdnware.news</groupId>
    <artifactId>news-kafka-storm</artifactId>
    <version>1.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <junit.version>4.12</junit.version>
        <kafka.version>2.0.0</kafka.version>
        <storm.version>1.2.2</storm.version>
        <storm-kafka.version>1.2.2</storm-kafka.version>
        <storm-elasticsearch.version>1.2.2</storm-elasticsearch.version>
        <lombok.version>1.18.2</lombok.version>
        <gson.version>2.8.5</gson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>${gson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>${storm-kafka.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-elasticsearch</artifactId>
            <version>${storm-elasticsearch.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.sdnware.news.topo.KafkaTopology</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
3. Development based on storm-kafka
Note: before Storm 1.x, the officially provided storm-kafka module was the standard way to integrate with Kafka via Maven. It is still available after 1.x but has been deprecated; storm-kafka-client is the recommended replacement and is just as convenient to develop with.
When developing a Storm implementation, the business logic is essentially built around a topology. In this example we write the KafkaTopology directly:
```java
package com.sdnware.news.topo;

import com.google.gson.Gson;
import com.sdnware.news.pojo.UserInfo;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.elasticsearch.bolt.EsIndexBolt;
import org.apache.storm.elasticsearch.common.DefaultEsTupleMapper;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Properties;
import java.util.UUID;

/**
 * Created by sdnware on 18-8-31.
 */
public class KafkaTopology {

    public static void main(String[] args) throws Exception {
        /* For reference, the equivalent (now deprecated) storm-kafka setup
           was a piece of pseudo-code like this:
           BrokerHosts zkHosts = new ZkHosts(ZK_HOSTS);
           SpoutConfig config = new SpoutConfig(zkHosts, KAFKA_TOPIC,
                   ZK_ROOT + KAFKA_TOPIC, UUID.randomUUID().toString());
           config.scheme = new SchemeAsMultiScheme(new StringScheme());
           config.zkServers = Arrays.asList(ZK_SERVERS.split(","));
           config.zkPort = ZK_PORT;
           config.socketTimeoutMs = socketTimeoutMs;
        */
        TopologyBuilder topologyBuilder = new TopologyBuilder(); // topology builder

        Properties properties = new Properties();
        properties.setProperty("group.id", "test-news-topic"); // basic Kafka consumer configuration

        // Define a KafkaSpoutConfig (broker address and topic;
        // the default Kafka port 9092 is assumed, it was garbled in the original)
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig
                .builder("192.168.100.39:9092", "news-topic")
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                .setProp(properties)
                .build();
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);

        topologyBuilder.setSpout("kafka-spout", kafkaSpout, 1); // inject the spout
        topologyBuilder.setBolt("kafka-bolt", new NewsBlot(), 1)
                .shuffleGrouping("kafka-spout"); // consume the kafka-spout data

        EsConfig esConfig = new EsConfig(new String[]{"http://192.168.100.39:9200"}); // ES connection info
        EsTupleMapper esTupleMapper = new DefaultEsTupleMapper(); // default ES field mapping
        EsIndexBolt indexBolt = new EsIndexBolt(esConfig, esTupleMapper); // index bolt
        topologyBuilder.setBolt("es-bolt", indexBolt, 1)
                .shuffleGrouping("kafka-bolt"); // let the index bolt process kafka-bolt output

        // Submit to the Storm cluster
        Config config = new Config();
        config.setMessageTimeoutSecs(90);
        if (args.length > 0) { // cluster mode
            config.setDebug(false);
            StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
        } else { // local mode, which is what this test generally uses
            // config.setDebug(true);
            config.setNumWorkers(2);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("local-kafka-storm-topology", config, topologyBuilder.createTopology());
        }
    }

    // Custom bolt that maps a Kafka message into the tuple the ES bolt expects
    static class NewsBlot extends BaseBasicBolt {

        // Called for each incoming message; the emitted values must match
        // the fields declared in declareOutputFields below
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            // System.err.println(input.getValues());
            String id = UUID.randomUUID().toString();
            UserInfo userInfo = new UserInfo();
            userInfo.setId(id);
            userInfo.setUsername("tanwei");
            userInfo.setPassword("sdnware");
            Gson gson = new Gson();
            String source = gson.toJson(userInfo);
            collector.emit(new Values(source, "idx_sys", "tb_user", id));
        }

        // Declares the field mapping for message delivery; these are the field
        // names the EsTupleMapper requires (trace the source code to see why)
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("source", "index", "type", "id"));
        }
    }
}
```
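The UserInfo POJO imported by the topology is not shown in the original post; here is a minimal sketch of what com.sdnware.news.pojo.UserInfo presumably looks like. Since the POM pulls in Lombok, the original may simply have used @Data instead of hand-written accessors.

```java
package com.sdnware.news.pojo;

// Minimal stand-in for the POJO that Gson serializes in NewsBlot;
// only the three fields the demo sets are assumed here.
public class UserInfo {
    private String id;
    private String username;
    private String password;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
}
```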
In the code above, some readers may wonder why no Storm configuration appears: how does the code find Storm when it runs? I was also confused when I first developed this. After tracing the source code, I found that all Storm configuration falls back to defaults.yaml in the storm-core package; for specific overrides, refer to the official documentation. Since I am only testing locally, the defaults do not affect the test. A small sketch of overriding those defaults follows.
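For illustration, any key from defaults.yaml can be overridden programmatically on the Config object before the topology is submitted; a minimal sketch (the specific keys are arbitrary examples):

```java
import org.apache.storm.Config;

public class ConfigOverrideSketch {
    public static void main(String[] args) {
        // Config is a map of defaults.yaml keys; it also offers
        // typed setters for the common ones.
        Config config = new Config();
        config.setNumWorkers(2);                             // topology.workers
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1000); // topology.max.spout.pending
        System.out.println(config);                          // the effective overrides
    }
}
```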
In the execute method of NewsBlot, since the message comes from Kafka, the default Tuple is a list containing the Kafka topic, partition, offset, key, and message value. In a production environment, we need to transform the message into the data format we want to store in ES according to business requirements; here I simply simulate storing a UserInfo object. In general the ES source is a JSON document, where each key is a field in ES and each value is the corresponding value. A sketch of reading the raw payload is shown below.
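For illustration, assuming storm-kafka-client's default RecordTranslator (which emits the fields "topic", "partition", "offset", "key" and "value"), the raw payload could be read inside NewsBlot by field name; a hypothetical replacement for the demo body of execute:

```java
// Drop-in sketch for NewsBlot.execute: pull the crawled payload
// out of the tuple instead of emitting a simulated UserInfo.
public void execute(Tuple input, BasicOutputCollector collector) {
    String topic   = input.getStringByField("topic");  // source Kafka topic
    String message = input.getStringByField("value");  // raw message from the crawler
    // transform `message` into the ES source JSON here, then emit it
    // together with index, type and id exactly as in the demo above
}
```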
Postscript
Since this is just a simple demo introducing the basic implementation, a complete message system still needs to consider issues such as partitioned and grouped consumption of the data. In short, the fundamentals stay the same: read more source code, and things suddenly become clear.