Storm Notes arrangement (2): Storm Local Development case-Sum calculation and word Statistics

2025-02-27 Update, from SLTechnology News & Howtos



Overview

Storm's API provides a LocalCluster object, which makes it very convenient to develop Storm programs without building a Storm environment or a Storm cluster.

Building the project with Maven requires the following dependency:

```xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
</dependency>
```

Storm Local Development Case 1: Summation Calculation

Requirements Analysis

The requirements are as follows:

The data source continuously generates incrementing numbers, and the generated numbers are summed.

The analysis is as follows:

A Storm Topology contains two node types: Spout and Bolt. In this case, a Spout can handle the data source (simulating the generated data) and send it on to a Bolt for the summation, so only one Spout node and one Bolt node are needed.

Program Development

Once you understand Storm's design and compare it with that of MapReduce, the following program code is actually very easy to follow.

OrderSpout

```java
/**
 * Data source
 */
static class OrderSpout extends BaseRichSpout {

    private Map conf;                       // configuration information of the current component
    private TopologyContext context;        // context object of the current component
    private SpoutOutputCollector collector; // component used to emit tuples

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    /**
     * Core method that produces the data
     */
    @Override
    public void nextTuple() {
        long num = 0;
        while (true) {
            num++;
            StormUtil.sleep(1000);
            System.out.println("Current time " + StormUtil.df_yyyyMMddHHmmss.format(new Date())
                    + " order amount generated: " + num);
            this.collector.emit(new Values(num));
        }
    }

    /**
     * Describes the schema of the emitted data
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("order_cost"));
    }
}
```

SumBolt

```java
/**
 * Bolt node that performs the summation
 */
static class SumBolt extends BaseRichBolt {

    private Map conf;                  // configuration information of the current component
    private TopologyContext context;   // context object of the current component
    private OutputCollector collector; // component used to emit tuples

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.conf = stormConf;
        this.context = context;
        this.collector = collector;
    }

    private Long sumOrderCost = 0L;

    /**
     * Core method that processes the data
     */
    @Override
    public void execute(Tuple input) {
        Long orderCost = input.getLongByField("order_cost");
        sumOrderCost += orderCost;
        System.out.println("Total transaction volume of the mall website up to "
                + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + ": " + sumOrderCost);
        StormUtil.sleep(1000);
    }

    /**
     * If the current bolt is the last processing unit, this method can be left empty
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
```

StormLocalSumTopology

```java
/**
 * 1. Summation of numbers: the data source continuously generates incrementing numbers,
 *    and the generated numbers are accumulated.
 *
 * Storm components: Spout and Bolt; the data is a Tuple; in main, a Topology associates the spout and the bolt.
 * MapReduce components: Mapper and Reducer; the data is Writable; in main, a Job associates them.
 *
 * Adapter pattern: BaseRichSpout implements the interface and overrides the usually
 * unnecessary methods with empty bodies; this is why we call it an adapter.
 */
public class StormLocalSumTopology {

    /**
     * Builds the topology, which is equivalent to building a Job in MapReduce
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // set up the DAG (directed acyclic graph) of spouts and bolts
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt())
               .shuffleGrouping("id_order_spout"); // specifies the upstream component and the data flow method

        // build the topology with the builder
        StormTopology topology = builder.createTopology();

        // start the topology
        LocalCluster localCluster = new LocalCluster(); // in local development mode the object created is a LocalCluster
        String topologyName = StormLocalSumTopology.class.getSimpleName(); // the name of the topology
        Config config = new Config(); // Config extends HashMap but encapsulates some basic configuration
        localCluster.submitTopology(topologyName, config, topology);
    }
}
```
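Both examples call a small helper class, StormUtil, whose source is not shown in the article; the member names df_yyyyMMddHHmmss and sleep are assumptions inferred from the call sites, so the following is only a minimal sketch of what it might look like:

```java
import java.text.SimpleDateFormat;

// Hypothetical sketch of the StormUtil helper used by the examples; the member
// names are inferred from the call sites and are not from the original article.
public class StormUtil {

    // formats timestamps like 20180412213836, matching the sample output
    public static final SimpleDateFormat df_yyyyMMddHHmmss = new SimpleDateFormat("yyyyMMddHHmmss");

    // sleeps for the given number of milliseconds, converting the checked
    // InterruptedException into a restored interrupt flag
    public static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```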

Note that the Spout and Bolt classes are written as static inner classes of StormLocalSumTopology for convenience during development; in practice they can each live in a separate file.

Test

Execute the main function, and its output is as follows:

```
Current time 20180412213836 order amount generated: 1
Total transaction volume of the mall website up to 20180412213836: 1
Current time 20180412213837 order amount generated: 2
Total transaction volume of the mall website up to 20180412213837: 3
Current time 20180412213838 order amount generated: 3
Total transaction volume of the mall website up to 20180412213838: 6
```

Storm Local Development Case 2: Word Count

Requirements Analysis

The requirements are as follows:

Monitor the files in a directory; when a new file appears, read it, parse its contents, and count the occurrences of each word.

The analysis is as follows:

Three nodes can be set up:

Spout: continuously reads the files in the directory that needs to be monitored (identified by a suffix) and emits each line to the next Bolt (similar to FileInputFormat in MapReduce)

Bolt1: receives the lines and parses out the words in them, emitting each word to the next Bolt (similar to Mapper in MapReduce)

Bolt2: receives the words and performs the statistical calculation (similar to Reducer in MapReduce)

Program Development

FileSpout

```java
/**
 * Spout that obtains the data source: it continuously reads the files in a directory
 * and emits each line to the next Bolt
 */
static class FileSpout extends BaseRichSpout {

    private Map conf;                       // configuration information of the current component
    private TopologyContext context;        // context object of the current component
    private SpoutOutputCollector collector; // component used to emit tuples

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        File directory = new File("D:/data/storm");
        // the second parameter, extensions, means that only files with the given suffixes are collected
        Collection<File> files = FileUtils.listFiles(directory, new String[]{"txt"}, true);
        for (File file : files) {
            try {
                List<String> lines = FileUtils.readLines(file, "utf-8");
                for (String line : lines) {
                    this.collector.emit(new Values(line));
                }
                // after the current file has been consumed it needs to be renamed; to prevent
                // a file with the same name from being added again, a random UUID is appended
                File destFile = new File(file.getAbsolutePath() + "_" + UUID.randomUUID().toString() + ".timestamp");
                FileUtils.moveFile(file, destFile);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
```

SplitBolt

```java
/**
 * Bolt node that splits each received line into words and emits them to the next node
 */
static class SplitBolt extends BaseRichBolt {

    private Map conf;                  // configuration information of the current component
    private TopologyContext context;   // context object of the current component
    private OutputCollector collector; // component used to emit tuples

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.conf = stormConf;
        this.context = context;
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```

WCBolt

```java
/**
 * Bolt node that performs the word count
 */
static class WCBolt extends BaseRichBolt {

    private Map conf;                  // configuration information of the current component
    private TopologyContext context;   // context object of the current component
    private OutputCollector collector; // component used to emit tuples

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.conf = stormConf;
        this.context = context;
        this.collector = collector;
    }

    private Map<String, Integer> map = new HashMap<>();

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = input.getIntegerByField("count");
        /*
        if (map.containsKey(word)) {
            map.put(word, map.get(word) + 1);
        } else {
            map.put(word, 1);
        }
        */
        map.put(word, map.getOrDefault(word, 0) + 1);
        System.out.println("==");
        map.forEach((k, v) -> {
            System.out.println(k + ":::" + v);
        });
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
```

StormLocalWordCountTopology

```java
/**
 * 2. Word count: monitor the files in a directory; when a new file is found,
 *    read it, parse its contents, and count the occurrences of each word.
 */
public class StormLocalWordCountTopology {

    /**
     * Builds the topology and assembles the Spout and Bolt nodes,
     * which is equivalent to building a Job in MapReduce
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // dag
        builder.setSpout("id_file_spout", new FileSpout());
        builder.setBolt("id_split_bolt", new SplitBolt()).shuffleGrouping("id_file_spout");
        builder.setBolt("id_wc_bolt", new WCBolt()).shuffleGrouping("id_split_bolt");

        StormTopology stormTopology = builder.createTopology();
        LocalCluster cluster = new LocalCluster();
        String topologyName = StormLocalWordCountTopology.class.getSimpleName();
        Config config = new Config();
        cluster.submitTopology(topologyName, config, stormTopology);
    }
}
```

Test
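The counting step can also be exercised without starting a LocalCluster at all: stripped of the Storm plumbing, WCBolt's tallying logic reduces to plain Java over a HashMap. The class name WordCountLogic below is made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of WCBolt's tallying logic, without the Storm plumbing;
// WordCountLogic is an illustrative name, not a class from the article.
public class WordCountLogic {

    private final Map<String, Integer> map = new HashMap<>();

    // equivalent to one WCBolt.execute() call per word tuple
    public void count(String word) {
        map.put(word, map.getOrDefault(word, 0) + 1);
    }

    public Map<String, Integer> counts() {
        return map;
    }
}
```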

After executing the program, add a .txt file to the target directory, and the program output is as follows:

```
==
hello:::1
==
hello:::1
you:::1
==
hello:::2
you:::1
==
hello:::2
he:::1
you:::1
==
hello:::3
he:::1
you:::1
==
me:::1
hello:::3
he:::1
you:::1
```

Storm Terminology Explained

After writing these Storm programs, the related terminology is much easier to understand.

Topology: encapsulates the logic of a real-time computing application; similar to a MapReduce Job in Hadoop.

Stream (message stream): an unbounded sequence of tuples that are created and processed in parallel in a distributed fashion.

Spout (message source): a message producer; it reads data from an external source and emits tuples into the topology.

Bolt (message processor): all message-processing logic is encapsulated in bolts; they process input streams and produce new output streams, and can perform filtering, aggregation, database queries, and other operations.

Task: each Spout and Bolt is executed as many tasks across the cluster, and each task corresponds to one thread.

Stream grouping (message distribution policy): one step of defining a Topology is specifying which streams each bolt accepts as input; a stream grouping defines how a stream should be partitioned among the tasks of the Bolts that consume it.
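To make the last term more concrete: shuffleGrouping, used in both topologies above, distributes tuples randomly and evenly across the downstream tasks, while a fields grouping routes tuples with the same field value to the same task. Outside of Storm, that routing rule can be sketched as simple hash partitioning; the class name below is made up for illustration:

```java
// Illustrative sketch of the routing rule behind a fields grouping:
// tuples with the same field value always land on the same task index.
public class FieldsGroupingSketch {

    // picks a task index in [0, numTasks) for a tuple keyed by the given field value
    public static int chooseTask(String fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }
}
```

With such a rule, the word-count topology could send every occurrence of the same word to the same WCBolt task, which is what `builder.setBolt(...).fieldsGrouping("id_split_bolt", new Fields("word"))` would request in the real API.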
