

How to Implement Access Control in RocketMQ

2025-01-17 Update From: SLTechnology News&Howtos


This article explains in detail how to implement access control (ACL) in RocketMQ, first through configuration and usage examples and then through an analysis of the Broker and client source code.

1. Basic usage

1.1. What is ACL

ACL is the abbreviation of access control list. Access control basically involves the concepts of users, resources, permissions, and roles. What do these concepts correspond to in RocketMQ?

User: the user is the basic element of access control, so RocketMQ ACL introduces the concept of a user, that is, it supports a user name and password.
Resources: the objects that need to be protected. The Topics involved in message sending and the consumer groups involved in message consumption should be protected, so they are abstracted into resources.
Permissions: the actions that can be performed on resources.
Roles: RocketMQ defines only two roles: administrator and non-administrator.

1.2. Configuring ACL in RocketMQ

The default ACL configuration file is plain_acl.yml, which needs to be placed in the ${ROCKETMQ_HOME}/conf directory.

To use ACL, you must enable the feature on the server side by setting aclEnable=true in the Broker configuration file.

Configure the plain_acl.yml file

    globalWhiteRemoteAddresses:
    - 10.10.15.*

    accounts:
    - accessKey: RocketMQ
      secretKey: 12345678
      whiteRemoteAddress:
      admin: false
      defaultTopicPerm: DENY
      defaultGroupPerm: SUB
      topicPerms:
      - topicA=DENY
      - topicB=PUB|SUB
      - topicC=SUB
      groupPerms:
      # the group should convert to retry topic
      - groupA=DENY
      - groupB=PUB|SUB
      - groupC=SUB

    - accessKey: rocketmq2
      secretKey: 12345678
      whiteRemoteAddress: 192.168.1.*
      # if it is admin, it could access all resources
      admin: true

Let's introduce the meaning and use of the relevant parameters in the plain_acl.yml file.

    Field                       Value                       Meaning
    globalWhiteRemoteAddresses  *;192.168.*.*;192.168.0.1   global IP whitelist
    accessKey                   string                      Access Key (user name)
    secretKey                   string                      Secret Key (password)
    whiteRemoteAddress          *;192.168.*.*;192.168.0.1   user IP whitelist
    admin                       true;false                  whether the account is an administrator
    defaultTopicPerm            DENY;PUB;SUB;PUB|SUB        default Topic permission
    defaultGroupPerm            DENY;PUB;SUB;PUB|SUB        default ConsumerGroup permission
    topicPerms                  topic=permission            permission of each Topic
    groupPerms                  group=permission            permission of each ConsumerGroup
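The whitelist formats listed above (*, 192.168.*.* and 192.168.0.1) can be illustrated with a small matcher. This is only a sketch under stated assumptions: the class WhitelistMatcher and its methods are hypothetical names, not RocketMQ's own address strategy classes, and the sketch handles only dotted IPv4 addresses with * as a whole-segment wildcard.

```java
import java.util.List;

/** Hypothetical helper (not part of RocketMQ): matches IPv4 addresses against whitelist patterns. */
final class WhitelistMatcher {

    /** Returns true if the address matches the pattern; "*" stands for any whole segment. */
    static boolean matches(String pattern, String address) {
        if ("*".equals(pattern)) {
            return true; // a lone "*" accepts every address
        }
        String[] p = pattern.split("\\.");
        String[] a = address.split("\\.");
        if (p.length != 4 || a.length != 4) {
            return false; // this sketch only understands dotted IPv4
        }
        for (int i = 0; i < 4; i++) {
            if (!"*".equals(p[i]) && !p[i].equals(a[i])) {
                return false; // a literal segment must match exactly
            }
        }
        return true;
    }

    /** Returns true if any pattern in the whitelist matches the address. */
    static boolean matchesAny(List<String> whitelist, String address) {
        return whitelist.stream().anyMatch(pattern -> matches(pattern, address));
    }
}
```

With this sketch, a whitelist containing 192.168.1.* accepts 192.168.1.20 and rejects 192.168.2.20.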

Meaning of permission identifier

    Permission  Meaning
    DENY        reject
    ANY         PUB or SUB permission
    PUB         send permission
    SUB         subscribe permission
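One way to picture how these identifiers combine is as bit flags. The sketch below is an illustration under assumptions: the class PermSketch, the bit values, and the rule that unrecognized text is treated as DENY are choices made for this example and are not taken from the article.

```java
import java.util.Locale;

/** Illustrative permission flags; the bit values are assumptions made for this sketch. */
final class PermSketch {
    static final byte DENY = 1;
    static final byte ANY = 1 << 1;
    static final byte PUB = 1 << 2;
    static final byte SUB = 1 << 3;

    /** Parses text such as "PUB|SUB" or "DENY" into flags; anything unrecognized becomes DENY. */
    static byte parse(String text) {
        if (text == null) {
            return DENY;
        }
        byte perm = 0;
        for (String part : text.split("\\|")) {
            switch (part.trim().toUpperCase(Locale.ROOT)) {
                case "PUB": perm |= PUB; break;
                case "SUB": perm |= SUB; break;
                case "ANY": perm |= ANY; break;
                default: return DENY; // "DENY" itself or unknown text
            }
        }
        return perm == 0 ? DENY : perm;
    }

    /** Returns true if the owned flags satisfy the needed flags. */
    static boolean allows(byte needed, byte owned) {
        if ((owned & DENY) != 0) {
            return false; // DENY always wins
        }
        if ((needed & ANY) != 0) {
            return (owned & (PUB | SUB)) != 0; // ANY is satisfied by PUB or SUB
        }
        return (needed & owned) != 0;
    }
}
```

Under these assumptions, an account holding PUB|SUB may both send and subscribe, while an account holding only SUB is refused when it tries to send.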

Processing flow

Special requests, such as UPDATE_AND_CREATE_TOPIC, can only be operated by admin account.

For a resource, if there is explicit configuration permission, the configured permission is adopted; if there is no explicit configuration permission, the default permission is adopted.
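The "explicit permission first, default otherwise" rule can be sketched with a plain map lookup. The class EffectivePermission and its methods are hypothetical names used only for illustration; permissions are kept as strings to stay close to the configuration file.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: resolves the permission for a resource, falling back to a default. */
final class EffectivePermission {
    private final Map<String, String> explicitPerms = new HashMap<>();
    private final String defaultPerm;

    EffectivePermission(String defaultPerm) {
        this.defaultPerm = defaultPerm;
    }

    /** Records an explicitly configured permission, for example ("topicB", "PUB|SUB"). */
    EffectivePermission grant(String resource, String perm) {
        explicitPerms.put(resource, perm);
        return this;
    }

    /** Returns the explicit permission if one is configured, otherwise the default. */
    String resolve(String resource) {
        return explicitPerms.getOrDefault(resource, defaultPerm);
    }
}
```

For example, new EffectivePermission("DENY").grant("topicB", "PUB|SUB") resolves topicB to PUB|SUB, while a topic that is not listed resolves to the default DENY.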

The default implementation of RocketMQ's permission store is based on the yml configuration file. Users can modify the access control attributes dynamically, without restarting the Broker service node.

If ACL is enabled together with a highly available deployment (Master/Slave architecture), you need to add the IP address of the Slave node to the global whitelist in the ${ROCKETMQ_HOME}/conf/plain_acl.yml configuration file of the Broker Master node.

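1.3. Code example

1.3.1. Producer code

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class AclProducer {

        public static void main(String[] args) throws MQClientException, InterruptedException {
            // Pass the ACL hook so that every request carries the account's credentials
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook());
            producer.setNamesrvAddr("10.10.15.246:9876");
            producer.start();

            for (int i = 0; i < 10; i++) {
                try {
                    Message msg = new Message("topicA", "TagA",
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
            producer.shutdown();
        }

        static RPCHook getAclRPCHook() {
            // accessKey and secretKey of the account configured in plain_acl.yml
            return new AclClientRPCHook(new SessionCredentials("RocketMQ", "12345678"));
        }
    }

View the result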

The error indicates that the account has no permission on topicA: in plain_acl.yml, topicA=DENY is configured for the RocketMQ user, which rejects both producing to and consuming from topicA. If we change the topic to topicB, the message is sent successfully, because topicB=PUB|SUB grants both production and consumption permission.

View the result

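1.3.2. Consumer code

    import java.util.List;

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.RPCHook;

    public class AclConsumer {

        public static void main(String[] args) throws InterruptedException, MQClientException {
            // The consumer group "groupA" is itself a protected resource
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupA", getAclRPCHook(), new AllocateMessageQueueAveragely());
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("topicB", "*");
            consumer.setNamesrvAddr("10.10.15.246:9876");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }

        static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("RocketMQ", "12345678"));
        }
    }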

Check the results: no messages are consumed and no error is reported. For the RocketMQ user, topicB allows both production and consumption, but the consumer group is configured as groupA=DENY, so a consumer in groupA is refused any messages. Change the consumer group to groupB or groupC and check the results again.

2. Source code analysis

Broker side ACL schematic diagram

2.1. ACL-related operations during Broker initialization

When the Broker service starts, it creates BrokerController, and the initialize() method calls the ACL-related initialization method initialAcl().

    private void initialAcl() {
        // Whether ACL is enabled in the broker configuration file. It is turned off by default.
        if (!this.brokerConfig.isAclEnable()) {
            log.info("The broker dose not enable acl");
            return;
        }

        // Get the list of access validators. The loaded file
        // META-INF/service/org.apache.rocketmq.acl.AccessValidator points to
        // org.apache.rocketmq.acl.plain.PlainAccessValidator. By default, there is only one.
        List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
        if (accessValidators == null || accessValidators.isEmpty()) {
            log.info("The broker dose not load the AccessValidator");
            return;
        }

        for (AccessValidator accessValidator : accessValidators) {
            final AccessValidator validator = accessValidator;
            // Register the server-side "hook" object, which verifies the permissions
            this.registerServerRPCHook(new RPCHook() {

                @Override
                public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                    // Do not catch the exception
                    validator.validate(validator.parse(request, remoteAddr));
                }

                @Override
                public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
                }
            });
        }
    }

The relevant comments are in the source code. Next, look at the registerServerRPCHook method.

    public void registerServerRPCHook(RPCHook rpcHook) {
        // Register the "hook" with the NettyRemotingServer services
        getRemotingServer().registerRPCHook(rpcHook);
        this.fastRemotingServer.registerRPCHook(rpcHook);
    }

How the NettyRemotingServer and NettyRemotingClient services work together is a topic for a dedicated analysis of RocketMQ Remoting.

2.2. The PlainAccessValidator permission verifier

PlainAccessValidator.parse() decides which resources need to be verified according to the request code sent by the client.

    switch (request.getCode()) {
        // Sending a message requires PUB permission on the topic for the current account
        case RequestCode.SEND_MESSAGE:
            accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
            break;
        case RequestCode.SEND_MESSAGE_V2:
            accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);
            break;
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
            accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
            break;
        // Pulling messages requires SUB permission on the topic and on the consumer group
        case RequestCode.PULL_MESSAGE:
            accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
            accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);
            break;
        case RequestCode.QUERY_MESSAGE:
            accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
            break;
        case RequestCode.HEART_BEAT:
            HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
            for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
                accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
                for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
                    accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
                }
            }
            break;
        case RequestCode.UNREGISTER_CLIENT:
            final UnregisterClientRequestHeader unregisterClientRequestHeader =
                (UnregisterClientRequestHeader) request
                    .decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
            accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
            break;
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
            final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
                (GetConsumerListByGroupRequestHeader) request
                    .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
            accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
            break;
        case RequestCode.UPDATE_CONSUMER_OFFSET:
            final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
                (UpdateConsumerOffsetRequestHeader) request
                    .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
            accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
            accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
            break;
        default:
            break;
    }

In short, parse() uses request.getCode() to build the set of resources and permissions that the current operation requires. This set is checked later against the permissions configured in plain_acl.yml.
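The idea of deriving "needed" permissions from the request type can be shown in a reduced form. The sketch below is an assumption-laden illustration: the class NeededPermsSketch, the Kind enum and the string permissions are hypothetical, only two request types are covered, and the %RETRY% prefix is used as the retry topic prefix for consumer groups.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: maps a request type to the resources and permissions it needs. */
final class NeededPermsSketch {
    /** Prefix used in this sketch to turn a consumer group into its retry topic name. */
    static final String RETRY_PREFIX = "%RETRY%";

    /** Only two request kinds are modelled here. */
    enum Kind { SEND_MESSAGE, PULL_MESSAGE }

    static String retryTopic(String group) {
        return RETRY_PREFIX + group;
    }

    /** Returns resource name to required permission; consumerGroup is ignored when sending. */
    static Map<String, String> neededPerms(Kind kind, String topic, String consumerGroup) {
        Map<String, String> needed = new LinkedHashMap<>();
        switch (kind) {
            case SEND_MESSAGE:
                needed.put(topic, "PUB"); // sending needs PUB on the topic
                break;
            case PULL_MESSAGE:
                needed.put(topic, "SUB"); // pulling needs SUB on the topic
                needed.put(retryTopic(consumerGroup), "SUB"); // and SUB on the group
                break;
        }
        return needed;
    }
}
```

Representing a group as a retry topic lets topics and groups share one map, and the prefix is enough to tell the two apart later when the default permission has to be chosen.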

2.3. The PlainPermissionLoader resource loader

The Broker creates PlainAccessValidator when it initializes the related services, and the default constructor of PlainAccessValidator creates the permission resource loader PlainPermissionLoader.

    public PlainAccessValidator() {
        aclPlugEngine = new PlainPermissionLoader();
    }

Create a PlainPermissionLoader object

    public PlainPermissionLoader() {
        // Load the server's permission file plain_acl.yml
        load();
        // Start a thread that checks every 500 ms whether the permission file has changed,
        // and calls load() to reload the file if it has
        watch();
    }
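The load-then-watch pattern can be sketched without threads. The class ReloadOnChange below is hypothetical and is not how PlainPermissionLoader is implemented: it detects changes by comparing file content, which keeps the example deterministic, and it leaves the periodic scheduling to the caller.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

/** Hypothetical sketch: reloads a configuration file only when its content has changed. */
final class ReloadOnChange {
    private final Path file;
    private byte[] lastContent = new byte[0];
    private int loadCount = 0;

    ReloadOnChange(Path file) throws IOException {
        this.file = file;
        load(); // initial load, like calling load() in the constructor
    }

    private void load() throws IOException {
        lastContent = Files.readAllBytes(file);
        loadCount++; // a real loader would parse the configuration here
    }

    /** Returns true if the file changed since the last load and was reloaded. */
    boolean reloadIfChanged() throws IOException {
        byte[] current = Files.readAllBytes(file);
        if (Arrays.equals(current, lastContent)) {
            return false;
        }
        load();
        return true;
    }

    int loadCount() {
        return loadCount;
    }
}
```

A caller could invoke reloadIfChanged() from a scheduled task every 500 ms to approximate the behavior described in the comments above.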

View the load method flow

    public void load() {
        Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
        List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();

        JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, JSONObject.class);
        if (plainAclConfData == null || plainAclConfData.isEmpty()) {
            throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName));
        }
        log.info("Broker plain acl conf data is:", plainAclConfData.toString());

        // Get the global whitelist IP collection
        JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");
        if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
            for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
                globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory
                    .getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
            }
        }

        // Get the account permission set
        JSONArray accounts = plainAclConfData.getJSONArray("accounts");
        if (accounts != null && !accounts.isEmpty()) {
            List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
            for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
                // Build the permission resource for each account
                PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
                // Put the accessKey in the map as key, and the account's permission resource as value
                plainAccessResourceMap.put(plainAccessResource.getAccessKey(), plainAccessResource);
            }
        }

        this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
        this.plainAccessResourceMap = plainAccessResourceMap;
    }

load() reads the resource file and parses the permission entries in it. The result is then used when the permission verifier PlainAccessValidator calls validate() to verify permissions.
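Parsing entries such as topicA=DENY or topicB=PUB | SUB into a lookup map might look like the sketch below. PermEntryParser is a hypothetical helper written for this example; it keeps the permission as a normalized string and does not reproduce the loader's buildPlainAccessResource logic.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: turns "resource=permission" entries into a map. */
final class PermEntryParser {

    /** Parses entries such as "topicB=PUB | SUB"; spaces inside the permission are removed. */
    static Map<String, String> parse(List<String> entries) {
        Map<String, String> perms = new LinkedHashMap<>();
        for (String entry : entries) {
            String[] kv = entry.split("=", 2);
            if (kv.length != 2 || kv[0].trim().isEmpty()) {
                throw new IllegalArgumentException("Malformed permission entry: " + entry);
            }
            perms.put(kv[0].trim(), kv[1].replace(" ", ""));
        }
        return perms;
    }
}
```

Rejecting malformed entries at load time means a typo in the configuration file surfaces immediately instead of silently falling back to the default permission.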

2.4. Permission verification process

The core validation method is PlainPermissionLoader.validate().

    public void validate(PlainAccessResource plainAccessResource) {
        // Check the global IP whitelist
        for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
            // A match means the client is in the global whitelist and has all permissions, so return directly
            if (remoteAddressStrategy.match(plainAccessResource)) {
                return;
            }
        }

        // If the user name (accessKey) is null, throw an AclException
        if (plainAccessResource.getAccessKey() == null) {
            throw new AclException(String.format("No accessKey is configured"));
        }

        // Verify that the account exists in the server's permission file plain_acl.yml. If not, an exception is thrown.
        if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
            throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));
        }
        PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());

        // Check whether the account's IP whitelist matches the client IP. A match grants all permissions
        // except UPDATE_AND_CREATE_TOPIC and other special permissions that require administrator permission.
        if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
            return;
        }

        // Verify the signature
        String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
        if (!signature.equals(plainAccessResource.getSignature())) {
            throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
        }

        // Verify the resource permissions of the account
        checkPerm(plainAccessResource, ownedAccess);
    }
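The signature step works because both sides can compute the same value from the request content and the shared secretKey. The sketch below shows that idea only. It assumes HMAC-SHA1 with Base64 output, and the class SignatureSketch is hypothetical; the algorithm and the exact content layout actually used are determined by AclUtils.calSignature and are not reproduced here.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical sketch of shared-secret request signing; HMAC-SHA1 is an assumption. */
final class SignatureSketch {
    private static final String ALGORITHM = "HmacSHA1";

    /** Computes a Base64-encoded HMAC of the content using the secret key. */
    static String sign(String content, String secretKey) {
        try {
            Mac mac = Mac.getInstance(ALGORITHM);
            mac.init(new SecretKeySpec(secretKey.getBytes(StandardCharsets.UTF_8), ALGORITHM));
            byte[] digest = mac.doFinal(content.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(digest);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Cannot compute signature", e);
        }
    }

    /** Server side: recomputes the signature with the stored secret and compares it. */
    static boolean verify(String content, String signature, String secretKey) {
        byte[] expected = sign(content, secretKey).getBytes(StandardCharsets.UTF_8);
        byte[] actual = signature.getBytes(StandardCharsets.UTF_8);
        return MessageDigest.isEqual(expected, actual);
    }
}
```

The secretKey itself never travels with the request: the client sends only the accessKey and the signature, and the server looks up the secret by accessKey before recomputing.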

checkPerm() then verifies the resource permissions within the current account.

    void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {
        // If the request code requires administrator privileges, the account must be an administrator
        if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
            throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not",
                needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
        }

        Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap();
        Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();
        if (needCheckedPermMap == null) {
            // If the needCheckedPermMap is null, then return
            return;
        }

        for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
            String resource = needCheckedEntry.getKey();
            Byte neededPerm = needCheckedEntry.getValue();
            // Determine whether the resource is a group. When resourcePermMap is built,
            // a group's key is RETRY_GROUP_TOPIC_PREFIX + consumerGroup
            boolean isGroup = PlainAccessResource.isRetryTopic(resource);

            // The permission configuration file has no explicit entry for the requested resource
            if (!ownedPermMap.containsKey(resource)) {
                // Use the default permission for topics or for groups, depending on the resource type
                byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() : needCheckedAccess.getDefaultTopicPerm();
                // Check permissions
                if (!Permission.checkPermission(neededPerm, ownedPerm)) {
                    throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
                }
                continue;
            }

            // The permission configuration file has an explicit entry for the resource, so check it directly
            if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
                throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
            }
        }
    }

Every check in this process throws an AclException as soon as one condition is not satisfied.

2.5. The client sends a request

The diagram above covers only the processing flow on the Broker side. How does the client take part? Take sending a message as an example.

As analyzed previously, the core method of Producer message sending is DefaultMQProducerImpl.sendKernelImpl().

    // Whether a "hook" is registered
    if (this.hasSendMessageHook()) {
        context = new SendMessageContext();
        context.setProducer(this);
        context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        context.setCommunicationMode(communicationMode);
        context.setBornHost(this.defaultMQProducer.getClientIP());
        context.setBrokerAddr(brokerAddr);
        context.setMessage(msg);
        context.setMq(mq);
        String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (isTrans != null && isTrans.equals("true")) {
            context.setMsgType(MessageType.Trans_Msg_Half);
        }

        if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
            context.setMsgType(MessageType.Delay_Msg);
        }
        // Encapsulates the parameter information of its ACL request
        this.executeSendMessageHookBefore(context);
    }

hasSendMessageHook() checks for the hooks that were created when we built the Producer and added to the sendMessageHookList property of DefaultMQProducerImpl.

Next, look at AclClientRPCHook.doBeforeRequest(), which the NettyRemotingClient class calls to prepare the request data before the message is sent.

    public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
        // Combine the request fields with the credentials and sign the result with the secretKey
        byte[] total = AclUtils.combineRequestContent(request,
            parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));
        String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
        request.addExtField(SIGNATURE, signature);
        request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());

        // The SecurityToken value is unnecessary, user can choose this one.
        if (sessionCredentials.getSecurityToken() != null) {
            request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
        }
    }

That is all on how to implement access control in RocketMQ. I hope the above content is helpful to you.
