2025-04-05 Update From: SLTechnology News&Howtos
Shulou (Shulou.com) 06/03 Report --
Storm is written mainly in Clojure; Kafka is written in Scala.
storm-kafka is the Storm plugin that lets a Storm spout act as a Kafka consumer.
Download address: https://github.com/wurstmeister/storm-kafka-0.8-plus
Besides the Storm and Kafka jars, google-collections-1.0.jar is required, along with the ZooKeeper-related Curator packages curator-framework-1.3.3.jar and curator-client-1.3.3.jar.
Curator was originally developed under com.netflix.curator and now lives under org.apache.curator.
1. Kafka consumer as a Storm spout
```java
package demo;

import java.util.ArrayList;
import java.util.List;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class MyKafkaSpout {
    public static void main(String[] args) {
        String topic = "track";
        ZkHosts zkhosts = new ZkHosts("192.168.1.107:2181,192.168.1.108:2181");
        SpoutConfig spoutConfig = new SpoutConfig(zkhosts, topic,
                "/MyKafka",   // root path in ZooKeeper where offsets are stored
                "MyTrack");   // subdirectory; one per application
        List<String> zkServers = new ArrayList<String>();
        for (String host : zkhosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }
        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 2181;
        spoutConfig.forceFromStart = true;  // consume from the beginning; change to false in production
        spoutConfig.socketTimeoutMs = 60;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());  // emit messages as strings

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig), 1);  // spout with parallelism 1
        builder.setBolt("bolt1", new MyKafkaBolt(), 1).shuffleGrouping("spout");

        Config config = new Config();
        config.setDebug(true);  // change to false before going live, otherwise the log output is very noisy
        if (args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", config, builder.createTopology());
            // local mode simulates a full Storm cluster inside a single process
        }
    }
}
```
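The loop that derives spoutConfig.zkServers from zkhosts.brokerZkStr can be isolated and checked on its own. A minimal sketch (the class and method names here are illustrative, not part of storm-kafka):

```java
import java.util.ArrayList;
import java.util.List;

public class ZkHostParser {
    // Splits a comma-separated "host:port" ZooKeeper connect string into
    // bare hostnames, mirroring the loop that fills spoutConfig.zkServers.
    static List<String> hostsOf(String brokerZkStr) {
        List<String> hosts = new ArrayList<String>();
        for (String entry : brokerZkStr.split(",")) {
            hosts.add(entry.split(":")[0]);  // drop the ":2181" port suffix
        }
        return hosts;
    }

    public static void main(String[] args) {
        // Same connect string as in the spout configuration above.
        System.out.println(hostsOf("192.168.1.107:2181,192.168.1.108:2181"));
        // prints [192.168.1.107, 192.168.1.108]
    }
}
```

Note that the ports are stripped here because SpoutConfig takes the port separately via spoutConfig.zkPort.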
2. The bolt simply prints each message, overriding the execute method
```java
package demo;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class MyKafkaBolt implements IBasicBolt {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output fields: this bolt only prints
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String kafkaMsg = input.getString(0);
        System.err.println("bolt " + kafkaMsg);
    }

    @Override
    public void cleanup() {
    }
}
```
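Because StringScheme delivers each Kafka message as the tuple's single String field, the bolt's execute logic amounts to reading field 0. A standalone sketch of that behavior, with the tuple stubbed as a plain list (illustrative only; the real Tuple interface comes from backtype.storm.tuple):

```java
import java.util.Arrays;
import java.util.List;

public class BoltLogicDemo {
    // Stand-in for input.getString(0): with StringScheme, field 0 of each
    // tuple is the raw Kafka message decoded as a String.
    static String firstField(List<Object> tupleValues) {
        return (String) tupleValues.get(0);
    }

    public static void main(String[] args) {
        String kafkaMsg = firstField(Arrays.<Object>asList("hello from kafka"));
        System.err.println("bolt " + kafkaMsg);  // same print as MyKafkaBolt.execute
    }
}
```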
© 2024 shulou.com SLNews company. All rights reserved.