Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How Storm receives data

2025-03-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly explains "how Storm receives data". The content of the explanation is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "how Storm receives data".

A brief simulation of how to receive data:

Package com.cc.storm.spout;import java.io.IOException;import java.util.Map;import java.util.Random;import java.util.concurrent.LinkedBlockingQueue;import org.apache.log4j.Logger;import redis.clients.jedis.JedisPubSub;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils Public class RandomEmitSpout extends BaseRichSpout {private Random _ random; private static final long serialVersionUID = 4092527421163270357L; static Logger LOG = Logger.getLogger (RandomEmitSpout.class); private SpoutOutputCollector _ collector; @ Override public void open (Map conf, TopologyContext context, SpoutOutputCollector collector) {_ collector = collector; _ random = new Random () } @ Override public void nextTuple () {try {Thread.sleep (1000);} catch (Exception e) {e.printStackTrace ();} String [] userIds = {"1", "2", "3", "4"} String [] merchandiseIDS = {"1"}; _ collector.emit (new Values (userIds [_ random.nextInt (userIds.length)], merchandiseIDS [_ random.nextInt (merchandiseIDS.length)])) } @ Override public void declareOutputFields (OutputFieldsDeclarer declarer) {/ / TODO Auto-generated method stub declarer.declare (new Fields ("userIdS", "merchandiseIDS");} @ Override public void close () {}}

Plus: if you are using Redis

So:

Package com.cc.storm.spout;import java.util.Map;import java.util.concurrent.LinkedBlockingQueue;import org.apache.log4j.Logger;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPoolConfig;import redis.clients.jedis.JedisPubSub;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values Import backtype.storm.utils.Utils;public class RedisPubSubSpout extends BaseRichSpout {/ * * @ Fields serialVersionUID: TODO * / private static final long serialVersionUID = 4092527421163270357L; static Logger LOG = Logger.getLogger (RedisPubSubSpout.class); private SpoutOutputCollector _ collector; private final String host; private final int port; private final String pattern; LinkedBlockingQueue queue; JedisPool pool Public RedisPubSubSpout (String host, int port, String pattern) {/ / TODO Auto-generated constructor stub this.host = host; this.port = port; this.pattern = pattern;} / / listens to the thread to get data class ListenerThread extends Thread {private LinkedBlockingQueue queue from interest events subscribed to by redis JedisPool pool; String pattern; public ListenerThread (LinkedBlockingQueue queue, JedisPool pool, String pattern) {/ / TODO Auto-generated constructor stub this.queue = queue; this.pool = pool This.pattern = pattern } @ Override public void run () {JedisPubSub listener = new JedisPubSub () {@ Override public void onUnsubscribe (String arg0) Int arg1) {/ / TODO Auto-generated method stub} @ Override public void onSubscribe (String arg0 Int arg1) {/ / TODO Auto-generated method stub} @ Override public void onPUnsubscribe (String arg0 Int arg1) {/ / TODO Auto-generated method stub} @ Override public void onPSubscribe (String arg0 Int arg1) {/ / TODO Auto-generated method stub} @ Override public void onPMessage (String pattern, String channel) String message) {/ / TODO Auto-generated method stub queue.offer (message) } @ Override public void onMessage (String channel, String message) {/ / TODO Auto-generated method stub queue.offer (message) }}; Jedis jedis = pool.getResource (); try {jedis.psubscribe (listener, pattern);} finally {pool.returnResource (jedis) } @ SuppressWarnings ("rawtypes") @ Override public void open (Map conf, TopologyContext context, SpoutOutputCollector collector) {/ / TODO Auto-generated method stub _ collector = collector / / the queue supports a maximum of 1000 queue = new LinkedBlockingQueue (1000); JedisPoolConfig config = new JedisPoolConfig (); / / error pool = null; ListenerThread listener = new ListenerThread (queue, pool, pattern); / / start thread listener.start () } @ Override public void nextTuple () {/ / TODO Auto-generated method stub String ret = queue.poll (); if (null = = ret) {/ / if there is no data in the queue, rest 500ms Utils.sleep } else {/ / data format is "userID:merchandiseID", which can be changed according to demand where String [] s = ret.split (":"); _ collector.emit (new Values (s [0], s [1])) } @ Override public void declareOutputFields (OutputFieldsDeclarer declarer) {/ / TODO Auto-generated method stub declarer.declare (new Fields ("userIdS", "merchandiseIDS"));} @ Override public void close () {/ / TODO Auto-generated method stub pool.destroy () }} Thank you for your reading. the above is the content of "how Storm receives data". After the study of this article, I believe you have a deeper understanding of how Storm receives data, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report