2025-01-17 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains how the RocketMQ ACL mechanism is implemented.
Following the RocketMQ ACL user manual, we first look at how the Broker loads the ACL configuration file and how it behaves once the ACL mechanism is turned on.
1. BrokerController#initialAcl
The entry point of ACL on the Broker is BrokerController#initialAcl:
    private void initialAcl() {
        if (!this.brokerConfig.isAclEnable()) {    // @1
            log.info("The broker dose not enable acl");
            return;
        }
        List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);    // @2
        if (accessValidators == null || accessValidators.isEmpty()) {
            log.info("The broker dose not load the AccessValidator");
            return;
        }
        for (AccessValidator accessValidator : accessValidators) {    // @3
            final AccessValidator validator = accessValidator;
            this.registerServerRPCHook(new RPCHook() {
                @Override
                public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                    // Do not catch the exception
                    validator.validate(validator.parse(request, remoteAddr));    // @4
                }
                @Override
                public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
                }
            });
        }
    }
This method has four key points. Code @1: first check whether ACL is enabled on the Broker. This is controlled by the configuration parameter aclEnable, which defaults to false.
Code @2: load the configured AccessValidator implementations through an SPI-like mechanism. The method returns a list; it reads the access validators named in the META-INF/service/org.apache.rocketmq.acl.AccessValidator file. By default that file names PlainAccessValidator.
Code @3: iterate over the configured access validators (AccessValidator) and register a hook with the Broker's request-processing server for each one. RPCHook's doBeforeRequest method is called after the server has received and decoded a request but before the request is processed. Its doAfterResponse method is called after the request has been processed and before the result is returned (the original article illustrates this with a figure).
Code @4: RPCHook#doBeforeRequest calls AccessValidator#validate, so the ACL check runs before the command is actually processed. If the caller has permission for the operation, the request proceeds; otherwise AclException is thrown.
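To make the hook mechanism concrete, here is a minimal sketch of the "validate in a before-request hook" pattern. It does not use the RocketMQ classes: Server, BeforeRequestHook and AccessDeniedException are hypothetical stand-ins for the remoting server, RPCHook and AclException, and the "only admin passes" rule is invented purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal, hypothetical model of running ACL checks in a before-request hook. */
final class AclHookSketch {

    /** Stand-in for AclException: unchecked, so it propagates out of the hook. */
    static final class AccessDeniedException extends RuntimeException {
        AccessDeniedException(String message) {
            super(message);
        }
    }

    /** Stand-in for RPCHook; only the before-request side is modelled. */
    interface BeforeRequestHook {
        void doBeforeRequest(String remoteAddr, String accessKey);
    }

    /** Stand-in for the Broker's remoting server. */
    static final class Server {
        private final List<BeforeRequestHook> hooks = new ArrayList<>();

        void registerHook(BeforeRequestHook hook) {
            hooks.add(hook);
        }

        /** Runs every hook first; the request is processed only if none throws. */
        String handle(String remoteAddr, String accessKey, String payload) {
            for (BeforeRequestHook hook : hooks) {
                hook.doBeforeRequest(remoteAddr, accessKey);
            }
            return "processed:" + payload;
        }
    }

    /** Builds a server whose single hook rejects every key except "admin" (illustrative rule). */
    static Server newServer() {
        Server server = new Server();
        server.registerHook((remoteAddr, accessKey) -> {
            if (!"admin".equals(accessKey)) {
                throw new AccessDeniedException("No acl config for " + accessKey);
            }
        });
        return server;
    }

    public static void main(String[] args) {
        Server server = newServer();
        System.out.println(server.handle("10.0.0.1:5000", "admin", "msg"));
        try {
            server.handle("10.0.0.1:5000", "guest", "msg");
        } catch (AccessDeniedException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the hook does not catch the exception, a failed check aborts the request before any processing happens, which is the behaviour the "Do not catch the exception" comment in initialAcl relies on.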
Next, we will focus on the access validator that Broker implements by default: PlainAccessValidator.
2. PlainAccessValidator
2.1 Class diagram
AccessValidator is the access validator interface. It defines two methods. 1) AccessResource parse(RemotingCommand request, String remoteAddr) parses, from the request header, the resource this request accesses, that is, the permissions the request requires. 2) void validate(AccessResource accessResource) compares the permissions the request requires with the permissions the requesting user holds. If the user lacks permission for the operation, an exception is thrown; otherwise the request is let through.
PlainAccessValidator is the access validator RocketMQ provides by default. It is based on a yml configuration file.
Next, let's focus on the implementation details of PlainAccessValidator's parse and validate methods. Before explaining these methods, let's first look at PlainAccessResource, the class RocketMQ uses to encapsulate an access resource.
2.1.2 PlainAccessResource class diagram
Let's introduce its attributes one by one:
private String accessKey: the access key, i.e. the user name.
private String secretKey: the user's password.
private String whiteRemoteAddress: the whitelist of remote IP addresses.
private boolean admin: whether the user has the administrator role.
private byte defaultTopicPerm = 1: the default topic permission, used when no permission is configured for a topic. The default value 1 means DENY.
private byte defaultGroupPerm = 1: the default consumer group permission. The default is DENY.
private Map<String, Byte> resourcePermMap: the mapping from resource to the permission required for it.
private RemoteAddressStrategy remoteAddressStrategy: the remote IP address verification strategy.
private int requestCode: the request code of the current request.
private byte[] content: the content of the request header and the request body.
private String signature: the signature string. This is a common pattern: the client sorts the request parameters and uses the secretKey to generate a signature string; the server repeats the same steps and compares the two signature strings. If they are equal, the login is considered successful; otherwise it fails.
private String secretToken: the secret token.
private String recognition: its purpose is currently unclear; it is not used in the code.
2.2 Construction method
    public PlainAccessValidator() {
        aclPlugEngine = new PlainPermissionLoader();
    }
The constructor simply creates a PlainPermissionLoader object. Judging by the name, this should trigger the loading of the ACL rules, that is, the parsing of plain_acl.yml. We will look closely at how the configuration file is parsed during ACL startup later on.
2.3 parse method
The function of this method is to parse the access rights required for this access from the request command, and finally build the AccessResource object to prepare for the subsequent verification of permissions.
    PlainAccessResource accessResource = new PlainAccessResource();
    if (remoteAddr != null && remoteAddr.contains(":")) {
        accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]);
    } else {
        accessResource.setWhiteRemoteAddress(remoteAddr);
    }
Step1: first create a PlainAccessResource and extract the remote access IP address from the remote address.
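As a small illustration of Step1, the helper below keeps only the host part of a "host:port" string. The class and method names are hypothetical, and the sketch assumes IPv4-style addresses; an IPv6 literal, which itself contains colons, would need different handling.

```java
/** Hypothetical helper mirroring Step1: keep only the host part of "host:port". */
final class RemoteAddrSketch {

    /** Returns the text before the first ':' or the input unchanged when there is none. */
    static String hostOf(String remoteAddr) {
        if (remoteAddr != null && remoteAddr.contains(":")) {
            return remoteAddr.split(":")[0];
        }
        return remoteAddr;
    }

    public static void main(String[] args) {
        System.out.println(hostOf("192.168.0.1:10911"));
        System.out.println(hostOf("192.168.0.1"));
    }
}
```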
    if (request.getExtFields() == null) {
        throw new AclException("request's extFields value is null");
    }
    accessResource.setRequestCode(request.getCode());
    accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
    accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE));
    accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));
Step2: if the extension field in the request header is empty, an exception is thrown, and if it is not empty, requestCode, accessKey (request user name), signature string (signature), secretToken are read from the request header.
    try {
        switch (request.getCode()) {
            case RequestCode.SEND_MESSAGE:
                accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
                break;
            case RequestCode.SEND_MESSAGE_V2:
                accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);
                break;
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
                accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
                break;
            case RequestCode.PULL_MESSAGE:
                accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
                accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);
                break;
            case RequestCode.QUERY_MESSAGE:
                accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
                break;
            case RequestCode.HEART_BEAT:
                HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
                for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
                    accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
                    for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
                        accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
                    }
                }
                break;
            case RequestCode.UNREGISTER_CLIENT:
                final UnregisterClientRequestHeader unregisterClientRequestHeader =
                    (UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
                accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
                break;
            case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
                    (GetConsumerListByGroupRequestHeader) request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
                accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
                break;
            case RequestCode.UPDATE_CONSUMER_OFFSET:
                final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
                    (UpdateConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
                accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
                accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
                break;
            default:
                break;
        }
    } catch (Throwable t) {
        throw new AclException(t.getMessage(), t);
    }
Step3: set the permissions required for this request according to the request command. The code above is fairly simple: it obtains the topic and the consumer group name of the operation from the request. To distinguish topics from consumer groups, a consumer group is keyed by its retry topic in the resource map. The code also shows which request commands are subject to ACL permission verification in the current version:
SEND_MESSAGE
SEND_MESSAGE_V2
CONSUMER_SEND_MSG_BACK
PULL_MESSAGE
QUERY_MESSAGE
HEART_BEAT
UNREGISTER_CLIENT
GET_CONSUMER_LIST_BY_GROUP
UPDATE_CONSUMER_OFFSET
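The mapping from a request to the resources it needs can be sketched as follows. This is an illustration rather than the RocketMQ code: the class and method names are hypothetical, and the "%RETRY%" prefix and the bit values chosen for PUB and SUB are assumptions of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: which resources and permissions a request needs. */
final class ResourcePermSketch {
    // Assumed permission bits; only their distinctness matters for this sketch.
    static final byte PUB = 1 << 2;
    static final byte SUB = 1 << 3;
    // Assumed prefix used to derive a consumer group's retry topic.
    static final String RETRY_PREFIX = "%RETRY%";

    /** A consumer group is stored under its retry topic so it cannot collide with a topic name. */
    static String retryTopic(String group) {
        return RETRY_PREFIX + group;
    }

    /** True when a resource key denotes a consumer group rather than a topic. */
    static boolean isGroupKey(String resource) {
        return resource.startsWith(RETRY_PREFIX);
    }

    /** Simplified counterpart of the SEND_MESSAGE branch: publishing needs PUB on the topic. */
    static Map<String, Byte> forSend(String topic) {
        Map<String, Byte> perms = new HashMap<>();
        perms.put(topic, PUB);
        return perms;
    }

    /** Simplified counterpart of the PULL_MESSAGE branch: SUB on the topic and on the group. */
    static Map<String, Byte> forPull(String topic, String consumerGroup) {
        Map<String, Byte> perms = new HashMap<>();
        perms.put(topic, SUB);
        perms.put(retryTopic(consumerGroup), SUB);
        return perms;
    }

    public static void main(String[] args) {
        System.out.println(forPull("TopicA", "GroupA").keySet());
    }
}
```

Keeping topics and groups in one map, distinguished only by the key prefix, is what later lets the permission check pick the default topic permission or the default group permission for an unconfigured resource.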
    // Content
    SortedMap<String, String> map = new TreeMap<String, String>();
    for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) {
        if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
            map.put(entry.getKey(), entry.getValue());
        }
    }
    accessResource.setContent(AclUtils.combineRequestContent(request, map));
    return accessResource;
Step4: sort the extension fields (excluding the signature itself) so that the signature string can be generated deterministically, then write the extension fields and the request body (body) to the content field. This completes the parsing of the permissions to be verified from the request.
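The reason for sorting becomes clearer with a small sketch. The code below is not AclUtils.combineRequestContent; it is a hypothetical stand-in that assumes the content to sign is the sorted field values concatenated and followed by the body, and that the signature field is named "Signature".

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical sketch of building the byte content that gets signed. */
final class SignContentSketch {
    // Assumed name of the extension field that carries the signature.
    static final String SIGNATURE_KEY = "Signature";

    /** Sorts the fields by key, drops the signature field, then appends the body. */
    static byte[] contentToSign(Map<String, String> extFields, byte[] body) {
        SortedMap<String, String> sorted = new TreeMap<>();
        for (Map.Entry<String, String> entry : extFields.entrySet()) {
            if (!SIGNATURE_KEY.equals(entry.getKey())) {
                sorted.put(entry.getKey(), entry.getValue());
            }
        }
        StringBuilder values = new StringBuilder();
        for (String value : sorted.values()) {
            values.append(value);
        }
        byte[] head = values.toString().getBytes(StandardCharsets.UTF_8);
        byte[] tail = body == null ? new byte[0] : body;
        byte[] out = new byte[head.length + tail.length];
        System.arraycopy(head, 0, out, 0, head.length);
        System.arraycopy(tail, 0, out, head.length, tail.length);
        return out;
    }

    /** Convenience wrapper for inspecting the result as text. */
    static String contentAsText(Map<String, String> extFields, String body) {
        byte[] bytes = contentToSign(extFields, body.getBytes(StandardCharsets.UTF_8));
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> fields = new HashMap<>();
        fields.put("topic", "T");
        fields.put("AccessKey", "ak");
        fields.put("Signature", "ignored");
        System.out.println(contentAsText(fields, "body"));
    }
}
```

Because a TreeMap iterates in key order, client and server produce the same bytes regardless of the order in which the fields were added, and excluding the signature field means the signature never signs itself.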
2.4 validate method
    public void validate(AccessResource accessResource) {
        aclPlugEngine.validate((PlainAccessResource) accessResource);
    }
This method verifies permissions: the permissions required by the request are compared with those of the current user. If they match, the request is executed normally; otherwise AclException is thrown.
In order to uncover the parsing and verification of the configuration file, we turned our attention to PlainPermissionLoader.
3. PlainPermissionLoader
The main responsibility of this class is to load permissions, that is, to parse acl's main configuration file, plain_acl.yml.
3.1 Class Diagram
The following describes its core attributes and methods one by one:
DEFAULT_PLAIN_ACL_FILE: the default ACL configuration file name, conf/plain_acl.yml.
String fileName: the name of the ACL configuration file. It defaults to DEFAULT_PLAIN_ACL_FILE and can be overridden with the system property -Drocketmq.acl.plain.file=fileName.
Map<String, PlainAccessResource> plainAccessResourceMap: the parsed permission configuration, keyed by user name.
RemoteAddressStrategyFactory remoteAddressStrategyFactory: the factory for remote IP matching strategies, used to parse whitelist IP addresses.
boolean isWatchStart: whether file watching is enabled, that is, whether changes to plain_acl.yml are detected automatically. Once the file changes, the new content takes effect without restarting the server.
public PlainPermissionLoader(): the constructor.
public void load(): loads the configuration file.
public void validate(PlainAccessResource plainAccessResource): verifies whether the caller has permission to access the requested resource.
3.2 PlainPermissionLoader construction method
    public PlainPermissionLoader() {
        load();
        watch();
    }
Call the load and watch methods in the constructor.
3.3 load
    Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
    List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
    String path = fileHome + File.separator + fileName;
    JSONObject plainAclConfData = AclUtils.getYamlDataObject(path, JSONObject.class);
Step1: initialize plainAccessResourceMap (the container for user-configured access resources, that is, permissions) and globalWhiteRemoteAddressStrategy (the global IP whitelist strategies), then read the configuration file, which defaults to ${ROCKETMQ_HOME}/conf/plain_acl.yml.
    JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");
    if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
        for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
            globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
        }
    }
Step2: globalWhiteRemoteAddresses is the global whitelist, an array. For each configured rule, remoteAddressStrategyFactory produces the matching access strategy.
    JSONArray accounts = plainAclConfData.getJSONArray("accounts");
    if (accounts != null && !accounts.isEmpty()) {
        List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
        for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
            PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
            plainAccessResourceMap.put(plainAccessResource.getAccessKey(), plainAccessResource);
        }
    }
    this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
    this.plainAccessResourceMap = plainAccessResourceMap;
Step3: parse the other root element of plain_acl.yml, accounts, which holds the user-defined permission information. Judging from the definition of PlainAccessConfig, the following tags are supported under accounts:
accessKey
secretKey
whiteRemoteAddress
admin
defaultTopicPerm
defaultGroupPerm
topicPerms
groupPerms
For a description of these tags, please refer to the "RocketMQ ACL user guide". The parsing itself is straightforward, so I won't go into details.
The load method mainly completes the parsing of the acl configuration file and loads the user-defined permissions into memory.
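Since the article skips the parsing details, here is a rough sketch of how topicPerms or groupPerms entries might be turned into permission bits. It is not the RocketMQ parser: the "name=PERM" entry format with values such as DENY, PUB, SUB and PUB|SUB is assumed from the user guide, the bit values are assumptions, and every class and method name is hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of turning "topicA=PUB|SUB" style entries into permission bits. */
final class AccountLoadSketch {
    // Assumed bit values; DENY = 1 follows the defaults described for PlainAccessResource.
    static final byte DENY = 1;
    static final byte PUB = 1 << 2;
    static final byte SUB = 1 << 3;

    /** Parses "PUB", "SUB" or "PUB|SUB"; anything else falls back to DENY. */
    static byte parsePerm(String text) {
        byte perm = 0;
        for (String part : text.trim().split("\\|")) {
            switch (part.trim()) {
                case "PUB":
                    perm |= PUB;
                    break;
                case "SUB":
                    perm |= SUB;
                    break;
                default:
                    return DENY;
            }
        }
        return perm;
    }

    /** Builds the resource-to-permission map from entries of the assumed form "name=PERM". */
    static Map<String, Byte> parseResourcePerms(List<String> entries) {
        Map<String, Byte> perms = new HashMap<>();
        for (String entry : entries) {
            String[] pair = entry.split("=");
            if (pair.length != 2) {
                throw new IllegalArgumentException("Malformed permission entry: " + entry);
            }
            perms.put(pair[0].trim(), parsePerm(pair[1]));
        }
        return perms;
    }

    public static void main(String[] args) {
        System.out.println(parseResourcePerms(Arrays.asList("topicA=DENY", "topicB=PUB|SUB")));
    }
}
```

Falling back to DENY for an unrecognised value is a deliberately conservative choice in this sketch: a typo in the configuration then removes access instead of granting it.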
3.4 watch
    private void watch() {
        try {
            String watchFilePath = fileHome + fileName;
            FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() {
                @Override
                public void onChanged(String path) {
                    log.info("The plain acl yml changed, reload the context");
                    load();
                }
            });
            fileWatchService.start();
            log.info("Succeed to start AclWatcherService");
            this.isWatchStart = true;
        } catch (Exception e) {
            log.error("Failed to start AclWatcherService", e);
        }
    }
The watcher checks every 500 ms by default whether the content of the file has changed. When it has, the load() method is called to reload the configuration file. So how does FileWatchService decide that the file content has changed?
FileWatchService#hash
    private String hash(String filePath) throws IOException, NoSuchAlgorithmException {
        Path path = Paths.get(filePath);
        md.update(Files.readAllBytes(path));
        byte[] hash = md.digest();
        return UtilAll.bytes2string(hash);
    }
It computes the MD5 digest of the file and compares it with the previous one. One open question: why not record the file's last modification time at startup, first check whether that time has changed, and only then check whether the content has really changed?
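The digest comparison can be sketched in a few lines. This is not FileWatchService: the class below is a hypothetical, single-file change detector that only shows the "hash, compare, remember" step, leaving out the 500 ms polling thread and the listener callback.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/** Hypothetical sketch of content-based change detection using an MD5 digest. */
final class FileChangeSketch {
    private final Path path;
    private byte[] lastDigest;

    FileChangeSketch(Path path) {
        this.path = path;
        this.lastDigest = digest(path);
    }

    /** Computes the MD5 digest of the whole file; checked exceptions are wrapped. */
    static byte[] digest(Path path) {
        try {
            return MessageDigest.getInstance("MD5").digest(Files.readAllBytes(path));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true, and remembers the new digest, when the content differs from the last call. */
    boolean changed() {
        byte[] current = digest(path);
        if (Arrays.equals(current, lastDigest)) {
            return false;
        }
        lastDigest = current;
        return true;
    }

    /** Self-check: rewrites a temporary file and observes the detector. */
    static boolean demo() {
        try {
            Path file = Files.createTempFile("plain_acl", ".yml");
            try {
                Files.write(file, "accounts: []".getBytes(StandardCharsets.UTF_8));
                FileChangeSketch watcher = new FileChangeSketch(file);
                boolean quiet = !watcher.changed();
                Files.write(file, "accounts: [a]".getBytes(StandardCharsets.UTF_8));
                boolean detected = watcher.changed();
                boolean quietAgain = !watcher.changed();
                return quiet && detected && quietAgain;
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("change detection works: " + demo());
    }
}
```

Comparing digests means a file that is rewritten with identical content does not trigger a reload, at the cost of reading the whole file on every poll; for a small ACL file that cost is minor.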
3.5 validate
    // Check the global white remote addr
    for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
        if (remoteAddressStrategy.match(plainAccessResource)) {
            return;
        }
    }
Step1: first, check the resource against the global whitelist. As soon as one rule matches, the method returns, meaning that authentication succeeded.
    if (plainAccessResource.getAccessKey() == null) {
        throw new AclException(String.format("No accessKey is configured"));
    }
    if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
        throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));
    }
Step2: if no user name is set in the request, a "No accessKey is configured" exception is thrown; if the Broker has no configuration for that user, AclException is thrown as well.
    // Check the white addr for accesskey
    PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
    if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
        return;
    }
Step3: if the whitelist configured by the user matches the rules of the resource to be accessed, the authentication will be passed directly.
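The whitelist checks in Step1 and Step3 both rely on address-matching rules. The sketch below is a simplified, hypothetical model of such rules, not RemoteAddressStrategyFactory: it supports only an empty rule (never matches), "*" (matches everything), a trailing ".*" wildcard and an exact address, and all names are invented for the example.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of IP whitelist rules. */
final class WhitelistSketch {

    /** One whitelist rule. */
    interface AddressRule {
        boolean match(String ip);
    }

    /** Builds a rule from a pattern: blank, "*", "a.b.c.*" or an exact address. */
    static AddressRule ruleFor(String pattern) {
        if (pattern == null || pattern.trim().isEmpty()) {
            return ip -> false; // no whitelist configured: never bypass the later checks
        }
        String trimmed = pattern.trim();
        if (trimmed.equals("*")) {
            return ip -> true;
        }
        if (trimmed.endsWith(".*")) {
            // Keep the trailing dot so "192.168.1.*" does not match "192.168.10.x".
            String prefix = trimmed.substring(0, trimmed.length() - 1);
            return ip -> ip != null && ip.startsWith(prefix);
        }
        return ip -> trimmed.equals(ip);
    }

    /** Mirrors the loop over the global whitelist: the first matching rule wins. */
    static boolean anyMatch(List<AddressRule> rules, String ip) {
        for (AddressRule rule : rules) {
            if (rule.match(ip)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<AddressRule> global = Arrays.asList(ruleFor("10.0.0.5"), ruleFor("192.168.1.*"));
        System.out.println(anyMatch(global, "192.168.1.20"));
        System.out.println(anyMatch(global, "172.16.0.1"));
    }
}
```

Note the consequence visible in validate: an address that matches a whitelist skips the signature and permission checks entirely, so whitelist entries should be kept as narrow as possible.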
    // Check the signature
    String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
    if (!signature.equals(plainAccessResource.getSignature())) {
        throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
    }
Step4: verify the signature.
    checkPerm(plainAccessResource, ownedAccess);
Step5: call the checkPerm method to verify that the required permissions match the permissions you have.
3.5.1 checkPerm
    if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
        throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
    }
Step6: if the current request command is one that only an Admin user may execute, and the current user does not have the administrator role, an exception is thrown. The commands that require the admin role are those whose request code makes Permission.needAdminPerm return true (the original article lists them in a figure).
    Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap();
    Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();
    if (needCheckedPermMap == null) {
        // If the needCheckedPermMap is null,then return
        return;
    }
    if (ownedPermMap == null && ownedAccess.isAdmin()) {
        // If the ownedPermMap is null and it is an admin user, then return
        return;
    }
Step7: if the request does not require any permission check, it passes. If the current user is an administrator and has no per-resource permissions configured, the request also passes and the method returns.
    for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
        String resource = needCheckedEntry.getKey();
        Byte neededPerm = needCheckedEntry.getValue();
        boolean isGroup = PlainAccessResource.isRetryTopic(resource);
        if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) {
            // Check the default perm
            byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : ownedAccess.getDefaultTopicPerm();
            if (!Permission.checkPermission(neededPerm, ownedPerm)) {
                throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
            }
            continue;
        }
        if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
            throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
        }
    }
Step8: iterate over the required permissions and compare each one with the permissions the user holds. If a permission is configured for the resource, check whether it matches; if none is configured, check whether the default permission allows the operation. If either check fails, AclException is thrown.
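The comparison in Step8 can be sketched with plain bit masks. This is an illustration rather than the RocketMQ Permission class: the bit values, the "%RETRY%" prefix and the rule that DENY overrides everything are assumptions of this sketch, and it returns a boolean where the real code throws AclException.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical sketch of comparing required permissions with owned permissions. */
final class CheckPermSketch {
    // Assumed permission bits.
    static final byte DENY = 1;
    static final byte ANY = 1 << 1;
    static final byte PUB = 1 << 2;
    static final byte SUB = 1 << 3;
    // Assumed prefix marking a consumer group's retry topic.
    static final String RETRY_PREFIX = "%RETRY%";

    /** DENY always loses; ANY is satisfied by PUB or SUB; otherwise the bits must overlap. */
    static boolean checkPermission(byte needed, byte owned) {
        if ((owned & DENY) > 0) {
            return false;
        }
        if ((needed & ANY) > 0) {
            return (owned & PUB) > 0 || (owned & SUB) > 0;
        }
        return (needed & owned) > 0;
    }

    /** Uses the configured permission when present, else the default for topics or groups. */
    static boolean allowed(Map<String, Byte> needed, Map<String, Byte> owned,
                           byte defaultTopicPerm, byte defaultGroupPerm) {
        for (Map.Entry<String, Byte> entry : needed.entrySet()) {
            String resource = entry.getKey();
            Byte configured = owned == null ? null : owned.get(resource);
            byte ownedPerm;
            if (configured != null) {
                ownedPerm = configured;
            } else {
                ownedPerm = resource.startsWith(RETRY_PREFIX) ? defaultGroupPerm : defaultTopicPerm;
            }
            if (!checkPermission(entry.getValue(), ownedPerm)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Byte> needed = Collections.singletonMap("TopicA", Byte.valueOf(PUB));
        Map<String, Byte> owned = Collections.singletonMap("TopicA", Byte.valueOf((byte) (PUB | SUB)));
        System.out.println(allowed(needed, owned, DENY, DENY));
    }
}
```

With both defaults set to DENY, as in PlainAccessResource, a user can only touch resources that are explicitly configured, which is the safe default for an ACL.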
This is the end of the verification logic, and the matching flow chart is given below:
The above describes the process of starting from the Broker server, loading the acl configuration file, dynamically monitoring the configuration file, and verifying the permissions of the server. Next, let's take a look at what the client needs to deal with about ACL.
4. AclClientRPCHook
Recall that after we introduced the ACL mechanism, the client side code example is as follows:
When it creates the DefaultMQProducer, it registers the AclClientRPCHook hook and executes its hook function before and after sending remote commands to the server. Let's focus on AclClientRPCHook.
4.1 doBeforeRequest
    public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
        byte[] total = AclUtils.combineRequestContent(request,
            parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));    // @1
        String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());    // @2
        request.addExtField(SIGNATURE, signature);    // @3
        request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
        // The SecurityToken value is unneccessary,user can choose this one.
        if (sessionCredentials.getSecurityToken() != null) {
            request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
        }
    }
Code @1: sort the request parameters and add the accessKey.
Code @2: sign the sorted parameters with the password (secretKey) configured by the user; the result is the signature that goes into the extension field Signature. The server later generates a signature with the same algorithm, and if the two are equal the signature check succeeds (similar in effect to a login).
Code @3: add Signature, AccessKey and so on to the extension fields of the request header. The server reads this metadata and, combined with the other information in the request header, verifies the request against the configured permissions.
ACL client-side signature generation is a general pattern, so I won't go into details.
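For readers who have not met this pattern before, the round trip can be sketched as follows. The sketch is not AclUtils.calSignature: it assumes HMAC-SHA1 with Base64 encoding, and it swaps the plain String.equals comparison seen in validate for the constant-time MessageDigest.isEqual, which is a deliberate substitution in this example. All class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical sketch of the sign-on-client, recompute-on-server pattern. */
final class SignatureSketch {
    // Assumed algorithm for this sketch.
    private static final String ALGORITHM = "HmacSHA1";

    /** Client side: sign the request content with the user's secret key. */
    static String sign(byte[] content, String secretKey) {
        try {
            Mac mac = Mac.getInstance(ALGORITHM);
            mac.init(new SecretKeySpec(secretKey.getBytes(StandardCharsets.UTF_8), ALGORITHM));
            return Base64.getEncoder().encodeToString(mac.doFinal(content));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Server side: recompute the signature with the stored secret and compare. */
    static boolean verify(byte[] content, String storedSecretKey, String claimedSignature) {
        if (claimedSignature == null) {
            return false;
        }
        byte[] expected = sign(content, storedSecretKey).getBytes(StandardCharsets.UTF_8);
        byte[] claimed = claimedSignature.getBytes(StandardCharsets.UTF_8);
        return MessageDigest.isEqual(expected, claimed);
    }

    public static void main(String[] args) {
        byte[] content = "akTbody".getBytes(StandardCharsets.UTF_8);
        String signature = sign(content, "12345678");
        System.out.println("valid: " + verify(content, "12345678", signature));
        System.out.println("wrong key: " + verify(content, "87654321", signature));
    }
}
```

The secret key itself never travels with the request; only the access key and the signature do, so the server can authenticate the caller by looking up the secret for that access key and repeating the computation.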
This concludes the walk-through of how the RocketMQ ACL mechanism is implemented. Thank you for reading.