2025-03-16 Update From: SLTechnology News&Howtos > Internet Technology
Many newcomers are unsure how to use storm-kafka-client, the new Kafka spout API for Apache Storm. This article walks through a complete working example: the topology code, the pom.xml, and a short note on the lambda expressions the topology relies on. I hope it helps you solve this problem.
    package hgs.core.sk;

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    // References:
    // https://community.hortonworks.com/articles/87597/how-to-write-topology-with-the-new-kafka-spout-cli.html
    // https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L52
    public class StormKafkaMainTest {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            // This translator converts each incoming Kafka record into a Storm tuple
            ByTopicRecordTranslator brt = new ByTopicRecordTranslator<>(
                    (r) -> new Values(r.value(), r.topic()),
                    new Fields("values", "test7"));
            // Register the topic to be consumed, i.e. test7
            brt.forTopic("test7",
                    (r) -> new Values(r.value(), r.topic()),
                    new Fields("values", "test7"));
            // The equivalent of the old SpoutConfig
            KafkaSpoutConfig ksc = KafkaSpoutConfig
                    // bootstrap servers and topic (test7)
                    .builder("bigdata01:9092,bigdata02:9092,bigdata03:9092", "test7")
                    // set group.id
                    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "skc-test")
                    // set the offset position from which consumption starts
                    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
                    // set the offset commit interval, in milliseconds
                    .setOffsetCommitPeriodMs(10000)
                    // set the record translator
                    .setRecordTranslator(brt)
                    .build();
            builder.setSpout("kafkaspout", new KafkaSpout<>(ksc), 2);
            builder.setBolt("mybolt1", new MyboltO(), 4).shuffleGrouping("kafkaspout");
            Config config = new Config();
            config.setNumWorkers(2);
            config.setNumAckers(0);
            try {
                StormSubmitter.submitTopology("storm-kafka-clients", config, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
            /*
            LocalCluster cu = new LocalCluster();
            cu.submitTopology("test", config, builder.createTopology());
            */
        }
    }

    class MyboltO extends BaseRichBolt {
        private static final long serialVersionUID = 1L;
        OutputCollector collector = null;

        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple input) {
            // Print each message here; the output can be found in the log of the corresponding worker
            String out = input.getString(0);
            System.out.println(out);
            // collector.ack(input);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }
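The record translator above is just a function from a Kafka record to a list of tuple values. As a dependency-free sketch of that idea (plain java.util.function instead of Storm's RecordTranslator; the Record class here is a hypothetical stand-in for Kafka's ConsumerRecord, not a real API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class TranslatorSketch {
    // Hypothetical stand-in for kafka's ConsumerRecord
    static class Record {
        final String topic, value;
        Record(String topic, String value) { this.topic = topic; this.value = value; }
        String topic() { return topic; }
        String value() { return value; }
    }

    // Same shape as the (r) -> new Values(r.value(), r.topic()) lambda in the topology
    static final Function<Record, List<Object>> TRANSLATE =
            r -> Arrays.asList(r.value(), r.topic());

    public static void main(String[] args) {
        List<Object> tuple = TRANSLATE.apply(new Record("test7", "hello"));
        System.out.println(tuple); // [hello, test7]
    }
}
```

In the real spout, Storm calls the translator for every polled record and emits the resulting list as a tuple on the declared fields.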
The pom.xml file:
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>hgs</groupId>
      <artifactId>core.sk</artifactId>
      <version>1.0.0-SNAPSHOT</version>
      <packaging>jar</packaging>
      <name>core.sk</name>
      <url>http://maven.apache.org</url>
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>storm-kafka-client</artifactId>
          <version>1.1.3</version>
        </dependency>
        <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>storm-core</artifactId>
          <version>1.1.3</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.11</artifactId>
          <version>1.0.0</version>
          <exclusions>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
              <groupId>org.apache.zookeeper</groupId>
              <artifactId>zookeeper</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>org.clojure</groupId>
          <artifactId>clojure</artifactId>
          <version>1.7.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>1.0.0</version>
        </dependency>
      </dependencies>
      <build>
        <plugins>
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.2</version>
            <configuration>
              <archive>
                <manifest>
                  <mainClass>hgs.core.sk.StormKafkaMainTest</mainClass>
                </manifest>
              </archive>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </project>

The topology above uses lambda expressions, which may be unfamiliar, so a small standalone note is recorded here to avoid confusion later:

    import java.util.Random;
    import java.util.UUID;
    import org.junit.jupiter.api.Test;

    public class TEst {
        @Test
        public void sysConfig() {
            String[] ags = {
                "this is my first storm program so i hope it will success",
                "i love bascketball",
                "the day of my birthday i was alone"};
            String uuid = UUID.randomUUID().toString();
            String nexttuple = ags[new Random().nextInt(ags.length)];
            System.out.println(nexttuple);
        }

        @Test
        public void lambdaTest() {
            int b = 100;
            // Prints the returned value 10 + a;
            // "(a) -> 10 + a" is equivalent to an anonymous implementation of testinter
            printPerson((a) -> 10 + a);
        }

        void printPerson(testinter t) {
            // t takes one parameter a, i.e. the method sysoutitems(int a) defined in the interface below
            System.out.println(t.sysoutitems(100));
        }
    }

    // The interface used with the lambda expression; a functional interface
    // must declare exactly one abstract method
    interface testinter<T> {
        T sysoutitems(int a);
        // void aAndb(int a, int b);
    }

After reading the above, have you mastered how to use storm-kafka-client? If you want to learn more skills or want to know more about it, you are welcome to follow the industry information channel. Thank you for reading!
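To make the lambda-versus-anonymous-class equivalence concrete, here is a minimal, self-contained sketch; Calc is a hypothetical single-method interface introduced only for this illustration, playing the same role as testinter above:

```java
public class LambdaVsAnon {
    // Hypothetical functional interface: exactly one abstract method
    interface Calc {
        int apply(int a);
    }

    public static void main(String[] args) {
        // Anonymous-class form
        Calc anon = new Calc() {
            public int apply(int a) { return 10 + a; }
        };
        // Equivalent lambda form
        Calc lambda = a -> 10 + a;
        System.out.println(anon.apply(100));   // 110
        System.out.println(lambda.apply(100)); // 110
    }
}
```

The compiler accepts the lambda only because Calc has a single abstract method, which is why the article stresses that the interface "can only have one method".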
© 2024 shulou.com SLNews company. All rights reserved.