
How the Registration Server Is Implemented in the RocketMQ Source Code

2025-02-24 Update From: SLTechnology News&Howtos


This article walks through how the registration server (the NameServer) is implemented in the RocketMQ source code. It is meant as a practical reference for reading the code.

NamesrvStartup

This class starts the registration server. Its main method delegates to main0, which does two things:

1. Call NamesrvStartup#createNamesrvController to create a NamesrvController instance, held in the variable controller.
2. Call NamesrvStartup#start to start the controller.

The two methods are described in turn below.

createNamesrvController

The key step in this method is creating the NamesrvController object through its constructor. Most of the remaining code runs before the constructor call: it parses the command line, reads the configuration file if one is available, and prints the resulting overall configuration.

If no additional configuration is supplied, the registration server listens on port 9876 by default.

start

This method starts the NamesrvController instance passed in as its parameter. The process is as follows:

1. Call NamesrvController#initialize to initialize the controller.
2. Add a shutdown hook to the runtime so that NamesrvController#shutdown shuts the registration server down gracefully when the JVM exits.
3. Call NamesrvController#start to start the registration server.
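The start sequence can be sketched in a few lines of Java. This is a minimal illustration, not the RocketMQ code: SketchController is a hypothetical stand-in for NamesrvController, and throwing an exception when initialization fails is an assumption made here to keep the sketch testable.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for NamesrvController; only the lifecycle is modeled.
class SketchController {
    final AtomicReference<String> state = new AtomicReference<>("created");

    boolean initialize() { state.set("initialized"); return true; }
    void start() { state.set("started"); }
    void shutdown() { state.set("stopped"); }
}

class StartupSketch {
    // Mirrors the three steps: initialize, register a shutdown hook, start.
    static SketchController start(SketchController controller) {
        if (!controller.initialize()) {
            controller.shutdown();
            // Assumption: fail loudly instead of exiting the process.
            throw new IllegalStateException("initialize failed");
        }
        // The hook runs when the JVM exits, so the server can shut down gracefully.
        Runtime.getRuntime().addShutdownHook(new Thread(controller::shutdown));
        controller.start();
        return controller;
    }
}
```

Registering the hook before calling start means a JVM exit during startup still triggers the shutdown path.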

NamesrvController

This class is used to control the registration server.

Constructor

The constructor mainly assigns several important attributes. In particular, it initializes the two properties kvConfigManager and routeInfoManager.

initialize

This method initializes the registration server. The execution logic is as follows:

1. Call kvconfig.KVConfigManager#load to load the configuration. By default, the contents of ${user.home}/namesrv/kvConfig.json are loaded into the property kvconfig.KVConfigManager#configTable.
2. Create a NettyRemotingServer object and assign it to the property NamesrvController#remotingServer. The new object takes a BrokerHousekeepingService as a parameter. The BrokerHousekeepingService removes a channel from the routing information when the channel is closed, raises an exception, or goes idle.
3. Create a thread pool and assign it to the property NamesrvController#remotingExecutor. It executes the registration server's business logic for requests received through Netty.
4. Call NamesrvController#registerProcessor to register the business processor with the RemotingServer, using the thread pool created in step 3.
5. Create a periodic task that runs every 10 seconds and calls RouteInfoManager#scanNotActiveBroker to scan for inactive Brokers.
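The periodic scan in the last step can be sketched as follows. This is a simplified illustration: the class name, the reduced broker table (address to last-update time), the two-minute expiry window, and the 5-second initial delay are all assumptions made for the example; only the 10-second interval comes from the description above.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class InactiveBrokerScanner {
    // Assumed expiry window; the article does not state the real value.
    static final long EXPIRE_MILLIS = 2 * 60 * 1000L;

    // brokerAddr -> time of the last update (a reduced BrokerLiveInfo)
    final Map<String, Long> lastUpdate = new ConcurrentHashMap<>();

    // Removes every broker whose last update is older than the expiry window.
    int scanNotActiveBroker(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() + EXPIRE_MILLIS < nowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    // Runs the scan every 10 seconds; the 5-second initial delay is arbitrary.
    ScheduledExecutorService schedule() {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleAtFixedRate(
            () -> scanNotActiveBroker(System.currentTimeMillis()), 5, 10, TimeUnit.SECONDS);
        return ses;
    }
}
```

Passing the current time into scanNotActiveBroker keeps the expiry logic testable without waiting for the scheduler.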

start

This method does nothing more than start the RemotingServer. Once it returns, the server is listening for registration requests sent by Brokers.

KVConfigManager

This class stores the configuration of the registration server. The configuration is persisted in the file ${user.home}/namesrv/kvConfig.json. In memory, it is held in a two-level HashMap structure.

The first level is the namespace and the second level is the key-value pair; all of them are strings.

The load method reads the data from the file into memory, and the persist method writes the in-memory data back to the file.
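The two-level structure can be illustrated with a small Java sketch. The class and method names here are hypothetical, and locking and file persistence are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;

class KvConfigSketch {
    // namespace -> (key -> value), all strings
    private final Map<String, Map<String, String>> configTable = new HashMap<>();

    void put(String namespace, String key, String value) {
        configTable.computeIfAbsent(namespace, ns -> new HashMap<>()).put(key, value);
    }

    // Returns null when the namespace or the key is missing.
    String get(String namespace, String key) {
        Map<String, String> kv = configTable.get(namespace);
        return kv == null ? null : kv.get(key);
    }

    // Returns the removed value, or null if nothing was stored.
    String delete(String namespace, String key) {
        Map<String, String> kv = configTable.get(namespace);
        return kv == null ? null : kv.remove(key);
    }
}
```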

DefaultRequestProcessor

This class contains the most code in the rocketmq-namesrv package, because the business processing is implemented here.

Following the pattern of the NettyRequestProcessor interface, all business requests are dispatched from the processRequest method. The commands supported by this class are described one by one below.
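The dispatch pattern amounts to a switch over the request code. The following sketch shows the shape only: the Code enum and the string results are hypothetical, standing in for the real request codes and handler methods.

```java
class DispatchSketch {
    // A few of the commands covered in this article; the enum itself is hypothetical.
    enum Code { PUT_KV_CONFIG, GET_KV_CONFIG, DELETE_KV_CONFIG, REGISTER_BROKER }

    // Routes a request code to its handler; handlers here only return a label.
    static String processRequest(Code code) {
        switch (code) {
            case PUT_KV_CONFIG: return "putKVConfig";
            case GET_KV_CONFIG: return "getKVConfig";
            case DELETE_KV_CONFIG: return "deleteKVConfig";
            case REGISTER_BROKER: return "registerBroker";
            default: return "unsupported";
        }
    }
}
```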

PUT_KV_CONFIG

This command has no request body; the request header carries the namespace, key and value fields. It calls kvconfig.KVConfigManager#putKVConfig to store the configuration item in the configuration manager.

GET_KV_CONFIG

This command has no request body; the request header carries the namespace and key fields. It calls kvconfig.KVConfigManager#getKVConfig to fetch the corresponding configuration item.

If the configuration item exists, a success response is returned. If it does not exist, a failure response with the response code QUERY_NOT_FOUND is returned.

DELETE_KV_CONFIG

This command has no request body; the request header carries the namespace and key fields. It calls kvconfig.KVConfigManager#deleteKVConfig to delete the corresponding configuration item.

QUERY_DATA_VERSION

This command queries the data version of a Broker on the registration server. The execution logic is as follows:

1. Parse the DataVersion object from the request body and the brokerAddr from the request header. With these two as parameters, call RouteInfoManager#isBrokerTopicConfigChanged to determine whether the version stored on the server for that brokerAddr differs from the one sent, and store the result as changed.
2. If changed is false, the version has not changed and the data on the server is still valid, so call RouteInfoManager#updateBrokerInfoUpdateTimestamp to refresh the last-update time of that data.
3. Call RouteInfoManager#queryBrokerTopicConfig to look up the version stored for brokerAddr on the server, held as nameSeverDataVersion.
4. Build the response. If nameSeverDataVersion is not null, encode it and set it as the response body. Set the changed property in the response header to the value obtained in step 1.
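The core of this flow (compare versions, then refresh the timestamp only when nothing changed) can be sketched as below. The Version class is a simplified stand-in for DataVersion, and treating an unknown Broker as changed is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class DataVersionSketch {
    // Simplified DataVersion: a timestamp plus a counter.
    static final class Version {
        final long timestamp;
        final long counter;

        Version(long timestamp, long counter) {
            this.timestamp = timestamp;
            this.counter = counter;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Version)) return false;
            Version v = (Version) o;
            return timestamp == v.timestamp && counter == v.counter;
        }

        @Override public int hashCode() { return Objects.hash(timestamp, counter); }
    }

    final Map<String, Version> versions = new HashMap<>();  // brokerAddr -> stored version
    final Map<String, Long> lastUpdate = new HashMap<>();   // brokerAddr -> last refresh time

    // Returns the "changed" flag; refreshes the timestamp only when nothing changed.
    boolean query(String brokerAddr, Version reported, long nowMillis) {
        Version stored = versions.get(brokerAddr);
        boolean changed = stored == null || !stored.equals(reported);
        if (!changed) {
            lastUpdate.put(brokerAddr, nowMillis);
        }
        return changed;
    }
}
```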

REGISTER_BROKER

This command registers Broker information. It first reads the MQ version from the request header. If the version is 3.0.11 or later, it calls processor.DefaultRequestProcessor#registerBrokerWithFilterServer to register the information; otherwise, it calls processor.DefaultRequestProcessor#registerBroker.

registerBrokerWithFilterServer

The execution logic of the method is as follows:

1. Decode the request and create a RegisterBrokerRequestHeader object. Perform a CRC check using this header object and the body field of the request. If the check fails, return a system error response; otherwise, continue.
2. If the request contains a body, decode it into a RegisterBrokerBody object, held as registerBrokerBody. If it does not, create the RegisterBrokerBody object manually and set both the version number and the timestamp of its DataVersion to 0.
3. Call RouteInfoManager#registerBroker to register the routing information, and store the return value as result.
4. Create a response header of type RegisterBrokerResponseHeader, held as responseHeader, and copy the masterAddr and haServerAddr properties of result into it.
5. From the configuration manager, fetch the configuration data stored under the ORDER_TOPIC_CONFIG namespace, encode it, and set the resulting bytes as the response body.
6. Return the response.
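The CRC check in the first step can be sketched with the JDK's java.util.zip.CRC32. The class and method names are hypothetical, and both masking the value to a non-negative int and treating a missing body as checksum 0 are assumptions of this sketch rather than confirmed details of the RocketMQ implementation.

```java
import java.util.zip.CRC32;

class BodyCrcSketch {
    // Computes a CRC32 over the body; a null body yields 0 (assumption).
    static int crc32(byte[] body) {
        if (body == null) {
            return 0;
        }
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        // Mask to a non-negative int so it fits a signed header field (assumption).
        return (int) (crc.getValue() & 0x7FFFFFFF);
    }

    // True when the checksum carried in the header matches the body.
    static boolean verify(byte[] body, int crcFromHeader) {
        return crc32(body) == crcFromHeader;
    }
}
```

A sender would compute crc32(body), place it in the request header, and the receiver would call verify before decoding the body.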

registerBroker

The process is essentially the same as that of registerBrokerWithFilterServer, except that null is passed for the filterServerList parameter when calling RouteInfoManager#registerBroker.

UNREGISTER_BROKER

This command unregisters a Broker. It is handled by org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#unregisterBroker, which delegates internally to org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unregisterBroker.

GET_ROUTEINFO_BY_TOPIC

This command queries the routing information for a topic by calling org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic.

The method obtains the full routing information for the given topic name from the route manager. The process is as follows:

1. Call org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData with the requested topic name to get a result of type TopicRouteData, held as topicRouteData.
2. If topicRouteData is not null, execute the following sub-steps:
   a. If the configuration org.apache.rocketmq.common.namesrv.NamesrvConfig#orderMessageEnable is enabled, fetch the configuration for the topic name from the ORDER_TOPIC_CONFIG namespace, hold it as orderTopicConf, and set it on the property org.apache.rocketmq.common.protocol.route.TopicRouteData#orderTopicConf.
   b. Encode topicRouteData, set it as the response body, and return the response.
3. If topicRouteData is null, return a TOPIC_NOT_EXIST response.

GET_BROKER_CLUSTER_INFO

This command calls org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getBrokerClusterInfo. That method calls org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllClusterInfo to get the encoded cluster information, sets it as the response body, and returns the response.

The data structure that is encoded is the ClusterInfo class, whose attributes are as follows:

HashMap<String, BrokerData> brokerAddrTable;
HashMap<String, Set<String>> clusterAddrTable;

WIPE_WRITE_PERM_OF_BROKER

This command wipes the write permission of a Broker, which means that no topic on that Broker can be written to. It is implemented by org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#wipeWritePermOfBroker, whose logic is as follows:

1. Call org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#wipeWritePermOfBrokerByLock to wipe the write permission of the given Broker. The return value is the number of queue entries whose permission was wiped; hold it as wipeTopicCnt.
2. Set wipeTopicCnt on the corresponding property of the response header and return the response.

GET_ALL_TOPIC_LIST_FROM_NAMESERVER

This command gets the full list of topics on the registration server and is implemented by org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getAllTopicListFromNameserver.

The method internally calls the method org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllTopicList to get a list of all the topic names, encodes it as a binary array, sets it to the content of the response, and returns the response.

DELETE_TOPIC_IN_NAMESRV

This command deletes topic information from the server and is implemented by org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#deleteTopicInNamesrv. The implementation is simple: it deletes the corresponding topic name from topicQueueTable.

GET_KVLIST_BY_NAMESPACE

This command is used to get configuration information under a specific namespace on the server. The corresponding configuration information is obtained by the method org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#getKVListByNamespace and encoded as a binary array.

If the array exists, it is set to the content of the response and a successful response is returned.

If the array does not exist, a QUERY_NOT_FOUND response is returned.

GET_TOPICS_BY_CLUSTER

This command is used to get all the topic names under the cluster, which is completed by calling the method org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getTopicsByCluster. The method internally calls org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getTopicsByCluster to get the encoding results of all topic names under the cluster, sets the binary array of the encoding results to the content of the response, and returns a successful response.

GET_SYSTEM_TOPIC_LIST_FROM_NS

Judging by its name, this command returns a list of system topics. The implementation, however, is hard to follow on its own, so it is set aside for now and will be revisited together with the request that uses it.

GET_UNIT_TOPIC_LIST

This command gets the set of topic names in the cluster that carry the unit flag. It is implemented by org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getUnitTopicList, which internally calls org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getUnitTopics to obtain the encoded byte array of those topic names, sets the array as the response body, and returns.

GET_HAS_UNIT_SUB_TOPIC_LIST

This command gets the set of topic names in the cluster that carry the unit_sub flag. It works the same way as GET_UNIT_TOPIC_LIST, only with a different flag.

GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST

This command gets the set of topic names in the cluster that carry the unit_sub flag but not the unit flag, as the UNUNIT part of the command name suggests. It works the same way as the two commands above, only with a different flag condition.

UPDATE_NAMESRV_CONFIG

This command lets an administrator send configuration text directly to the registration service in order to update the service's own configuration. The updated configuration is then persisted to a file on disk.

GET_NAMESRV_CONFIG

This command is used to get the configuration information for the registration service and set the configuration information to the content of the response.

RouteInfoManager

This class manages routing information. It uses several classes to model the different kinds of routing information; these are described first.

QueueData

This class holds the queue information in a Broker. It has the following attributes:

brokerName: the name of the Broker. By default this is the host name of the machine where the Broker runs; it can be changed through configuration.

readQueueNums: the number of queues used for reading.

writeQueueNums: the number of queues used for writing.

perm: the permission information of the Broker, which indicates whether it is readable and writable.

topicSynFlag: the topic synchronization flag.

BrokerData

This class stores the address information of the Broker cluster and has the following attributes:

cluster: the cluster identifier.

brokerName: the Broker name.

brokerAddrs: a mapping from brokerId to brokerAddr. It stores the id-to-address mapping of the Brokers that share the same Broker name.

BrokerLiveInfo

This class holds the liveness information of a specific Broker and has the following attributes:

lastUpdateTimestamp: the time of the last data update.

dataVersion: the version of this Broker's topic configuration.

channel: the Netty Channel object, which is the connection between the Broker and the registration server.

haServerAddr: the address of the master node used for high availability, in the format ${ip}:${port}.

Storage attributes

RouteInfoManager maintains five Map structures internally to store routing information. They are easiest to understand from the code:

HashMap<String, List<QueueData>> topicQueueTable;
HashMap<String, BrokerData> brokerAddrTable;
HashMap<String, Set<String>> clusterAddrTable;
HashMap<String, BrokerLiveInfo> brokerLiveTable;
HashMap<String, List<String>> filterServerTable;

registerBroker

This method registers a Broker's information with the route manager. The flow is as follows:

1. Look up the names of all Brokers in the cluster from clusterAddrTable using the clusterName parameter, and hold the result as brokerNames.
2. If brokerNames is null, assign it an empty HashSet and put the clusterName and brokerNames pair into clusterAddrTable.
3. Add the name of the Broker being registered to brokerNames.
4. Get the BrokerData object from brokerAddrTable by brokerName. If it does not exist, create a new one and put it into brokerAddrTable.
5. Take the brokerAddrs mapping from the BrokerData in step 4 and iterate over its entries. If an entry's value equals the brokerAddr parameter but its key differs from the brokerId parameter, delete that entry. Such an entry means that the Broker information registered for that address has changed.
6. Put the brokerId and brokerAddr parameters into brokerAddrs.
7. If brokerId is 0 (the master node) and the topicConfigWrapper parameter is not null (the registration request sent by the Broker contained a body), execute the following sub-steps. Otherwise, continue with step 8.
   a. Look up the Broker's version in brokerLiveTable and compare it with the version in topicConfigWrapper. If the version has changed, or if the Broker is newly registered (the first registration of this brokerName or of this brokerId), the request probably carries new topic configuration and the topic configuration on the registration server must be updated, so continue with sub-step b. Otherwise, skip to step 8.
   b. Iterate over the property TopicConfigSerializeWrapper#topicConfigTable and call RouteInfoManager#createAndUpdateQueueData for each element to update the queue information of the corresponding topic.
8. Build a BrokerLiveInfo object and put it into brokerLiveTable.
9. If the filterServerList parameter is not null, put it into filterServerTable.
10. If brokerId is not 0 (a slave node is registering itself), get the master node's address from brokerAddrs. If the master address exists, also get its HaServer address. Set both values on the result object.
11. Return result. As the code shows, if the registering Broker is not a slave, or the corresponding master does not exist, result is an empty object.
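The address bookkeeping in this flow can be sketched in Java as follows. It is a reduced illustration: the class name and the haServerAddrs map (standing in for the HA address kept in BrokerLiveInfo) are hypothetical, and the topic configuration update, filter servers, and locking are omitted.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class RegisterBrokerSketch {
    static final long MASTER_ID = 0L;

    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();       // cluster -> broker names
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();  // brokerName -> (id -> addr)
    final Map<String, String> haServerAddrs = new HashMap<>();               // brokerAddr -> HA address

    // Returned to the caller; both fields stay null unless a slave finds its master.
    static final class Result {
        String masterAddr;
        String haServerAddr;
    }

    Result registerBroker(String cluster, String brokerAddr, String brokerName,
                          long brokerId, String haServerAddr) {
        Result result = new Result();
        clusterAddrTable.computeIfAbsent(cluster, c -> new HashSet<>()).add(brokerName);

        Map<Long, String> addrs = brokerAddrTable.computeIfAbsent(brokerName, n -> new HashMap<>());
        // Same address under a different id: drop the stale entry before re-adding.
        addrs.entrySet().removeIf(e -> e.getValue().equals(brokerAddr) && e.getKey() != brokerId);
        addrs.put(brokerId, brokerAddr);
        haServerAddrs.put(brokerAddr, haServerAddr);

        if (brokerId != MASTER_ID) {
            String masterAddr = addrs.get(MASTER_ID);
            if (masterAddr != null) {
                result.masterAddr = masterAddr;
                result.haServerAddr = haServerAddrs.get(masterAddr);
            }
        }
        return result;
    }
}
```

Registering a master first and then a slave under the same Broker name gives the slave a result that carries the master's address and HA address; the master's own result stays empty.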

createAndUpdateQueueData

This method creates or updates QueueData objects in topicQueueTable. The process is as follows:

1. Build a QueueData object from the brokerName and the properties of the topicConfig object.
2. Get the queueDataList for the topicConfig topic from topicQueueTable.
3. If queueDataList does not exist, the topic is appearing on the registration server for the first time. Create a new LinkedList, add the QueueData object to it, and put the list into topicQueueTable. The process ends here.
4. If queueDataList exists, iterate over its elements and do the following for each:
   a. If the element's brokerName equals the brokerName parameter, continue with sub-step b; otherwise, move on to the next element.
   b. Compare the element with the object built in step 1. If they are equal, do nothing. If they differ, the data has changed, so remove the element from the list.
5. Unless step 4 found an element equal to the object from step 1, add that object to queueDataList. This covers both an element that was removed because it had changed and a Broker that had no entry for the topic yet.
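The update logic can be sketched as below. QueueData is trimmed to the fields needed for the equality check, and the sketch takes the topic name and a prebuilt QueueData as parameters for brevity. Appending the new object when the topic exists but has no entry for this Broker is an assumption about the intended behavior.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class QueueDataSketch {
    // Trimmed QueueData: only the fields needed for the equality check.
    static final class QueueData {
        final String brokerName;
        final int readQueueNums;
        final int writeQueueNums;
        final int perm;

        QueueData(String brokerName, int readQueueNums, int writeQueueNums, int perm) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
            this.perm = perm;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof QueueData)) return false;
            QueueData q = (QueueData) o;
            return brokerName.equals(q.brokerName) && readQueueNums == q.readQueueNums
                && writeQueueNums == q.writeQueueNums && perm == q.perm;
        }

        @Override public int hashCode() {
            return Objects.hash(brokerName, readQueueNums, writeQueueNums, perm);
        }
    }

    final Map<String, List<QueueData>> topicQueueTable = new HashMap<>();

    void createAndUpdateQueueData(String topic, QueueData queueData) {
        List<QueueData> list = topicQueueTable.get(topic);
        if (list == null) {
            // First appearance of the topic: create the list and stop.
            list = new LinkedList<>();
            list.add(queueData);
            topicQueueTable.put(topic, list);
            return;
        }
        boolean addNewOne = true;
        Iterator<QueueData> it = list.iterator();
        while (it.hasNext()) {
            QueueData existing = it.next();
            if (existing.brokerName.equals(queueData.brokerName)) {
                if (existing.equals(queueData)) {
                    addNewOne = false;   // unchanged, keep as is
                } else {
                    it.remove();         // changed, replace below
                }
            }
        }
        if (addNewOne) {
            list.add(queueData);
        }
    }
}
```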

unregisterBroker

This method deletes a Broker's information from the route manager. The process is as follows:

1. Delete the Broker's entry from brokerLiveTable.
2. Delete the Broker's entry from filterServerTable.
3. Declare a local variable removeBrokerName. Get the BrokerData for the brokerName from brokerAddrTable. If it is not null, execute the following sub-steps:
   a. Remove the mapping for the brokerId from the brokerAddrs of the BrokerData.
   b. If brokerAddrs is now empty, remove the mapping for the brokerName from brokerAddrTable and set removeBrokerName to true.
4. If removeBrokerName is true, execute the following sub-steps; otherwise, the process ends.
   a. Get the set of Broker names for the clusterName from clusterAddrTable and hold it as nameSet.
   b. If nameSet is not null, delete the brokerName from it. If nameSet is empty after the deletion, remove the mapping for the clusterName from clusterAddrTable.
   c. Call removeTopicByBrokerName to delete the topic information associated with the brokerName.

removeTopicByBrokerName

This method deletes the topic information associated with a brokerName. The execution logic is as follows:

1. Iterate over topicQueueTable and apply the following steps to each entry.
2. Iterate over the entry's QueueData list and remove every QueueData whose brokerName equals the brokerName parameter.
3. After the iteration, if the list is empty, remove the entry from topicQueueTable.
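A minimal sketch of this cleanup, assuming each QueueData is reduced to just its brokerName (so the table maps a topic to a list of Broker names):

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class RemoveTopicSketch {
    // topicQueueTable: topic -> brokerNames holding queues for it (QueueData reduced to its name)
    static void removeTopicByBrokerName(Map<String, List<String>> topicQueueTable, String brokerName) {
        Iterator<Map.Entry<String, List<String>>> it = topicQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            List<String> brokers = it.next().getValue();
            brokers.removeIf(brokerName::equals);
            if (brokers.isEmpty()) {
                // No broker serves this topic any more, so drop the topic itself.
                it.remove();
            }
        }
    }
}
```

Removing through the iterator avoids a ConcurrentModificationException while the table is being traversed.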

pickupTopicRouteData

First, look at the definition of the data structure TopicRouteData, whose properties are as follows:

String orderTopicConf;
List<QueueData> queueDatas;
List<BrokerData> brokerDatas;
HashMap<String, List<String>> filterServerTable;

The logic of the pickupTopicRouteData method can largely be inferred from this data structure. Roughly, it consists of the following steps:

1. Look up queueDatas in topicQueueTable by topic name.
2. For each QueueData in queueDatas, use its brokerName to get the BrokerData object from brokerAddrTable, and collect these objects into a List, brokerDatas.
3. For the brokerDatas from step 2, look up the corresponding filter server lists in filterServerTable and assemble them into a mapping.
4. Assemble the values from steps 1 to 3 into a TopicRouteData object and return it to the caller.
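The four steps can be sketched as follows. The types are heavily reduced (a QueueData is represented by its brokerName, a BrokerData by its id-to-address map), and returning null when no Broker data is found is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

class PickupRouteSketch {
    // Reduced TopicRouteData.
    static final class Route {
        final List<String> queueBrokerNames = new ArrayList<>();         // stands in for queueDatas
        final List<Map<Long, String>> brokerDatas = new ArrayList<>();   // id -> addr per broker
        final Map<String, List<String>> filterServerTable = new HashMap<>();
    }

    final Map<String, List<String>> topicQueueTable = new HashMap<>();       // topic -> broker names
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();  // brokerName -> (id -> addr)
    final Map<String, List<String>> filterServerTable = new HashMap<>();     // brokerAddr -> filter servers

    // Returns null when the topic is unknown or no broker data is found.
    Route pickup(String topic) {
        List<String> brokerNames = topicQueueTable.get(topic);
        if (brokerNames == null) {
            return null;
        }
        Route route = new Route();
        route.queueBrokerNames.addAll(brokerNames);
        // De-duplicate broker names so each broker is looked up once.
        for (String name : new LinkedHashSet<>(brokerNames)) {
            Map<Long, String> addrs = brokerAddrTable.get(name);
            if (addrs == null) {
                continue;
            }
            route.brokerDatas.add(new HashMap<>(addrs));
            for (String addr : addrs.values()) {
                List<String> filters = filterServerTable.get(addr);
                if (filters != null) {
                    route.filterServerTable.put(addr, filters);
                }
            }
        }
        return route.brokerDatas.isEmpty() ? null : route;
    }
}
```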

wipeWritePermOfBrokerByLock

This method acquires the write lock interruptibly. If the lock cannot be acquired, it returns 0; if it is acquired, the method calls wipeWritePermOfBroker to do the actual work.

The wipeWritePermOfBroker method is also simple. It iterates over topicQueueTable and, for each entry, over its QueueData list. For every QueueData whose brokerName equals the brokerName parameter, it resets the perm property, clearing the flag bit that represents the write permission.
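Clearing a permission bit under a lock can be sketched as below. The bit positions chosen for PERM_READ and PERM_WRITE are assumptions for illustration, and topicQueueTable is flattened into a map from topic to (brokerName to perm).

```java
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class WipePermSketch {
    // Assumed bit layout, for illustration only.
    static final int PERM_READ = 1 << 2;
    static final int PERM_WRITE = 1 << 1;

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    // topicPerms: topic -> (brokerName -> perm), a flattened stand-in for topicQueueTable.
    int wipeWritePermOfBrokerByLock(Map<String, Map<String, Integer>> topicPerms, String brokerName) {
        try {
            lock.writeLock().lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 0; // lock not acquired, nothing wiped
        }
        try {
            return wipeWritePermOfBroker(topicPerms, brokerName);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Clears the write bit for the broker in every topic and counts the entries touched.
    private static int wipeWritePermOfBroker(Map<String, Map<String, Integer>> topicPerms, String brokerName) {
        int wiped = 0;
        for (Map<String, Integer> perBroker : topicPerms.values()) {
            Integer perm = perBroker.get(brokerName);
            if (perm != null) {
                perBroker.put(brokerName, perm & ~PERM_WRITE);
                wiped++;
            }
        }
        return wiped;
    }
}
```

With this layout, a perm of 6 (read and write) becomes 4 (read only) after the wipe.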

getUnitTopics

This method gets the set of topic names that carry the unit flag. The process is as follows:

1. Acquire the read lock interruptibly and iterate over the entries of topicQueueTable.
2. If the topicSynFlag of the first element in an entry's QueueData list contains the unit flag, add the entry's key (the topic name) to a temporary set.
3. After the iteration, encode the temporary set and return the resulting byte array.
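The flag test in step 2 is a bit-mask check. In the sketch below the bit positions of FLAG_UNIT and FLAG_UNIT_SUB are assumptions, the input is reduced to a map from topic name to the topicSynFlag of its first QueueData, and locking and encoding are left out.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class UnitFlagSketch {
    // Assumed bit positions, for illustration only.
    static final int FLAG_UNIT = 1 << 0;
    static final int FLAG_UNIT_SUB = 1 << 1;

    static boolean hasUnitFlag(int sysFlag) {
        return (sysFlag & FLAG_UNIT) == FLAG_UNIT;
    }

    // firstFlagByTopic: topic -> topicSynFlag of the first QueueData in its list
    static Set<String> getUnitTopics(Map<String, Integer> firstFlagByTopic) {
        Set<String> topics = new TreeSet<>();
        for (Map.Entry<String, Integer> e : firstFlagByTopic.entrySet()) {
            if (hasUnitFlag(e.getValue())) {
                topics.add(e.getKey());
            }
        }
        return topics;
    }
}
```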

onChannelDestroy

This method is triggered when a Broker's channel is closed. Although it contains a lot of code, the idea is simple: first find the corresponding BrokerLiveInfo object in brokerLiveTable by Channel, then use the information in that object to delete all related entries from the route manager.
