Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Storm-kafka (storm spout as the consumer side of kafka)

2025-04-05 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

Storm is written by grovvy.

Kafka is written by scala.

Storm-kafka storm connects plug-ins to kafka consumer

Download address:

Https://github.com/wurstmeister/storm-kafka-0.8-plus

Google-collections-1.0.jar is required in addition to storm and kafka-related jar packages

And zookeeper related packages curator-framework-1.3.3.jar and curator-client-1.3.3.jar

The development used to be organized by com.netflix.curator is now under org.apache.curator

1.Kafka Consumer is Storm Spout code

Package demo;import java.util.ArrayList;import java.util.List;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.generated.AlreadyAliveException;import backtype.storm.generated.InvalidTopologyException;import backtype.storm.spout.SchemeAsMultiScheme;import backtype.storm.topology.TopologyBuilder;import storm.kafka.KafkaSpout;import storm.kafka.SpoutConfig;import storm.kafka.StringScheme;import storm.kafka.ZkHosts;public class MyKafkaSpout {public static void main (String [] args) {String topic = "track" ZkHosts zkhosts = new ZkHosts ("192.168.1.107ZkHosts zkhosts 2181192.168.1.108ZkHosts zkhosts 2181"); SpoutConfig spoutConfig = new SpoutConfig (zkhosts, topic, "/ MyKafka", / / offset offset root directory "MyTrack"); / / subdirectory corresponds to an application List zkServers=new ArrayList (); / / zkServers.add ("192.168.1.107") / / zkServers.add ("192.168.1.108"); for (String host:zkhosts.brokerZkStr.split (",")) {zkServers.add (host.split (":) [0]);} spoutConfig.zkServers=zkServers; spoutConfig.zkPort=2181; spoutConfig.forceFromStart=true;// consumption from scratch, which is actually to be changed to false's spoutConfig.socketTimeoutMs=60; spoutConfig.scheme=new SchemeAsMultiScheme (new StringScheme ()) / / output to string type TopologyBuilder builder=new TopologyBuilder (); builder.setSpout ("spout", new KafkaSpout (spoutConfig), 1); / / reference spout, with concurrency set to 1 builder.setBolt ("bolt1", new MyKafkaBolt (), 1). ShuffleGrouping ("spout"); Config config = new Config (); config.setDebug (true) / / change it to false before launching, otherwise there will be a lot of if (args.length > 0) {try {StormSubmitter.submitTopology (args [0], config, builder.createTopology ());} catch (AlreadyAliveException e) {/ / TODO Auto-generated catch block e.printStackTrace () } catch (InvalidTopologyException e) {/ / TODO Auto-generated catch block e.printStackTrace ();}} else {LocalCluster localCluster=new LocalCluster (); localCluster.submitTopology ("mytopology", config, builder.createTopology ()); / / Local mode simulates all the functions of a storm cluster in a process}

The 2.Bolt code is just a simple printout, overriding the execute method

Package demo;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class MyKafkaBolt implements IBasicBolt {@ Override public void declareOutputFields (OutputFieldsDeclarer arg0) {/ / TODO Auto-generated method stub} @ Override public Map getComponentConfiguration () {/ / TODO Auto-generated method stub return null @ Override public void cleanup () {/ / TODO Auto-generated method stub} @ Override public void execute (Tuple input, BasicOutputCollector arg1) {String kafkaMsg = input.getString (0); System.err.println ("bolt" + kafkaMsg);} @ Override public void prepare (Map arg0, TopologyContext arg1) {/ / TODO Auto-generated method stub}}

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report