2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article introduces how to use the Storm MongoDB interface. Many people run into difficulties with it in real-world cases, so let's work through how to handle these situations. Read it carefully and you should come away with something useful.
The Storm interface as a whole is divided into the following classes:
1: MongoBolt.java
2: MongoSpout.java
3: MongoTailableCursorTopology.java
4: SimpleMongoBolt.java
Let's walk through the code.
1:
package storm.mongo;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import com.mongodb.DB;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.WriteConcern;

/**
 * Note: there is no batching here. This is just an abstract class that
 * encapsulates the interaction between Storm and Mongo.
 *
 * @author Adrian Petrescu
 */
public abstract class MongoBolt extends BaseRichBolt {
    private OutputCollector collector;

    // MongoDB DB object
    private DB mongoDB;

    // Host, port, and name of the MongoDB database
    private final String mongoHost;
    private final int mongoPort;
    private final String mongoDbName;

    /**
     * @param mongoHost The host on which Mongo is running.
     * @param mongoPort The port on which Mongo is running.
     * @param mongoDbName The Mongo database containing all collections being
     * written to.
     */
    protected MongoBolt(String mongoHost, int mongoPort, String mongoDbName) {
        this.mongoHost = mongoHost;
        this.mongoPort = mongoPort;
        this.mongoDbName = mongoDbName;
    }

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
            TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            // prepare() opens the Mongo connection during initialization
            this.mongoDB = new MongoClient(mongoHost, mongoPort).getDB(mongoDbName);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void execute(Tuple input) {
        // Note the extra check here: it decides whether this tuple should
        // trigger a write at all
        if (shouldActOnInput(input)) {
            String collectionName = getMongoCollectionForInput(input);
            DBObject dbObject = getDBObjectForInput(input);
            // A null dbObject is neither acked nor failed here
            if (dbObject != null) {
                try {
                    mongoDB.getCollection(collectionName).save(dbObject, new WriteConcern(1));
                    collector.ack(input);
                } catch (MongoException me) {
                    collector.fail(input);
                }
            }
        } else {
            collector.ack(input);
        }
    }

    /**
     * Decide whether or not this input tuple should trigger a Mongo write.
     *
     * @param input the input tuple under consideration
     * @return {@code true} iff this input tuple should trigger a Mongo write
     */
    public abstract boolean shouldActOnInput(Tuple input);

    /**
     * Returns the Mongo collection which the input tuple should be written to.
     *
     * @param input the input tuple under consideration
     * @return the Mongo collection which the input tuple should be written to
     */
    public abstract String getMongoCollectionForInput(Tuple input);

    /**
     * Returns the DBObject to store in Mongo for the specified input tuple.
     * Abstract method that produces the DBObject.
     *
     * @param input the input tuple under consideration
     * @return the DBObject to be written to Mongo
     */
    public abstract DBObject getDBObjectForInput(Tuple input);

    // Note that the connection is closed when the computation ends.
    @Override
    public void cleanup() {
        this.mongoDB.getMongo().close();
    }
}
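The decision flow in execute() above can be tried out without a Storm cluster or a Mongo server. What follows is a minimal sketch under stated assumptions, not the library's API: WriteDecisionSketch, WordCountWriter, the Outcome enum, and the in-memory store map are all hypothetical stand-ins, a plain Map plays the role of both the tuple and the DBObject, and the returned Outcome stands in for collector.ack and collector.fail.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo entry point; none of these classes belong to storm.mongo.
class WriteDecisionDemo {
    public static void main(String[] args) {
        WordCountWriter writer = new WordCountWriter();
        Map<String, Object> tuple = new HashMap<>();
        tuple.put("word", "storm");
        tuple.put("count", 3);
        System.out.println(writer.execute(tuple));          // written, then acked
        System.out.println(writer.execute(new HashMap<>())); // skipped, acked at once
    }
}

// Stand-in for the abstract bolt: same three hooks, same branching as execute().
abstract class WriteDecisionSketch {
    enum Outcome { ACKED, FAILED, PENDING }

    // In-memory stand-in for Mongo: collection name -> stored documents.
    final Map<String, List<Map<String, Object>>> store = new HashMap<>();

    abstract boolean shouldActOnInput(Map<String, Object> input);
    abstract String getCollectionForInput(Map<String, Object> input);
    abstract Map<String, Object> getDocumentForInput(Map<String, Object> input);

    Outcome execute(Map<String, Object> input) {
        if (!shouldActOnInput(input)) {
            return Outcome.ACKED;   // inputs that need no write are acked immediately
        }
        String collection = getCollectionForInput(input);
        Map<String, Object> doc = getDocumentForInput(input);
        if (doc == null) {
            return Outcome.PENDING; // mirrors the listing: neither acked nor failed
        }
        try {
            store.computeIfAbsent(collection, k -> new ArrayList<>()).add(doc);
            return Outcome.ACKED;   // ack only after the write succeeded
        } catch (RuntimeException e) {
            return Outcome.FAILED;  // a failed write fails the input
        }
    }
}

// Hypothetical concrete subclass: stores word counts in a "words" collection.
class WordCountWriter extends WriteDecisionSketch {
    @Override
    boolean shouldActOnInput(Map<String, Object> input) {
        return input.containsKey("word");
    }

    @Override
    String getCollectionForInput(Map<String, Object> input) {
        return "words";
    }

    @Override
    Map<String, Object> getDocumentForInput(Map<String, Object> input) {
        if (input.get("count") == null) {
            return null;            // nothing to store without a count
        }
        Map<String, Object> doc = new HashMap<>();
        doc.put("word", input.get("word"));
        doc.put("count", input.get("count"));
        return doc;
    }
}
```

The PENDING branch is worth a look when subclassing the real bolt: an input for which the document hook returns null gets no ack and no fail in the listing, so a subclass may prefer to return false from shouldActOnInput for such inputs instead.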
2:
package storm.mongo;

import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.utils.Utils;

import com.mongodb.BasicDBObject;
import com.mongodb.Bytes;
import com.mongodb.DB;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;

/**
 * A Spout which consumes documents from a Mongodb tailable cursor.
 *
 * Subclasses should simply override two methods:
 * - {@link #declareOutputFields(OutputFieldsDeclarer) declareOutputFields}
 * - {@link #dbObjectToStormTuple(DBObject) dbObjectToStormTuple}, which turns
 *   a Mongo document into a Storm tuple matching the declared output fields.
 *
 * WARNING: You can only use tailable cursors on capped collections.
 *
 * @author Dan Beaulieu
 */
// The abstraction is layered here: BaseRichSpout is already an abstract class,
// and MongoSpout is an abstract class on top of it. Subclasses implement the
// concrete methods when they extend these classes.
// There is also a cursor-style read operation.
public abstract class MongoSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private LinkedBlockingQueue<DBObject> queue;
    private final AtomicBoolean opened = new AtomicBoolean(false);

    private DB mongoDB;
    private final DBObject query;
    private final String mongoHost;
    private final int mongoPort;
    private final String mongoDbName;
    private final String mongoCollectionName;

    public MongoSpout(String mongoHost, int mongoPort, String mongoDbName,
            String mongoCollectionName, DBObject query) {
        this.mongoHost = mongoHost;
        this.mongoPort = mongoPort;
        this.mongoDbName = mongoDbName;
        this.mongoCollectionName = mongoCollectionName;
        this.query = query;
    }

    // Inner class: the thread that tails the cursor.
    // Note that it uses a LinkedBlockingQueue. For Java's high-concurrency
    // collection classes, see the author's blog post on Java collection types.
    class TailableCursorThread extends Thread {
        LinkedBlockingQueue<DBObject> queue;
        String mongoCollectionName;
        DB mongoDB;
        DBObject query;

        public TailableCursorThread(LinkedBlockingQueue<DBObject> queue, DB mongoDB,
                String mongoCollectionName, DBObject query) {
            this.queue = queue;
            this.mongoDB = mongoDB;
            this.mongoCollectionName = mongoCollectionName;
            this.query = query;
        }

        public void run() {
            while (opened.get()) {
                try {
                    // create the cursor
                    mongoDB.requestStart();
                    final DBCursor cursor = mongoDB.getCollection(mongoCollectionName)
                            .find(query)
                            .sort(new BasicDBObject("$natural", 1))
                            .addOption(Bytes.QUERYOPTION_TAILABLE)
                            .addOption(Bytes.QUERYOPTION_AWAITDATA);
                    try {
                        while (opened.get() && cursor.hasNext()) {
                            final DBObject doc = cursor.next();
                            if (doc == null) break;
                            queue.put(doc);
                        }
                    } finally {
                        try {
                            if (cursor != null) cursor.close();
                        } catch (final Throwable t) {
                        }
                        try {
                            mongoDB.requestDone();
                        } catch (final Throwable t) {
                        }
                    }
                    Utils.sleep(500);
                } catch (final MongoException.CursorNotFound cnf) {
                    // rethrow only if something went wrong while we expect
                    // the cursor to be open.
                    if (opened.get()) {
                        throw cnf;
                    }
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.queue = new LinkedBlockingQueue<DBObject>(1000);
        try {
            this.mongoDB = new MongoClient(this.mongoHost, this.mongoPort).getDB(this.mongoDbName);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        TailableCursorThread listener = new TailableCursorThread(this.queue,
                this.mongoDB, this.mongoCollectionName, this.query);
        this.opened.set(true);
        listener.start();
    }

    @Override
    public void close() {
        this.opened.set(false);
    }

    @Override
    public void nextTuple() {
        DBObject dbo = this.queue.poll();
        if (dbo == null) {
            Utils.sleep(50);
        } else {
            this.collector.emit(dbObjectToStormTuple(dbo));
        }
    }

    @Override
    public void ack(Object msgId) {
        // TODO Auto-generated method stub
    }

    @Override
    public void fail(Object msgId) {
        // TODO Auto-generated method stub
    }

    public abstract List<Object> dbObjectToStormTuple(DBObject message);
}

That concludes this look at how to use the Storm MongoDB interface. Thank you for reading.
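As a supplement to the MongoSpout listing, the hand-off between its background thread and nextTuple() can be sketched with the JDK alone. This is an illustration under stated assumptions rather than the spout itself: QueueHandoffSketch and QueueHandoffDemo are hypothetical names, a fixed list of strings stands in for the tailable cursor, and Thread.sleep replaces Utils.sleep.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo entry point; no Storm or Mongo classes are involved.
class QueueHandoffDemo {
    public static void main(String[] args) {
        QueueHandoffSketch spout = new QueueHandoffSketch(Arrays.asList("a", "b", "c"));
        spout.open();
        int seen = 0;
        while (seen < 3) {
            String doc = spout.nextTuple();
            if (doc != null) {
                System.out.println("emit " + doc);
                seen++;
            }
        }
        spout.close();
    }
}

class QueueHandoffSketch {
    // Bounded queue: the producer blocks in put() once 1000 items are waiting.
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
    private final AtomicBoolean opened = new AtomicBoolean(false);
    private final List<String> source; // stand-in for the documents a cursor would return
    private Thread producer;

    QueueHandoffSketch(List<String> source) {
        this.source = source;
    }

    // Start the background producer, as open() does with TailableCursorThread.
    void open() {
        opened.set(true);
        producer = new Thread(() -> {
            try {
                for (String doc : source) {
                    if (!opened.get()) {
                        return;          // stop producing once the sketch is closed
                    }
                    queue.put(doc);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();
    }

    // Non-blocking poll; back off briefly when nothing is queued. Returns null then.
    String nextTuple() {
        String doc = queue.poll();
        if (doc == null) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return doc;
    }

    // Flip the shared flag and wake the producer if it is blocked in put().
    void close() {
        opened.set(false);
        if (producer != null) {
            producer.interrupt();
        }
    }
}
```

The point of the design is that the consumer side never blocks on the data source: all waiting happens in the producer thread, and the bounded queue keeps it from running arbitrarily far ahead of the consumer.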