Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to implement the local mode demo of storm

2025-04-10 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article introduces the relevant knowledge of "how to implement storm's local mode demo". In the operation of actual cases, many people will encounter such a dilemma, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!

SimpleTopology.java

Package com.zgl.helloword;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.topology.TopologyBuilder;/** * defines a simple topology, including a data eruption node spout and a data processing node bolt. * * @ author Administrator * * / public class SimpleTopology {public static void main (String [] args) {try {/ / instantiate the TopologyBuilder class. TopologyBuilder topologyBuilder = new TopologyBuilder (); / / sets the eruption node and allocates the number of concurrency, which controls the number of threads the object has in the cluster. TopologyBuilder.setSpout ("SimpleSpout", new SimpleSpout (), 1); / / sets the data processing node and allocates the number of concurrency. Specifies that the policy for this node to receive an eruption node is random. TopologyBuilder.setBolt ("SimpleBolt", new SimpleBolt (), 3). ShuffleGrouping ("SimpleSpout"); Config config = new Config (); config.setDebug (false); if (args! = null & & args.length > 0) {config.setNumWorkers (1); StormSubmitter.submitTopology (args [0], config, topologyBuilder.createTopology ()) } else {/ / here is the startup code running in local mode. Config.setMaxTaskParallelism (1); LocalCluster cluster = new LocalCluster (); cluster.submitTopology ("simple", config, topologyBuilder.createTopology ());}} catch (Exception e) {e.printStackTrace ();}

SimpleSpout.java

Package com.zgl.helloword;import java.util.Map;import java.util.Random;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values Random random=new Random (); / * initialize collector * / @ SuppressWarnings ("rawtypes") public void open (Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector } / * is called in the SpoutTracker class, and each call can send a piece of data (a tuple tuple) to the storm cluster. The method will be called * / public void nextTuple () {try {String msg = info [random.nextInt (11)]. / / call launch method collector.emit (new Values (msg)); / / simulate waiting for 100ms Thread.sleep (100);} catch (InterruptedException e) {e.printStackTrace () }} / * defines the field id, which is not useful in simple mode, but is very useful in a mode grouped by field. * this declarer variable is very useful, and we can also call declarer.declareStream (); to define stramId, which can be used to define a more complex flow topology * / public void declareOutputFields (OutputFieldsDeclarer declarer) {declarer.declare ("source")); / / collector.emit (new Values (msg)); parameters correspond to}}

SimpleBolt.java

Package com.zgl.helloword;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;/** * receives the data sent by the eruption node (Spout) and transmits it after simple processing. * * @ author Administrator * * / @ SuppressWarnings ("serial") public class SimpleBolt extends BaseBasicBolt {public void execute (Tuple input, BasicOutputCollector collector) {try {String msg= input.getString (0); if (msg! = null) {System.out.println ("msg=" + msg); collector.emit (new Values (msg + "msg is processed!")) } catch (Exception e) {e.printStackTrace ();}} public void declareOutputFields (OutputFieldsDeclarer declarer) {declarer.declare (new Fields ("info"));}}

Pom.xml

4.0.0 strom-zgl storm-zgl 0.0.1-SNAPSHOT jar storm-zgl http://maven.apache.org UTF-8 junit junit 3.8.1 test org.apache.storm storm-core 0.9.1-incubating This is the end of the content of "how to implement storm's native mode demo". Thank you for your reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report