
How to Implement a Storm Streaming Statistics System

2025-02-24 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)05/31 Report--

This article explains how to implement a streaming statistics system with Storm. The method is simple, fast and practical: a spout reads click records from Redis, and two pipelines of bolts compute visit statistics (total and unique visits) and geographic statistics (visits per country and per city).

1: Preparation:

1 Make sure a Redis cluster is installed and running

2 Configure your Storm development environment

3 Make sure the network is reachable in your development environment: between the hosts, and between Storm and Redis

2: Business background:

1 Here we simulate a streaming data processing pipeline

2 The source data is stored in our Redis cluster

3 Each transmitted record is a JSON object with three fields: ip, url and clientKey

The data emitter (spout), ClickSpout:
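ClickSpout pops records with RPOP from a Redis list named "count". Assuming a producer pushes records with LPUSH (the producer is not shown in this article), records come out first-in, first-out. A minimal plain-Java sketch of that contract follows; the class ClickQueueSketch and its methods are hypothetical, an ArrayDeque stands in for the Redis list, and the JSON is built by hand assuming the values need no escaping:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical in-memory stand-in for the Redis list "count":
 * a producer LPUSHes to the head, ClickSpout RPOPs from the tail,
 * so records come out in the order they were pushed (FIFO).
 */
public class ClickQueueSketch {
    private final Deque<String> list = new ArrayDeque<>();

    // Builds one record with the keys ClickSpout reads: "ip", "url", "clientKey".
    // Assumes the values contain no characters that need JSON escaping.
    static String record(String ip, String url, String clientKey) {
        return "{\"ip\":\"" + ip + "\",\"url\":\"" + url
                + "\",\"clientKey\":\"" + clientKey + "\"}";
    }

    // Mirrors Redis LPUSH: insert at the head of the list
    void lpush(String value) {
        list.addFirst(value);
    }

    // Mirrors Redis RPOP: remove from the tail, or return null when the list is empty
    String rpop() {
        return list.pollLast();
    }

    public static void main(String[] args) {
        ClickQueueSketch q = new ClickQueueSketch();
        q.lpush(record("1.2.3.4", "/index", "c1"));
        q.lpush(record("5.6.7.8", "/about", "c2"));
        System.out.println(q.rpop()); // the first record pushed
        System.out.println(q.rpop()); // the second record pushed
        System.out.println(q.rpop()); // null: the list is empty
    }
}
```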

```java
package storm.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import redis.clients.jedis.Jedis;
import storm.utils.Conf;

import java.util.Map;

/**
 * ClickSpout reads the click records it needs from Redis.
 */
public class ClickSpout extends BaseRichSpout {

    private static final long serialVersionUID = -6200450568987812474L;

    public static Logger LOG = Logger.getLogger(ClickSpout.class);

    // We use the Jedis client to talk to Redis
    private Jedis jedis;
    // Redis host
    private String host;
    // Redis port
    private int port;
    // Spout output collector
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Each emitted tuple has the form IP, URL, CLIENT_KEY
        outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,
                storm.cookbook.Fields.URL,
                storm.cookbook.Fields.CLIENT_KEY));
    }

    @Override
    public void open(Map conf, TopologyContext topologyContext,
                     SpoutOutputCollector spoutOutputCollector) {
        host = conf.get(Conf.REDIS_HOST_KEY).toString();
        port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
        this.collector = spoutOutputCollector;
        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
    }

    @Override
    public void nextTuple() {
        String content = jedis.rpop("count");
        if (content == null || "nil".equals(content)) {
            // Nothing to read yet: back off briefly instead of spinning
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else {
            // Parse the string popped from Redis into a JSON object
            JSONObject obj = (JSONObject) JSONValue.parse(content);
            String ip = obj.get(storm.cookbook.Fields.IP).toString();
            String url = obj.get(storm.cookbook.Fields.URL).toString();
            String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY).toString();
            LOG.debug("Emitting click for clientKey " + clientKey);
            // Wrap the values into a tuple (a Values list) and emit it
            collector.emit(new Values(ip, url, clientKey));
        }
    }
}
```

A few points to note about ClickSpout:

1 In the open method we read the Redis host and port from the topology configuration, keep a reference to the collector, and call connectToRedis() to connect to the Redis database

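2 In nextTuple we pop a record from the Redis list "count"; if nothing is available we sleep for 300 ms, otherwise we parse the record into a JSON object, extract ip, url and clientKey, and wrap them into a Values object that is emitted as a tuple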
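Storm calls nextTuple repeatedly on the same thread, and the ISpout documentation advises that it should not block and that a short sleep is courteous when there is nothing to emit. The control flow of ClickSpout.nextTuple can be isolated into a small sketch; PollOnceSketch and pollOnce are hypothetical names, with a Supplier standing in for jedis.rpop and a Consumer standing in for collector.emit:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the control flow in ClickSpout.nextTuple():
 * pop one record; if there is none, back off; otherwise hand it on.
 */
public class PollOnceSketch {

    // Returns true if a record was emitted, false if the source was empty
    static boolean pollOnce(Supplier<String> pop, Consumer<String> emit, long backoffMillis) {
        String content = pop.get();
        if (content == null || "nil".equals(content)) {
            try {
                Thread.sleep(backoffMillis); // avoid a busy loop on an empty list
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            return false;
        }
        emit.accept(content);
        return true;
    }

    public static void main(String[] args) {
        Deque<String> source = new ArrayDeque<>();
        source.addFirst("{\"ip\":\"1.2.3.4\",\"url\":\"/index\",\"clientKey\":\"c1\"}");
        List<String> emitted = new ArrayList<>();
        System.out.println(pollOnce(source::pollLast, emitted::add, 0)); // true
        System.out.println(pollOnce(source::pollLast, emitted::add, 0)); // false
        System.out.println(emitted.size());                              // 1
    }
}
```

Keeping the sleep short (300 ms in ClickSpout) bounds the latency added when new records arrive in an empty list.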

Let's look at how the data flows through the topology:

Once ClickSpout has read a record, two bolts process it; both subscribe to the same data source, ClickSpout:

1 RepeatVisitBolt, whose output feeds VisitStatsBolt

2 GeographyBolt, whose output feeds GeoStatsBolt
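In Storm this fan-out is declared when the topology is built: each bolt subscribes to the spout's component id with a stream grouping (for example TopologyBuilder.setBolt(...).shuffleGrouping(...)), and every subscribing bolt receives each tuple of the stream. The plain-Java sketch below only simulates that delivery to show that both pipelines see every click; FanOutSketch and the lambda subscribers are hypothetical stand-ins, not Storm APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical simulation of one stream with two subscribers:
 * every emitted (ip, url, clientKey) tuple reaches each subscriber.
 */
public class FanOutSketch {
    private final List<Consumer<String[]>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String[]> bolt) {
        subscribers.add(bolt);
    }

    // Deliver one tuple to every subscriber, like a spout emitting on its stream
    void emit(String ip, String url, String clientKey) {
        String[] tuple = {ip, url, clientKey};
        for (Consumer<String[]> s : subscribers) {
            s.accept(tuple);
        }
    }

    public static void main(String[] args) {
        FanOutSketch spout = new FanOutSketch();
        List<String> repeatVisitInput = new ArrayList<>();
        List<String> geographyInput = new ArrayList<>();
        // Stand-in for RepeatVisitBolt: uses url and clientKey
        spout.subscribe(t -> repeatVisitInput.add(t[1] + ":" + t[2]));
        // Stand-in for GeographyBolt: uses only the ip
        spout.subscribe(t -> geographyInput.add(t[0]));
        spout.emit("1.2.3.4", "/index", "c1");
        System.out.println(repeatVisitInput); // [/index:c1]
        System.out.println(geographyInput);   // [1.2.3.4]
    }
}
```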

3: A closer look at RepeatVisitBolt:

```java
package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;
import storm.utils.Conf;

import java.util.Map;

public class RepeatVisitBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Jedis jedis;
    private String host;
    private int port;

    @Override
    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        host = conf.get(Conf.REDIS_HOST_KEY).toString();
        port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
        jedis.connect();
    }

    public boolean isConnected() {
        if (jedis == null) {
            return false;
        }
        return jedis.isConnected();
    }

    @Override
    public void execute(Tuple tuple) {
        String clientKey = tuple.getStringByField(storm.cookbook.Fields.CLIENT_KEY);
        String url = tuple.getStringByField(storm.cookbook.Fields.URL);
        String key = url + ":" + clientKey;
        String value = jedis.get(key);
        // If the key is not in Redis yet, record the visit and flag it as unique
        if (value == null) {
            jedis.set(key, "visited");
            collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));
        } else {
            collector.emit(new Values(clientKey, url, Boolean.FALSE.toString()));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.CLIENT_KEY,
                storm.cookbook.Fields.URL,
                storm.cookbook.Fields.UNIQUE));
    }
}
```

Here we combine url and clientKey into a key of the form url:clientKey and look it up in Redis. If the key is not there, we SET it in Redis and mark the visit as unique (unique = true); otherwise the visit is a repeat (unique = false).
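RepeatVisitBolt first calls jedis.get and then jedis.set, which is two round trips; if several RepeatVisitBolt tasks run in parallel, two of them could both see null for the same key and both report a unique visit. Redis offers an atomic "set if not exists" (the SETNX command, exposed in Jedis as setnx). The sketch below models that idea in plain Java, with a HashMap standing in for Redis and Map.putIfAbsent playing the role of SETNX; RepeatVisitSketch is a hypothetical class, not part of the original program:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * In-memory sketch of RepeatVisitBolt's uniqueness check.
 * A HashMap stands in for Redis; putIfAbsent plays the role of an
 * atomic "set if not exists" (Redis SETNX).
 */
public class RepeatVisitSketch {
    private final Map<String, String> store = new HashMap<>();

    // Returns true the first time a (url, clientKey) pair is seen
    boolean isUnique(String url, String clientKey) {
        String key = url + ":" + clientKey;
        // putIfAbsent returns null only when the key was not present before
        return store.putIfAbsent(key, "visited") == null;
    }

    public static void main(String[] args) {
        RepeatVisitSketch s = new RepeatVisitSketch();
        System.out.println(s.isUnique("/index", "c1")); // true: first visit
        System.out.println(s.isUnique("/index", "c1")); // false: repeat visit
        System.out.println(s.isUnique("/index", "c2")); // true: another client
    }
}
```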

4: VisitStatsBolt, which aggregates the visit counts:

```java
package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class VisitStatsBolt extends BaseRichBolt {
    private OutputCollector collector;
    private int total = 0;
    private int uniqueCount = 0;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        // The upstream RepeatVisitBolt has already decided whether this visit is unique
        boolean unique = Boolean.parseBoolean(tuple.getStringByField(storm.cookbook.Fields.UNIQUE));
        total++;
        if (unique) {
            uniqueCount++;
        }
        collector.emit(new Values(total, uniqueCount));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.TOTAL_COUNT,
                storm.cookbook.Fields.TOTAL_UNIQUE));
    }
}
```

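Every tuple increments total (page views, PV); when RepeatVisitBolt has flagged the visit as a first visit, uniqueCount (unique visits, UV) is incremented as well.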
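The counting rule is easy to check outside Storm. VisitStatsBolt keeps total and uniqueCount in fields of one bolt instance, so they are only global totals if the bolt runs as a single task (for example a parallelism hint of 1, or subscribing with globalGrouping); with several tasks, each counts only the tuples it receives. A plain-Java sketch of the same rule, where VisitStatsSketch and onVisit are hypothetical names:

```java
/**
 * Plain-Java sketch of VisitStatsBolt's counters (hypothetical class).
 */
public class VisitStatsSketch {
    private int total = 0;       // every visit (PV)
    private int uniqueCount = 0; // first visits of a client to a URL (UV)

    // Mirrors VisitStatsBolt.execute(): returns {total, uniqueCount} after this visit
    int[] onVisit(String uniqueFlag) {
        boolean unique = Boolean.parseBoolean(uniqueFlag);
        total++;
        if (unique) {
            uniqueCount++;
        }
        return new int[] {total, uniqueCount};
    }

    public static void main(String[] args) {
        VisitStatsSketch stats = new VisitStatsSketch();
        // Flags as RepeatVisitBolt would emit them: first visit, repeat, new client
        for (String flag : new String[] {"true", "false", "true"}) {
            int[] r = stats.onVisit(flag);
            System.out.println("total=" + r[0] + " unique=" + r[1]);
        }
        // prints total=1 unique=1, total=2 unique=1, total=3 unique=2
    }
}
```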

5: Next, pipeline 2, starting with GeographyBolt:

```java
package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.json.simple.JSONObject;
import storm.cookbook.IPResolver;

import java.util.Map;

/**
 * User: yin shaui, Date: 2014-05-21, Time: 8:58 AM
 */
public class GeographyBolt extends BaseRichBolt {
    // IP resolver; it is serialized with the bolt, so the implementation must be Serializable
    private IPResolver resolver;
    private OutputCollector collector;

    public GeographyBolt(IPResolver resolver) {
        this.resolver = resolver;
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        // 1 Get the IP from the upstream tuple
        String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
        // 2 Resolve the IP into a JSON object describing its location
        JSONObject json = resolver.resolveIP(ip);
        if (json == null) {
            // The lookup failed (HttpIPResolver returns null on I/O errors): skip this tuple
            return;
        }
        // 3 Pack country and city into a new tuple, i.e. a Values object
        String city = (String) json.get(storm.cookbook.Fields.CITY);
        String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);
        collector.emit(new Values(country, city));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Declare the format of the output tuple
        outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.COUNTRY, storm.cookbook.Fields.CITY));
    }
}
```
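GeographyBolt reads the "city" and "country_name" keys from the resolver's JSON. Because json-simple's JSONObject extends java.util.HashMap, a plain Map can stand in for it in a standalone sketch. The hypothetical helper toCountryCity below also treats a null result (HttpIPResolver returns null when the HTTP call fails) or a missing key as "no location", so the caller can skip the tuple instead of emitting nulls:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical, null-safe version of GeographyBolt's extraction step.
 */
public class GeoExtractSketch {
    // Returns {country, city} in the order GeographyBolt emits them,
    // or Optional.empty() when the lookup failed or a key is missing
    static Optional<String[]> toCountryCity(Map<?, ?> json) {
        if (json == null) {
            return Optional.empty();
        }
        Object country = json.get("country_name");
        Object city = json.get("city");
        if (country == null || city == null) {
            return Optional.empty();
        }
        return Optional.of(new String[] {country.toString(), city.toString()});
    }

    public static void main(String[] args) {
        Map<String, Object> json = new HashMap<>();
        json.put("country_name", "EXAMPLE COUNTRY");
        json.put("city", "Example City");
        String[] pair = toCountryCity(json).get();
        System.out.println(pair[0] + " / " + pair[1]);   // EXAMPLE COUNTRY / Example City
        System.out.println(toCountryCity(null).isPresent()); // false
    }
}
```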

GeographyBolt thus converts each IP into a (COUNTRY, CITY) pair. GeoStatsBolt consumes these pairs and aggregates them per country and per city:

```java
package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class GeoStatsBolt extends BaseRichBolt {

    private class CountryStats {
        private int countryTotal = 0;
        private static final int COUNT_INDEX = 0;
        private static final int PERCENTAGE_INDEX = 1;
        private String countryName;
        // city name -> [visit count, percentage of this country's visits]
        private Map<String, List<Integer>> cityStats = new HashMap<>();

        public CountryStats(String countryName) {
            this.countryName = countryName;
        }

        /**
         * Records one visit from the given city.
         * @param cityName the city the visit came from
         */
        public void cityFound(String cityName) {
            countryTotal++;
            if (cityStats.containsKey(cityName)) {
                // The city already has an entry: increment its count
                List<Integer> entry = cityStats.get(cityName);
                entry.set(COUNT_INDEX, entry.get(COUNT_INDEX) + 1);
            } else {
                // First visit from this city: count 1, percentage filled in below
                List<Integer> list = new LinkedList<>();
                list.add(1);
                list.add(0);
                cityStats.put(cityName, list);
            }
            // Share of this country's visits that came from this city, as a whole percentage.
            // Only the current city's percentage is refreshed here.
            double percent = 100.0 * cityStats.get(cityName).get(COUNT_INDEX) / countryTotal;
            cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) percent);
        }

        /** @return total number of visits recorded for this country */
        public int getCountryTotal() {
            return countryTotal;
        }

        /**
         * @param cityName the city to look up
         * @return total number of visits recorded for that city
         */
        public int getCityTotal(String cityName) {
            return cityStats.get(cityName).get(COUNT_INDEX);
        }

        @Override
        public String toString() {
            return "Total Count for " + countryName + " is " + countryTotal + "\n"
                    + "Cities: " + cityStats;
        }
    }

    private OutputCollector collector;
    // country name -> CountryStats (inner class instance)
    private Map<String, CountryStats> stats = new HashMap<>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY);
        String city = tuple.getStringByField(storm.cookbook.Fields.CITY);
        // First time we see this country: create its statistics
        if (!stats.containsKey(country)) {
            stats.put(country, new CountryStats(country));
        }
        // Record the visit, then emit the updated country and city totals
        stats.get(country).cityFound(city);
        collector.emit(new Values(country, stats.get(country).getCountryTotal(),
                city, stats.get(country).getCityTotal(city)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.COUNTRY,
                storm.cookbook.Fields.COUNTRY_TOTAL,
                storm.cookbook.Fields.CITY,
                storm.cookbook.Fields.CITY_TOTAL));
    }
}
```
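In cityFound only the current city's stored percentage is updated, so the percentages of other cities go stale as the country total grows. One possible alternative, sketched here in plain Java, is to store only counts and compute a city's share when it is requested; GeoStatsSketch and its methods are hypothetical and are not the original program's design:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: per-country and per-city counts,
 * with percentages derived on demand so they never go stale.
 */
public class GeoStatsSketch {
    // country -> (city -> visit count)
    private final Map<String, Map<String, Integer>> counts = new HashMap<>();
    // country -> total visits
    private final Map<String, Integer> countryTotals = new HashMap<>();

    void cityFound(String country, String city) {
        counts.computeIfAbsent(country, c -> new HashMap<>()).merge(city, 1, Integer::sum);
        countryTotals.merge(country, 1, Integer::sum);
    }

    int countryTotal(String country) {
        return countryTotals.getOrDefault(country, 0);
    }

    int cityTotal(String country, String city) {
        return counts.getOrDefault(country, Collections.emptyMap()).getOrDefault(city, 0);
    }

    // Share of the country's visits that came from this city, in percent
    double cityPercentage(String country, String city) {
        int total = countryTotal(country);
        return total == 0 ? 0.0 : 100.0 * cityTotal(country, city) / total;
    }

    public static void main(String[] args) {
        GeoStatsSketch s = new GeoStatsSketch();
        s.cityFound("C", "X");
        s.cityFound("C", "X");
        s.cityFound("C", "X");
        s.cityFound("C", "Y");
        System.out.println(s.countryTotal("C"));        // 4
        System.out.println(s.cityPercentage("C", "X")); // 75.0
        System.out.println(s.cityPercentage("C", "Y")); // 25.0
    }
}
```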

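GeoStatsBolt keeps running statistics per geographic location: the number of visits per country and per city. The helper classes used by the program are listed next: the Fields constants, HttpIPResolver and the IPResolver interface.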

```java
package storm.cookbook;

/**
 * Field-name constants shared by the spout and the bolts.
 */
public class Fields {
    public static final String IP = "ip";
    public static final String URL = "url";
    public static final String CLIENT_KEY = "clientKey";
    public static final String COUNTRY = "country";
    public static final String COUNTRY_NAME = "country_name";
    public static final String CITY = "city";
    // Whether the visit is unique
    public static final String UNIQUE = "unique";
    // Visit total per country
    public static final String COUNTRY_TOTAL = "countryTotal";
    // Visit total per city
    public static final String CITY_TOTAL = "cityTotal";
    // Total visit count
    public static final String TOTAL_COUNT = "totalCount";
    // Total unique visits
    public static final String TOTAL_UNIQUE = "totalUnique";
}
```

```java
package storm.cookbook;

import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.URL;
import java.net.URLConnection;

public class HttpIPResolver implements IPResolver, Serializable {
    static String url = "http://api.hostip.info/get_json.php";

    @Override
    public JSONObject resolveIP(String ip) {
        try {
            URL geoUrl = new URL(url + "?ip=" + ip);
            URLConnection connection = geoUrl.openConnection();
            // try-with-resources closes the reader whether or not parsing succeeds
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(connection.getInputStream()))) {
                return (JSONObject) JSONValue.parse(in);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        // The lookup failed
        return null;
    }
}
```

```java
package storm.cookbook;

import org.json.simple.JSONObject;

/**
 * Resolves an IP address to a JSON description of its location.
 * User: admin, Date: 2012-12-07, Time: 5:29 PM
 */
public interface IPResolver {
    JSONObject resolveIP(String ip);
}
```
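HttpIPResolver concatenates the raw IP string into the query. Encoding the query parameter is a cheap safeguard, for instance because IPv6 addresses contain colons. A minimal JDK-only sketch of building the same request URL with the parameter encoded; LookupUrlSketch and lookupUrl are hypothetical names, and URLEncoder.encode(String, Charset) assumes Java 10 or later:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper: builds HttpIPResolver's lookup URL
 * with the ip query parameter URL-encoded.
 */
public class LookupUrlSketch {
    static final String BASE = "http://api.hostip.info/get_json.php";

    static String lookupUrl(String ip) {
        return BASE + "?ip=" + URLEncoder.encode(ip, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(lookupUrl("1.2.3.4")); // http://api.hostip.info/get_json.php?ip=1.2.3.4
        System.out.println(lookupUrl("::1"));     // http://api.hostip.info/get_json.php?ip=%3A%3A1
    }
}
```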


© 2024 shulou.com SLNews company. All rights reserved.
