Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to install and use RocketMQ

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article is about how to install and use RocketMQ. The editor thinks it is very practical, so share it with you as a reference and follow the editor to have a look.

Server installation and deployment

I deployed in the CentOS6.5 in the virtual machine.

1. Download program

2.tar-xvf alibaba-rocketmq-3.0.7.tar.gz extract to the appropriate directory such as / opt/ directory

3. Start RocketMQ: go to the rocketmq/bin directory for execution

Nohup sh mqnamesrv &

4. Start Broker and set the corresponding NameServer

Nohup sh mqbroker-n "127.0.0.1 9876" &

Second, write client

You can view the quickstart source code 1.Consumer message consumers in sameple

/ * Consumer, subscription message * / public class Consumer {public static void main (String [] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("QuickStartConsumer"); consumer.setNamesrvAddr ("127.0.0.1 String 9876"); consumer.setInstanceName ("QuickStartConsumer"); consumer.subscribe ("QuickStart", "*") Consumer.registerMessageListener (new MessageListenerConcurrently () {@ Override public ConsumeConcurrentlyStatus consumeMessage (List msgs, ConsumeConcurrentlyContext context) {System.out.println (Thread.currentThread () .getName () + "Receive New Messages:" + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}); consumer.start () System.out.println ("Consumer Started.");}}

2.Producer message producer

/ * Producer, send message * * / public class Producer {public static void main (String [] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer ("QuickStartProducer"); producer.setNamesrvAddr ("127.0.0.1 String 9876"); producer.setInstanceName ("QuickStartProducer"); producer.start (); for (int I = 0; I

< 1000; i++) { try { Message msg = new Message("QuickStart",// topic "TagA",// tag ("Hello RocketMQ ,QuickStart" + i).getBytes()// body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); }} 3.首先运行Consumer程序,一直在运行状态接收服务器端推送过来的消息 23:18:07.587 [main] DEBUG i.n.c.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 1623:18:07.591 [main] DEBUG i.n.util.internal.PlatformDependent - Platform: Windows23:18:07.592 [main] DEBUG i.n.util.internal.PlatformDependent - Java version: 723:18:07.592 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noUnsafe: false23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.ByteBuffer.cleaner: available23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Buffer.address: available23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: true23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - sun.misc.Unsafe: available23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noJavassist: false23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - Javassist: unavailable23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - You don't have Javassist in your class path or you don't have enough permission to load dynamically generated classes. Please check the configuration for better performance.23:18:07.595 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false23:18:07.611 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false23:18:07.611 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 51223:18:08.355 [main] DEBUG i.n.util.internal.ThreadLocalRandom - -Dio.netty.initialSeedUniquifier: 0x8c0d4793e5820c3123:18:08.446 [NettyClientWorkerThread_1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.noResourceLeakDetection: falseConsumer Started. 4.再次运行Producer程序,生成消息并发送到Broker,Producer的日志冲没了,但是可以看到Broker推送到Consumer的一条消息 ConsumeMessageThread-QuickStartConsumer-3 Receive New Messages: [MessageExt [queueId=0, storeSize=150, queueOffset=244, sysFlag=0, bornTimestamp=1400772029972, bornHost=/10.162.0.7:54234, storeTimestamp=1400772016017, storeHost=/127.0.0.1:10911, msgId=0A0A0A5900002A9F0000000000063257, commitLogOffset=406103, bodyCRC=112549959, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=QuickStart, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=245, MIN_OFFSET=0}, body=29]]] 三、Consumer最佳实践 1.消费过程要做到幂等(即消费端去重) RocketMQ无法做到消息重复,所以如果业务对消息重复非常敏感,务必要在业务层面去重,有以下一些方式: (1).将消息的唯一键,可以是MsgId,也可以是消息内容中的唯一标识字段,例如订单ID,消费之前判断是否在DB或Tair(全局KV存储)中存在,如果不存在则插入,并消费,否则跳过。(实践过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过) msgid一定是全局唯一的标识符,但是可能会存在同样的消息有两个不同的msgid的情况(有多种原因),这种情况可能会使业务上重复,建议最好使用消息体中的唯一标识字段去重 (2).使业务层面的状态机去重 2.批量方式消费 如果业务流程支持批量方式消费,则可以很大程度上的提高吞吐量,可以通过设置Consumer的consumerMessageBatchMaxSize参数,默认是1,即一次消费一条参数 3.跳过非重要的消息 发生消息堆积时,如果消费速度一直跟不上发送速度,可以选择丢弃不重要的消息 @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); long offset=msgs.get(0).getQueueOffset(); String maxOffset=msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET); long diff=Long.parseLong(maxOffset)-offset; if(diff>

100000) {/ / handle message accumulation return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}

As shown in the above code, when the number of messages in a queue accumulates to more than 100000, try to discard some or all of the messages, so that you can quickly catch up with the speed of sending messages.

4. Optimize the process of no-message consumption

For example, the consumption process of a message is as follows

1. Query data 1 from DB according to message

two。 Query data from DB according to message 2

3. Complex business computing

4. Insert data 3 into DB

5. Insert data 4 into DB

The consumption process of this message interacts with DB for 4 times. If calculated according to each 5ms, it takes a total of 20ms. Assuming that business computing takes 5ms, then the total time-consuming 25ms is too long. If you can optimize 4 DB interactions to 2, then the total time-consuming can be optimized to 15ms, that is, the overall performance is improved by 40%.

For DB such as Mysql, if deployed on disk, interact with DB. If the data does not hit cache, the RT of each interaction will rise in a straight line. If SSD is adopted, the rising trend of RT is obviously better than that of disk.

Individual applications may encounter this situation: in the process of offline stress test consumption, db performs very well, each time the RT is very short, but after running online for a period of time, the RT will become longer and the consumption throughput will plummet.

The main reason is that the offline pressure test time is too short, after running online for a period of time, the cache hit rate decreases, then RT will increase. It is suggested that when offline pressure testing is carried out, it is necessary to test long enough to simulate the online environment as much as possible. In the process of pressure testing, the distribution of data is also very important. If the data is different, the hit rate of cache may be completely different.

IV. Producer best practices

1. Considerations for sending messages

(1) an application uses a Topic as much as possible, the message subtype is identified by tags, and the tags can be set freely by the application. Only if tags is set for sending messages, consumers can use tags to filter messages in broker when subscribing to messages.

(2) the unique identification code of each message at the business level should be set to the keys field to facilitate the identification of message loss in the future. The server creates an index (hash index) for each message, and the application can query the content of the message through topic,key and who consumes the message. Because it is a hash index, it is important to make sure that the key is as unique as possible to avoid potential hash conflicts.

(3) if the message is sent successfully or failed, to print the message log, be sure to print the sendresult and key fields

(4) send message method, as long as no exception is thrown, it means that the message is sent successfully. But there will be multiple states for successful delivery, which is defined in sendResult

SEND_OK: message sent successfully

FLUSH_DISK_TIMEOUT: the message was sent successfully, but the server flush timed out, and the message has entered the server queue. Only when the server is down, the message will be lost.

FLUSH_SLAVE_TIMEOUT: the message was sent successfully, but the server timed out when it synchronized to Slave, and the message has entered the server queue. Only when the server goes down will the message be lost.

SLAVE_NOT_AVAILABLE: the message was sent successfully, but the slave is not available at this time, and the message has entered the server queue. Only when the server goes down will the message be lost. For applications that accurately send sequential messages, due to the limitations of sequential messages, automatic switching between master and slave may be involved, so if the status field in sendresult is not equal to SEND_OK, you should try again. For other applications, this is not necessary

(5) for applications where messages cannot be lost, there must be a message retransmission mechanism.

two。 Message sending failure handling

The send method of Producer supports internal retry. The retry logic is as follows:

(1) retry at most 3 times

(2) if the transmission fails, it will go to the next Broker

(3) the total time taken by this method does not exceed the value set by sendMsgTimeout, and defaults to 10 seconds. Therefore, if you send a message to broker itself and cause a timeout exception, you will not try again.

If the call to the send synchronization method fails, try to store the message in the db, and the background thread retries regularly to ensure that the message reaches the Broker.

Why the above db retry method is not integrated into the MQ client, but requires the application to complete it on its own, based on the following considerations:

(1) the client of MQ is designed as stateless mode, which is convenient for arbitrary horizontal expansion, and the consumption of machine resources is only cpu, memory and network.

(2) if a KV storage module is integrated within the MQ client, the data can only be reliably removed synchronously, which costs a lot of performance, so it is usually used asynchronously, and because the application closing process is not controlled by MQ operation and maintenance personnel, violent shutdown like kill-9 may often occur, resulting in data being lost when the data is not dropped in time.

(3) the reliability of the machine where the Producer resides is low, and it is generally a virtual machine, which is not suitable for storing important data. In summary, it is recommended that the retry process be controlled by the application.

3. Select oneway form to send

A RPC call, usually a procedure like this

(1) the client sends requests to the server

(2) the server processes the request

(3) the server returns a reply to the client

Therefore, the time-consuming time of an RPC is the sum of the above three steps, while some scenarios require very short time, but do not require high reliability. For example, log collection applications can be called in the form of oneway, and the oneway form can only send a request without waiting for a reply. At the client implementation level, sending a request is only the cost of an os system call, that is, writing data to the client's socket buffer. This process is usually at the microsecond level.

RocketMQ can not only push messages directly, register listeners on the consumer side to listen, but also let consumers decide to pull data on their own.

/ * PullConsumer, subscription message * / public class PullConsumer {/ / Java cache private static final Map offseTable = new HashMap (); public static void main (String [] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer ("PullConsumerGroup"); consumer.setNamesrvAddr ("127.0.0.1 new DefaultMQPullConsumer 9876"); consumer.start () / / pull the queue for subscription topics. The default queue size is 4 Set mqs = consumer.fetchSubscribeMessageQueues ("TopicTestMapBody"); for (MessageQueue mq: mqs) {System.out.println ("Consume from the queue:" + mq) SINGLE_MQ:while (true) {try {PullResult pullResult = consumer.pullBlockIfNotFound (mq, null, getMessageQueueOffset (mq), 32); List list=pullResult.getMsgFoundList (); if (listings)

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report