Storm + Kafka integration: a simple application


Recently my company wanted to adopt Kafka, so I combined it with our existing Storm setup and built a simple integration. I had referred to some examples on the Internet beforehand, but found that most of them had problems of one kind or another, so I put one together myself.

Below is a problem that other people on the Internet have run into, excerpted here so that you and I can avoid it in the future:

The basic scenario is that when an error occurs in the application, the log is sent to a Kafka topic, and Storm subscribes to that topic to process the data later. The scenario is very simple, but in the course of learning it a strange problem appeared: when reading topic data with KafkaSpout, the offset data was never written to ZooKeeper, so every restart read from the beginning. After struggling for two days, I finally happened to find the reason: you should use BaseBasicBolt as the parent class of the bolt, not BaseRichBolt.
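Some background on why that fix works (this explanation and sketch are mine, not from the excerpt): KafkaSpout only commits an offset to ZooKeeper after the tuple it emitted has been acked. BaseBasicBolt acks each tuple automatically when execute returns; with BaseRichBolt, acking is entirely your responsibility. A minimal sketch of what a BaseRichBolt would need to do to keep offsets moving:

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Illustration only: a BaseRichBolt must anchor and ack each tuple itself,
// otherwise KafkaSpout never sees the ack and never commits the offset to ZK.
public class AckingMsgBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = (String) input.getValue(0);
        // anchor the emitted tuple to the input so failures are replayed
        collector.emit(input, new Values("Message got is '" + word + "'!"));
        // without this explicit ack, KafkaSpout never advances the offset
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}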

Basic subscription:

Basic scenario: subscribe to one Kafka topic, prepend a custom string to each message that is read, and write the result back to another Kafka topic. The spout that reads data from Kafka is storm.kafka.KafkaSpout, and the bolt that writes data back to Kafka is storm.kafka.bolt.KafkaBolt. The bolt that processes the data in between is a custom class, TopicMsgBolt.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;

import java.util.Properties;

public class TopicMsgTopology {
    public static void main(String[] args) throws Exception {
        // configure the ZooKeeper address
        BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
        // configure the Kafka topic to subscribe to, plus the ZooKeeper node directory and name for offset data
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "msgTopic1", "/topology/root1", "topicMsgTopology");
        // configure kafka.broker.properties for KafkaBolt
        Config conf = new Config();
        Properties props = new Properties();
        // the Kafka broker address
        props.put("metadata.broker.list", "localhost:9092");
        // serializer.class is the serialization class for the message
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", props);
        // configure the topic that KafkaBolt writes to
        conf.put("topic", "msgTopic2");
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("msgKafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("msgSentenceBolt", (IBasicBolt) new TopicMsgBolt()).shuffleGrouping("msgKafkaSpout");
        builder.setBolt("msgKafkaBolt", new KafkaBolt()).shuffleGrouping("msgSentenceBolt");
        if (args.length == 0) {
            String topologyName = "kafkaTopicTopology";
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, conf, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(topologyName);
            cluster.shutdown();
        } else {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
    }
}

The parameter of the storm.kafka.ZkHosts constructor is a standard ZooKeeper connection string.
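For illustration (the ensemble hostnames below are placeholders), multiple ZooKeeper nodes go into one comma-separated string, and ZkHosts also has a two-argument form whose second parameter is the ZooKeeper path where Kafka registers broker metadata:

import storm.kafka.BrokerHosts;
import storm.kafka.ZkHosts;

public class ZkHostsExamples {
    public static void main(String[] args) {
        // a single node, as in this article
        BrokerHosts single = new ZkHosts("localhost:2181");
        // a three-node ensemble (placeholder hostnames) as one comma-separated string
        BrokerHosts ensemble = new ZkHosts("zk1:2181,zk2:2181,zk3:2181");
        // two-argument form: the second parameter is the ZK path where Kafka
        // registers broker metadata (the default is assumed to be /brokers)
        BrokerHosts chrooted = new ZkHosts("zk1:2181,zk2:2181,zk3:2181", "/kafka/brokers");
    }
}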

The first parameter of the storm.kafka.SpoutConfig constructor is the storm.kafka.ZkHosts object above; the second is the name of the topic to subscribe to; the third, zkRoot, is the ZooKeeper node under which the read offset data is written; and the fourth is the name of the child node under zkRoot (some write-ups describe this as the id of the spout). The backtype.storm.Config object holds the basic configuration that a Storm topology needs. The argument to the backtype.storm.spout.SchemeAsMultiScheme constructor defines how the subscribed Kafka data is deserialized. Here MessageScheme is custom, and the code is as follows:

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class MessageScheme implements Scheme {
    private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class);

    @Override
    public List<Object> deserialize(byte[] ser) {
        try {
            // decode the raw Kafka message bytes as a UTF-8 string
            String msg = new String(ser, "UTF-8");
            logger.info("get one message is {}", msg);
            return new Values(msg);
        } catch (UnsupportedEncodingException ignored) {
            return null;
        }
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}

The getOutputFields method in the MessageScheme class declares the field name of the tuple (the smallest unit of data that Storm transmits) that KafkaSpout emits downstream. It should match what the receiving bolt expects; a mismatch goes unnoticed in this example, because the bolt reads item zero by position, but in a case like wordCount the names must agree (a by-name variant is sketched after the code below). The TopicMsgBolt class is the bolt that receives data from storm.kafka.KafkaSpout, processes it, and passes it on to storm.kafka.bolt.KafkaBolt. The code is as follows:

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicMsgBolt extends BaseBasicBolt {
    private static final Logger logger = LoggerFactory.getLogger(TopicMsgBolt.class);

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValue(0);
        // prepend a custom string to the message before passing it on
        String out = "Message got is '" + word + "'!";
        logger.info("out={}", out);
        collector.emit(new Values(out));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // KafkaBolt expects the outgoing field to be named "message"
        declarer.declare(new Fields("message"));
    }
}
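As mentioned above, a bolt can also fetch tuple values by field name instead of by position. Here is a hypothetical variant of TopicMsgBolt (my own sketch) that does so; getStringByField("msg") works only because MessageScheme declares its output field as "msg":

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Hypothetical variant: reads the tuple by field name instead of position,
// so the field name declared by the upstream scheme must match exactly.
public class TopicMsgByNameBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("msg");
        collector.emit(new Values("Message got is '" + word + "'!"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}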

It is important to note here that you must use backtype.storm.topology.base.BaseBasicBolt as the parent class; otherwise the offset data will not be recorded in ZooKeeper. All the code you need to write is now complete, and the next step is to test it on the Storm and Kafka installations you have set up:

# create the two topics
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic msgTopic1
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic msgTopic2

Next, start a console producer for msgTopic1 and a console consumer for msgTopic2.

# start a producer for msgTopic1 to send data
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic msgTopic1
# start a consumer for msgTopic2 to view the processing results
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic msgTopic2 --from-beginning

Run the program with Storm's jar command:

storm jar stormkafka.jar stormkafka1.TopicMsgTopology

After the corresponding worker has started, you can type data into the terminal running the msgTopic1 producer and watch the processed output appear in the terminal running the msgTopic2 consumer.
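To confirm that the offset really is being committed, you can read the node back from ZooKeeper. Below is a rough sketch of mine using the plain ZooKeeper Java client; the path follows from the zkRoot ("/topology/root1") and id ("topicMsgTopology") configured above, and the "partition_0" suffix is my assumption about how storm-kafka names its per-partition nodes, so verify it against your own ZK tree:

import org.apache.zookeeper.ZooKeeper;

public class OffsetCheck {
    public static void main(String[] args) throws Exception {
        // connect to the same ZooKeeper the topology uses
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, event -> { });
        // storm-kafka stores committed offsets as JSON under {zkRoot}/{id}/partition_{N} (assumed layout)
        byte[] data = zk.getData("/topology/root1/topicMsgTopology/partition_0", false, null);
        System.out.println(new String(data, "UTF-8"));
        zk.close();
    }
}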

There are several points to note:

- The msgTopic1 and msgTopic2 topics must be defined in advance.
- Bolts must use BaseBasicBolt as the parent class; BaseRichBolt cannot be used here, otherwise the offset will not be recorded.
- ZooKeeper is best run in distributed mode, or in pseudo-distributed mode with at least three nodes; otherwise some exceptions will occur.
- Across the whole Storm topology, the ids of spouts and bolts must be unique, otherwise an exception will occur.
- The TopicMsgBolt class, as the last bolt before storm.kafka.bolt.KafkaBolt, must name its output field "message"; otherwise KafkaBolt cannot receive the data.
