
How to configure and use Storm

2025-01-18 Update From: SLTechnology News&Howtos


Shulou (Shulou.com) 06/01

This article shows how to configure and use Storm: a sample storm.yaml, the commands that start the daemons, and a small word-count topology.

The sample code is as follows:

storm.yaml configuration:

    # ZooKeeper servers
    storm.zookeeper.servers:
      - "bigdata01"
      - "bigdata02"
      - "bigdata03"
    # Path to local data storage
    storm.local.dir: "/apps/storm"
    # Nimbus master
    nimbus.seeds: ["bigdata00"]
    # Worker ports
    supervisor.slots.ports:
      - 6700
      - 6701
      - 6702
      - 6703

Launch commands, run from the Storm installation directory:

    nohup bin/storm nimbus &
    nohup bin/storm supervisor &
    nohup bin/storm ui &

Word-count topology:

    package com.hgs.storm;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class StormWordCountTest {

        // Declared as "throws Exception" so the class compiles whether or not
        // the LocalCluster constructor declares checked exceptions.
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("wordspout", new WordCountSpout(), 3);
            builder.setBolt("splitword", (IRichBolt) new WordSpliteBolt(), 2)
                   .shuffleGrouping("wordspout");
            // "word" is a field emitted by splitword; see
            // WordSpliteBolt.declareOutputFields.
            builder.setBolt("wordcount", new WordCountBolt(), 2)
                   .fieldsGrouping("splitword", new Fields("word"));

            Config config = new Config();
            config.setNumWorkers(2);

            /* Cluster submission (needs import org.apache.storm.StormSubmitter):
            StormSubmitter.submitTopology("words-count", config, builder.createTopology());
            if (args != null && args.length > 0) {
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            } else {
                LocalCluster cluster = new LocalCluster();
            }
            */

            // Run the topology in an in-process local cluster.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("words-count", config, builder.createTopology());
        }
    }

    class WordCountSpout extends BaseRichSpout {
        private static final long serialVersionUID = 1L;

        // Collector handed over by open(); nextTuple() uses it to emit tuples.
        SpoutOutputCollector collector = null;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            collector.emit(new Values("this is my first storm program so i hope it will success"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("message"));
        }
    }

    class WordSpliteBolt extends BaseRichBolt {
        private static final long serialVersionUID = 1L;

        OutputCollector collector = null;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0);
            // Split on spaces and emit one (word, 1) tuple per word.
            String[] words = line.split(" ");
            for (String wd : words) {
                collector.emit(new Values(wd, 1));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "num"));
        }
    }

    class WordCountBolt extends BaseRichBolt {
        private static final long serialVersionUID = 1L;

        // Running count per word seen by this bolt task.
        ConcurrentHashMap<String, Integer> wordsMap = new ConcurrentHashMap<>();
        OutputCollector collector = null;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            Integer num = input.getInteger(1);
            if (wordsMap.containsKey(word)) {
                wordsMap.put(word, wordsMap.get(word) + num);
            } else {
                wordsMap.put(word, num);
            }
            System.out.println(word + "--" + wordsMap.get(word));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // This bolt is the end of the topology and emits nothing.
        }
    }
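The word-count bolt in the sample is attached with a fields grouping on "word", which is what lets each bolt task keep its own private map: every tuple carrying the same word is routed to the same task. The sketch below illustrates that property in plain Java, without any Storm dependency. The class name FieldsGroupingSketch and the helpers taskFor and countWords are hypothetical, and the hash-modulo routing is a simplifying assumption; Storm's own routing is implemented differently, and only the "same value, same task" behaviour is meant to carry over.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FieldsGroupingSketch {

    // Hypothetical helper: pick a task index from the grouping field's value.
    // Assumption: plain hashCode modulo the task count stands in for Storm's
    // real routing; equal words always map to the same index.
    public static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    // Split a line on single spaces and count each word in the map that
    // belongs to the task chosen for that word.
    public static List<Map<String, Integer>> countWords(String line, int numTasks) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<>());
        }
        for (String word : line.split(" ")) {
            tasks.get(taskFor(word, numTasks)).merge(word, 1, Integer::sum);
        }
        return tasks;
    }

    public static void main(String[] args) {
        String line = "this is my first storm program so i hope it will success";
        // Feed the sentence twice, as if the spout had emitted it twice.
        List<Map<String, Integer>> tasks = countWords(line + " " + line, 2);
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println("task " + i + " -> " + tasks.get(i));
        }
    }
}
```

Because a given word only ever reaches one task, no two maps hold partial counts for the same word. With a shuffle grouping in front of the counting bolt, the same word could land on either task and the per-task totals would be split.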
