2025-02-24 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains how to configure Storm distributed RPC (DRPC). Many people have questions about setting it up, so I went through various materials and put together a simple, practical walkthrough. I hope it clears things up; follow along and give it a try.
First, get the DRPC environment ready on the Storm cluster by adding the following to storm.yaml:
drpc.servers:
- "192.168.1.118"
Then start the distributed RPC service by running storm drpc.
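For reference, the DRPCClient used later in this article connects on port 3772, which is Storm's default drpc.port; if you run the DRPC server on a different port, set it in storm.yaml as well. A sketch of the relevant fragment (host address as used in this article):

```yaml
drpc.servers:
    - "192.168.1.118"
# drpc.port: 3772    # the default; must match the port passed to DRPCClient
```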
After that, things are not much different from any other topology; we need to write some code. I took this one straight from the Storm examples:
public class BasicDRPCTopology {
    public static class ExclaimBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String input = tuple.getString(1);  // field 1 holds the DRPC argument
            // field 0 is the request id; echo it back with the result
            collector.emit(new Values(tuple.getValue(0), input + "!"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }
    }

    public static void main(String[] args) throws Exception {
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
        builder.addBolt(new ExclaimBolt(), 3);

        Config conf = new Config();
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar("DRPC-TEST", conf, builder.createRemoteTopology());
    }
}
Starting from the main function, a quick walkthrough:
First we create a LinearDRPCTopologyBuilder; its argument, "exclamation", is the function name we will use when making the RPC call.
Then we add our own bolt, with a parallelism of 3.
Finally we submit the topology with StormSubmitter.
Once the code is done, build a jar and use storm jar to submit the topology to the cluster.
Calling it from the client is very simple:
DRPCClient client = new DRPCClient("192.168.1.118", 3772);
String result = client.execute("exclamation", "china");
System.out.println(result);
At this point, the simplest possible DRPC call is up and running.
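One detail worth noting in ExclaimBolt above: in a DRPC topology the incoming tuple carries the request id at index 0 and the argument string at index 1, which is why the bolt reads getString(1) and echoes getValue(0) back unchanged. Stripped of all the Storm machinery, the transformation the service performs is just this (a plain-Java sketch, class and method names mine):

```java
public class ExclaimDemo {
    // The core of ExclaimBolt without Storm: append "!" to the argument.
    static String exclaim(String input) {
        return input + "!";
    }

    public static void main(String[] args) {
        // Corresponds to client.execute("exclamation", "china")
        System.out.println(exclaim("china"));  // prints china!
    }
}
```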
Wait, there is one more problem: LinearDRPCTopologyBuilder is not recommended (my version here is 0.9.3).
The source contains this line:
Trident subsumes the functionality provided by this class, so it's deprecated
In other words, Trident already covers everything LinearDRPCTopologyBuilder can do.
What is Trident? A quick translation turns up "trident", the three-pronged spear. Well, that sounds impressive; I had to try it.
So here is the second piece of code:
public class TridentDRPCTopology {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        StormSubmitter.submitTopologyWithProgressBar("word-count", conf, buildTopology());
    }

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();
        topology.newDRPCStream("word-count")
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .aggregate(new One(), new Fields("one"))
                .aggregate(new Fields("one"), new Sum(), new Fields("word-count"));
        return topology.build();
    }

    // Splits the incoming sentence on spaces, emitting one tuple per word.
    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }

    // Collapses each group to the constant 1, so summing the "one" field
    // counts distinct words.
    public static class One implements CombinerAggregator<Integer> {
        @Override
        public Integer init(TridentTuple tuple) {
            return 1;
        }

        @Override
        public Integer combine(Integer val1, Integer val2) {
            return 1;
        }

        @Override
        public Integer zero() {
            return 1;
        }
    }
}
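If CombinerAggregator is new to you: for each group, Trident maps every tuple through init, merges the partial results with combine, and falls back to zero when a partition is empty. The following plain-Java fold is my own sketch of those semantics (not Storm's actual implementation); it shows why One followed by Sum produces a distinct count:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class CombinerDemo {
    // A simplified model of Trident's CombinerAggregator: map each tuple
    // through init, then merge everything with combine, starting from zero.
    static <T, U> U aggregate(List<T> tuples, Function<T, U> init,
                              BinaryOperator<U> combine, U zero) {
        U acc = zero;
        for (T t : tuples) {
            acc = combine.apply(acc, init.apply(t));
        }
        return acc;
    }

    public static void main(String[] args) {
        // "One": however many tuples a group holds, it collapses to 1.
        int one = aggregate(Arrays.asList("asdf", "asdf"), w -> 1, (a, b) -> 1, 1);
        // "Sum": adds up the per-group ones, giving the distinct-word count.
        int sum = aggregate(Arrays.asList(1, 1, 1, 1, 1), i -> i, Integer::sum, 0);
        System.out.println(one + " " + sum);  // prints 1 5
    }
}
```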
This topology does something a bit more involved: given a sentence, it counts how many distinct words the sentence contains (duplicates are not counted twice). The main function simply submits a topology; the actual construction happens in buildTopology:
topology.newDRPCStream("word-count")
        .each(new Fields("args"), new Split(), new Fields("word"))          // split on spaces
        .groupBy(new Fields("word"))                                        // group by word
        .aggregate(new One(), new Fields("one"))                            // each group becomes a single 1
        .aggregate(new Fields("one"), new Sum(), new Fields("word-count")); // sum the ones
This style looks quite a bit like operating on RDDs in Spark.
All right, package and submit again.
Then test from the client:
DRPCClient client = new DRPCClient("192.168.1.118", 3772);
String result = client.execute("word-count", "mywife asdf asdf asdfasdfasfweqw saaa weweew");
System.out.println(result);
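As a sanity check on the semantics (the string the DRPC client prints is wrapped in Storm's result serialization, so its exact formatting will differ), the same distinct-word count in plain Java (a sketch, names mine) yields 5 for the test sentence, since asdf appears twice:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class DistinctWordCount {
    // Mirrors the Trident pipeline: split on spaces, keep one entry per
    // distinct word (the grouping plus One), then count them (the Sum).
    static int count(String sentence) {
        Set<String> words = new HashSet<>(Arrays.asList(sentence.split(" ")));
        return words.size();
    }

    public static void main(String[] args) {
        System.out.println(count("mywife asdf asdf asdfasdfasfweqw saaa weweew"));  // prints 5
    }
}
```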
That concludes this look at configuring Storm distributed RPC. Pairing the theory with hands-on practice is the best way to make it stick, so go try it yourself!