This article is about how to integrate Storm 1.1.3 with Kafka 1.0.0. I think it is very practical, so I am sharing it with you; I hope you will get something out of it. Let's take a look.
The topology code is as follows:

package hgs.core.sk;

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

@SuppressWarnings("deprecation")
public class StormKafkaMainTest {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // ZooKeeper connection addresses
        BrokerHosts hosts = new ZkHosts("bigdata01:2181,bigdata02:2181,bigdata03:2181");
        // KafkaSpout needs a SpoutConfig. The parameters mean:
        // 1: the ZooKeeper connection; 2: the Kafka topic to consume;
        // 3 and 4: the ZooKeeper root path and id under which the consumed
        // offsets are recorded, here /test7/consume in the ZooKeeper cluster.
        SpoutConfig sconfig = new SpoutConfig(hosts, "test7", "/test7", "consume");
        // Ignore the offsets already recorded in ZooKeeper when consuming;
        // this line can be commented out, since the consumed offsets can be
        // looked up in ZooKeeper.
        sconfig.ignoreZkOffsets = true;
        // sconfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        builder.setSpout("kafkaspout", new KafkaSpout(sconfig), 1);
        builder.setBolt("mybolt1", new MyboltO(), 1).shuffleGrouping("kafkaspout");
        Config config = new Config();
        config.setNumWorkers(1);
        try {
            StormSubmitter.submitTopology("storm----kafka--test", config, builder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
        /*
        LocalCluster cu = new LocalCluster();
        cu.submitTopology("test", config, builder.createTopology());
        */
    }
}

class MyboltO extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    OutputCollector collector = null;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        // Print the message here; the printed content can be found in the log
        // of the corresponding worker. The received content is a byte array,
        // so it has to be converted to a String first.
        String out = new String((byte[]) input.getValue(0));
        System.out.println(out);
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

The pom.xml file dependencies:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>hgs</groupId>
  <artifactId>core.sk</artifactId>
  <version>1.0.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>core.sk</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>1.1.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.1.3</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>1.0.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.clojure</groupId>
      <artifactId>clojure</artifactId>
      <version>1.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.0.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.2</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>hgs.core.sk.StormKafkaMainTest</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
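A note on the commented-out scheme line: with sconfig.scheme = new SchemeAsMultiScheme(new StringScheme()) enabled (StringScheme ships with storm-kafka in org.apache.storm.kafka, SchemeAsMultiScheme in org.apache.storm.spout), the spout deserializes each Kafka message to a String before emitting it, so the bolt no longer needs the byte[] conversion. Below is a minimal sketch of that bolt variant; the class name MyStringBolt is a placeholder of mine, not from the original code:

package hgs.core.sk;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

// Only usable when the spout is configured with a StringScheme,
// so the first tuple field is already a String.
class MyStringBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        // No new String((byte[]) ...) conversion needed here.
        String out = input.getString(0);
        System.out.println(out);
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}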
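To test the whole pipeline end to end, it helps to push a few messages into the test7 topic. The kafka-clients dependency already in the pom is enough for a small standalone producer. The sketch below is mine, not from the original post, and it assumes the Kafka brokers listen on the default port 9092 on the same bigdata01..bigdata03 hosts as ZooKeeper; adjust bootstrap.servers to the actual cluster:

package hgs.core.sk;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical test helper: sends a few messages to the "test7" topic
// so the topology has something to print.
public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumption: brokers on bigdata01..03 at the default port 9092.
        props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("test7", "message-" + i));
        }
        producer.close();
    }
}

Once mvn package has run, the assembly plugin builds a jar-with-dependencies whose manifest main class is hgs.core.sk.StormKafkaMainTest, and the topology can be submitted to the cluster with storm jar (the exact jar name, for example core.sk-1.0.0-SNAPSHOT-jar-with-dependencies.jar, depends on the build). The bolt's output then shows up in the corresponding worker's log, as the comment in execute() notes.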
The above is how to integrate Storm 1.1.3 with Kafka 1.0.0. It covers some knowledge points that may come up in daily work; I hope you can learn something more from this article.