How to use Thrift in Storm

Updated 2025-03-01. From: SLTechnology News&Howtos (shulou)

This article looks at how Storm uses Thrift: the IDL definitions, the Java code generated from them, the client, and the Nimbus server.

1 IDL

The starting point is storm.thrift, the IDL file that defines the data structures and services Storm uses.

The package backtype.storm.generated holds the Java code that the Thrift compiler generates automatically from this IDL.

Take the Nimbus service as an example. Its IDL definition is:

service Nimbus {
    void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
    void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
    void killTopology(1: string name) throws (1: NotAliveException e);
    void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
    void activate(1: string name) throws (1: NotAliveException e);
    void deactivate(1: string name) throws (1: NotAliveException e);
    void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

    // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs

    string beginFileUpload();
    void uploadChunk(1: string location, 2: binary chunk);
    void finishFileUpload(1: string location);

    string beginFileDownload(1: string file);
    // can stop downloading chunks when receive 0-length byte array back
    binary downloadChunk(1: string id);

    // returns json
    string getNimbusConf();

    // stats functions
    ClusterSummary getClusterInfo();
    TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);

    // returns json
    string getTopologyConf(1: string id) throws (1: NotAliveException e);
    StormTopology getTopology(1: string id) throws (1: NotAliveException e);
    StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}

The corresponding Java code in Nimbus.java is as follows:

public class Nimbus {

    public interface Iface {
        public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
        public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
        public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException;
        public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException, org.apache.thrift7.TException;
        public void activate(String name) throws NotAliveException, org.apache.thrift7.TException;
        public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException;
        public void rebalance(String name, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException;
        public String beginFileUpload() throws org.apache.thrift7.TException;
        public void uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift7.TException;
        public void finishFileUpload(String location) throws org.apache.thrift7.TException;
        public String beginFileDownload(String file) throws org.apache.thrift7.TException;
        public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException;
        public String getNimbusConf() throws org.apache.thrift7.TException;
        public ClusterSummary getClusterInfo() throws org.apache.thrift7.TException;
        public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException;
        public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException;
        public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
        public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
    }

    // ... rest of the generated class omitted
}
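The IDL comment on downloadChunk says a caller can stop once a 0-length byte array comes back. A minimal sketch of that loop follows, written against a hypothetical cut-down interface rather than the real generated one: FileDownloadApi, ChunkedDownload and the in-memory fakeServer are assumptions made for illustration, and only the two method names and their parameter and return types come from Nimbus.Iface.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical cut-down mirror of the two download methods in Nimbus.Iface.
interface FileDownloadApi {
    String beginFileDownload(String file) throws Exception;
    ByteBuffer downloadChunk(String id) throws Exception;
}

class ChunkedDownload {
    // Pulls chunks until a zero-length chunk arrives, then returns all bytes read.
    static byte[] downloadAll(FileDownloadApi api, String file) throws Exception {
        String id = api.beginFileDownload(file);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while (true) {
            ByteBuffer chunk = api.downloadChunk(id);
            if (!chunk.hasRemaining()) {
                break; // a 0-length chunk marks the end of the file
            }
            byte[] bytes = new byte[chunk.remaining()];
            chunk.get(bytes);
            out.write(bytes, 0, bytes.length);
        }
        return out.toByteArray();
    }

    // In-memory fake that serves a byte array in fixed-size chunks (illustration only).
    static FileDownloadApi fakeServer(byte[] content, int chunkSize) {
        Deque<ByteBuffer> chunks = new ArrayDeque<>();
        for (int off = 0; off < content.length; off += chunkSize) {
            int len = Math.min(chunkSize, content.length - off);
            chunks.add(ByteBuffer.wrap(content, off, len));
        }
        return new FileDownloadApi() {
            public String beginFileDownload(String file) {
                return "download-1"; // made-up download id
            }
            public ByteBuffer downloadChunk(String id) {
                ByteBuffer next = chunks.poll();
                return next != null ? next : ByteBuffer.allocate(0);
            }
        };
    }
}
```

With a real client, the same loop would presumably call the generated methods directly and would also have to handle org.apache.thrift7.TException.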

2 Client

1. First, get a client

NimbusClient client = NimbusClient.getConfiguredClient(conf);

Look at the logic of NimbusClient.getConfiguredClient in backtype.storm.utils.

It simply reads the Nimbus host and port from the configuration and constructs a new NimbusClient:

public static NimbusClient getConfiguredClient(Map conf) {
    try {
        String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
        int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
        return new NimbusClient(conf, nimbusHost, nimbusPort);
    } catch (TTransportException ex) {
        throw new RuntimeException(ex);
    }
}
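The lookup itself can be tried without a Storm installation. The following is a minimal sketch under stated assumptions: NimbusAddress and toInt are hypothetical names, toInt is only a stand-in for Utils.getInt, and the key strings "nimbus.host" and "nimbus.thrift.port" are assumed to be the values behind Config.NIMBUS_HOST and Config.NIMBUS_THRIFT_PORT.

```java
import java.util.HashMap;
import java.util.Map;

class NimbusAddress {
    // Assumed values of Config.NIMBUS_HOST and Config.NIMBUS_THRIFT_PORT.
    static final String NIMBUS_HOST = "nimbus.host";
    static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";

    final String host;
    final int port;

    NimbusAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Hypothetical stand-in for Utils.getInt: a parsed config value may arrive
    // as an Integer, a Long, or a String, so normalize it to an int.
    static int toInt(Object o) {
        if (o instanceof Number) {
            return ((Number) o).intValue();
        }
        if (o instanceof String) {
            return Integer.parseInt((String) o);
        }
        throw new IllegalArgumentException("Cannot convert " + o + " to int");
    }

    // Mirrors the two lookups in getConfiguredClient.
    static NimbusAddress fromConf(Map<String, Object> conf) {
        String host = (String) conf.get(NIMBUS_HOST);
        int port = toInt(conf.get(NIMBUS_THRIFT_PORT));
        return new NimbusAddress(host, port);
    }

    // Example configuration; host name and port are made up for the sketch.
    static Map<String, Object> exampleConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(NIMBUS_HOST, "nimbus.example.com");
        conf.put(NIMBUS_THRIFT_PORT, 6627L);
        return conf;
    }
}
```

The conversion helper is the only non-trivial part: casting the port straight to Integer would fail whenever the configuration loader hands back a Long.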

NimbusClient inherits from ThriftClient: public class NimbusClient extends ThriftClient

What does ThriftClient do? The key questions are how the data is serialized and how it is transferred to the remote side.

Thrift encapsulates these two concerns as Transport and Protocol.

The Transport is essentially a wrapper around a socket, here TSocket(host, port).

The Protocol defaults to TBinaryProtocol if you do not specify one.
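To give a feel for what the binary protocol puts on the wire, here is a simplified sketch that uses only the JDK, not the Thrift library. It assumes the strict binary encoding as I understand it: an i32 is 4 big-endian bytes, a string is an i32 byte length followed by UTF-8 bytes, and a call header is a version-and-type word, the method name, then a sequence id. BinaryLayoutSketch and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class BinaryLayoutSketch {
    // Assumed constants of the strict binary encoding: version mask and CALL message type.
    static final int VERSION_1 = 0x80010000;
    static final int CALL = 1;

    // A string is written as a 4-byte big-endian length followed by its UTF-8 bytes.
    static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);
        out.write(utf8);
    }

    // Header of a call message: version|type, method name, sequence id.
    static byte[] callHeader(String method, int seqId) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf); // writeInt is big-endian
        out.writeInt(VERSION_1 | CALL);
        writeString(out, method);
        out.writeInt(seqId);
        out.flush();
        return buf.toByteArray();
    }
}
```

Under these assumptions, callHeader("killTopology", 7) produces 24 bytes: 4 for the version word, 4 for the name length, 12 for the name, and 4 for the sequence id. The method arguments would follow as encoded struct fields, which this sketch leaves out.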

public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {
    try {
        // locate login configuration
        Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);

        // construct a transport plugin
        ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);

        // create a socket with server
        if (host == null) {
            throw new IllegalArgumentException("host is not set");
        }

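        if (port ...
        ...

On the server side, Nimbus wraps a TNonblockingServerSocket in THsHaServer options with 64 worker threads, a TBinaryProtocol factory and a Nimbus$Processor, registers a shutdown hook, and starts serving:

    ... (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
        (THsHaServer$Args.)
        (.workerThreads 64)
        (.protocolFactory (TBinaryProtocol$Factory.))
        (.processor (Nimbus$Processor. service-handler))
        )
    ...
    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
    (log-message "Starting Nimbus server...")
    (.serve server))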
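The shutdown hook in the server code stops things in a fixed order: the service handler first, then the server. A minimal Java sketch of that pattern follows, using only the JDK. The Handler and Server interfaces and the class name are hypothetical stand-ins, not Storm or Thrift types.

```java
import java.util.ArrayList;
import java.util.List;

class ShutdownOrderSketch {
    // Hypothetical minimal views of the two things the hook has to stop.
    interface Handler { void shutdown(); }
    interface Server { void stop(); }

    // Builds the hook thread: shut the handler down first, then stop the server.
    static Thread shutdownHook(Handler handler, Server server) {
        return new Thread(() -> {
            handler.shutdown();
            server.stop();
        });
    }

    // Registers the hook with the JVM so it runs on exit, and returns it.
    static Thread install(Handler handler, Server server) {
        Thread hook = shutdownHook(handler, server);
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    // Records the order of the two calls, for demonstration purposes.
    static List<String> recordOrder() {
        List<String> log = new ArrayList<>();
        Thread hook = shutdownHook(() -> log.add("handler.shutdown"), () -> log.add("server.stop"));
        hook.run(); // run inline instead of waiting for JVM exit
        return log;
    }
}
```

In a real process, install would be called before the blocking serve call, since nothing after that call runs until the server stops.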

That is how Thrift is used in Storm: the IDL in storm.thrift, the Java code generated from it, the client built on ThriftClient, and the Nimbus server.
