How to implement EMQ X Redis data persistence

Updated 2025-01-19. From: SLTechnology News&Howtos (shulou), Internet Technology

This article explains how to implement EMQ X data persistence with Redis. Many readers are not yet familiar with this topic, so the steps are laid out in detail for reference.

Introduction to EMQ X data persistence

Data persistence is mainly used to record client online and offline status, subscribed topics, message content, and the delivery receipts sent after a message arrives. These records are written to a database such as Redis, MySQL, PostgreSQL, MongoDB, Cassandra, or AWS DynamoDB, so that external services can query them quickly, and so that the current running state survives service downtime or an abnormal client disconnect and can be restored when the connection is re-established. Persistence can also drive client proxy subscription: when a device client comes online, the persistence module loads its preset topics directly from the database and subscribes on its behalf, which simplifies system design and reduces the communication overhead of client-side subscription.

Users can achieve similar functions by subscribing to the related topics themselves, but the built-in persistence support in the Enterprise Edition is more efficient and reliable, which greatly reduces development work and improves system stability.

Data persistence is an important feature of EMQ X and is only supported in the Enterprise Edition.

Persistence design

Persistence works by calling a handler function (action) when an event hook fires. The action receives the data for that event and, following the configured instructions, adds, deletes, modifies, or queries records. The parameters available to a given event hook are the same across databases, but the action differs according to each database's characteristics. The overall persistence workflow is as follows:

One-to-one message storage

The Publish side publishes a message.

Backend records the message in the database.

The Subscribe side subscribes to the topic.

Backend fetches the messages for that topic from the database.

Backend sends the messages to the Subscribe side.

Backend removes the message from the database after the Subscribe side confirms it.
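The one-to-one flow above can be sketched with an in-memory stand-in for the database. This is only an illustration of the record, fetch, and delete-on-confirmation sequence; the class and method names (OneToOneStore, on_publish, on_subscribe, on_ack) are hypothetical and are not part of EMQ X.

```python
from collections import defaultdict, deque


class OneToOneStore:
    """In-memory stand-in for the backend database (hypothetical names)."""

    def __init__(self):
        # topic -> queue of (message_id, payload) waiting for confirmation
        self._pending = defaultdict(deque)
        self._next_id = 0

    def on_publish(self, topic, payload):
        """Record a published message and return its id."""
        self._next_id += 1
        self._pending[topic].append((self._next_id, payload))
        return self._next_id

    def on_subscribe(self, topic):
        """Return the stored messages for the topic, oldest first."""
        return list(self._pending[topic])

    def on_ack(self, topic, message_id):
        """Remove a message once the subscriber has confirmed it."""
        self._pending[topic] = deque(
            item for item in self._pending[topic] if item[0] != message_id
        )


store = OneToOneStore()
first_id = store.on_publish("queue/demo", "hello")
store.on_publish("queue/demo", "world")
delivered = store.on_subscribe("queue/demo")   # both messages are delivered
store.on_ack("queue/demo", first_id)           # subscriber confirms the first
remaining = store.on_subscribe("queue/demo")   # only the second is still stored
```

Deleting only on confirmation means an unconfirmed message stays in the store and can be delivered again on the next subscription.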

One-to-many message storage

The PUB side publishes a message.

Backend records the message in the database.

SUB1 and SUB2 subscribe to the topic.

Backend fetches the messages for that topic from the database.

Backend sends the messages to SUB1 and SUB2.

Backend records the position up to which SUB1 and SUB2 have each read, and the next fetch starts from that position.
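The difference from the one-to-one case is that messages are kept and each subscriber gets its own read position. A minimal sketch under that assumption, again with an in-memory stand-in and hypothetical names (OneToManyStore, on_publish, fetch):

```python
class OneToManyStore:
    """In-memory stand-in for the backend database (hypothetical names)."""

    def __init__(self):
        self._log = {}       # topic -> list of payloads in publish order
        self._position = {}  # (subscriber, topic) -> index of next unread message

    def on_publish(self, topic, payload):
        """Append a published message to the topic's log."""
        self._log.setdefault(topic, []).append(payload)

    def fetch(self, subscriber, topic):
        """Return this subscriber's unread messages and advance its position."""
        log = self._log.get(topic, [])
        start = self._position.get((subscriber, topic), 0)
        self._position[(subscriber, topic)] = len(log)
        return log[start:]


store = OneToManyStore()
store.on_publish("pubsub/demo", "m1")
sub1_first = store.fetch("SUB1", "pubsub/demo")   # SUB1 reads m1
store.on_publish("pubsub/demo", "m2")
sub1_second = store.fetch("SUB1", "pubsub/demo")  # SUB1 resumes after m1
sub2_first = store.fetch("SUB2", "pubsub/demo")   # SUB2 starts from the beginning
```

Because nothing is deleted on delivery, a subscriber that fetches later still sees every message published to the topic.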

Redis data persistence

This article uses a practical example to illustrate how to store relevant information through Redis.

Redis is a high-performance, fully open source key-value database released under the BSD license.

Compared with other key-value caching products, Redis has the following characteristics:

Redis has extremely high performance and supports on the order of 100,000 read/write operations per second on a single machine.

Redis supports data persistence. You can save the data in memory to disk and load it again on restart.

Redis not only supports simple key-value data, but also provides data structures such as list, set, zset, and hash.

Redis supports data backup, that is, data backup in master-slave mode.

Readers can refer to Redis's official Quick Start to install Redis (at the time of this writing, Redis version 5.0) and use the redis-server command to start the Redis server.

Configure the EMQ X server

When EMQ X is installed through RPM, the Redis-related configuration file is located at /etc/emqx/plugins/emqx_backend_redis.conf. If you are only testing Redis persistence, most of the configuration does not need to be changed. The only setting that may need changing is the address of the Redis server: if Redis is not installed on the same server as EMQ X, specify the correct Redis server address and port, as follows:

    ## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379
    backend.redis.pool1.server = 127.0.0.1:6379

Leave the rest of the configuration file unchanged, and then start the plug-in:

    emqx_ctl plugins load emqx_backend_redis

Client presence storage

When a client goes online or offline, the plug-in writes its online status, its online and offline times, and the node's client list to the Redis database.

Although EMQ X itself provides a device presence API, reading the record directly from the database is more efficient than calling the EMQ X API in scenarios that frequently need client presence and online/offline times.
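As a rough sketch of such a direct lookup, the function below reads the mqtt:client:{client_id} hash with the state, online_at, and offline_at fields that this article shows further down. The function name read_presence and the FakeRedis stand-in are hypothetical; the client argument is assumed to be any object whose hgetall(key) returns a dict of strings, which is what redis-py's redis.Redis(decode_responses=True) does.

```python
def read_presence(client, client_id):
    """Read one client's presence record from the mqtt:client:{client_id} hash."""
    fields = client.hgetall(f"mqtt:client:{client_id}")
    if not fields:
        return None  # no presence record is stored for this client

    def to_int(value):
        # the sample output in this article shows "undefined" for unset timestamps
        return int(value) if value not in (None, "undefined") else None

    return {
        "online": fields.get("state") == "1",  # "1" online, "0" offline
        "online_at": to_int(fields.get("online_at")),
        "offline_at": to_int(fields.get("offline_at")),
    }


class FakeRedis:
    """Stand-in so the sketch runs without a Redis server (hypothetical)."""

    def __init__(self, data):
        self._data = data

    def hgetall(self, key):
        return dict(self._data.get(key, {}))


fake = FakeRedis({
    "mqtt:client:sub_client": {
        "state": "0",
        "online_at": "1542272854",
        "offline_at": "undefined",
    }
})
presence = read_presence(fake, "sub_client")
```

Against a real server, the same function could be called with a redis-py client in place of FakeRedis.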

Configuration item

Open the configuration file and configure the Backend rules:

    ## online
    backend.redis.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

    ## offline
    backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

Usage example

Open the EMQ X management console at http://127.0.0.1:18083 in a browser, create a new client connection under Tools -> WebSocket, and specify sub_client as the clientid.

Open a redis-cli command line window and execute the command keys *. The result is shown below; two keys are stored in Redis:

    127.0.0.1:6379> keys *
    1) "mqtt:node:emqx@127.0.0.1"
    2) "mqtt:client:sub_client"

Connection list

The plug-in records the list of clients on a node, together with their connection timestamps, under a key in the format mqtt:node:{node_name}. Equivalent operation:

    ## redis key is mqtt:node:{node_name}
    HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836

Field description:

    ## Node online device information
    127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
    1) "sub_client1" # clientid
    2) "1542272836" # online timestamp
    3) "sub_client"
    4) "1542272836"

Connection details

The plug-in records the client's online status and online time under a key in the format mqtt:client:{client_id}. Equivalent operation:

    ## redis key is mqtt:client:{client_id}
    HMSET mqtt:client:sub_client state 1 online_at 1542272854

Field description:

    ## client online status
    127.0.0.1:6379> HGETALL mqtt:client:sub_client
    1) "state"
    2) "0" # 0 offline 1 online
    3) "online_at"
    4) "1542272854" # online timestamp
    5) "offline_at"
    6) "undefined" # offline timestamp

Client proxy subscription

When a client comes online, the storage module reads the preset subscription list directly from the database and subscribes to those topics on the client's behalf. In scenarios where the client must communicate (receive messages) through predetermined topics, the application can set or change the proxy subscription list at the data level.

Configuration item

Open the configuration file and configure the Backend rules:

    ## hook: client.connected
    ## action/function: on_subscribe_lookup
    backend.redis.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

Usage example

Suppose that when the sub_client device comes online it must be subscribed to two QoS 1 topics, sub_client/upstream and sub_client/downlink.

Initialize the proxy subscription Hash in Redis under a key in the format mqtt:sub:{client_id}:

    ## redis key is mqtt:sub:{client_id}
    ## HSET key {topic} {qos}
    127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1
    (integer) 0
    127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1
    (integer) 0

On the WebSocket page of the EMQ X management console, create a new client connection with clientid sub_client and switch to the subscription page. You can see that the client has automatically subscribed to the two QoS 1 topics, sub_client/upstream and sub_client/downlink.

Switch back to the management console WebSocket page and publish a message to the sub_client/downlink topic. The published message appears in the subscription message list.
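An application that manages proxy subscriptions at the data level would need to write the same mqtt:sub:{client_id} hash that the example above creates by hand. The helper below is a hypothetical sketch that only builds the HSET commands as tuples, so it runs without a Redis server; the function name proxy_subscription_commands is an assumption made for illustration.

```python
def proxy_subscription_commands(client_id, topics):
    """Build the HSET commands that preset a client's proxy subscriptions.

    topics maps each topic to its QoS (0, 1, or 2).
    """
    key = f"mqtt:sub:{client_id}"
    commands = []
    for topic, qos in topics.items():
        if qos not in (0, 1, 2):
            raise ValueError(f"invalid QoS {qos!r} for topic {topic!r}")
        # one hash field per topic, with the QoS stored as the field value
        commands.append(("HSET", key, topic, str(qos)))
    return commands


commands = proxy_subscription_commands(
    "sub_client",
    {"sub_client/upstream": 1, "sub_client/downlink": 1},
)
```

Each tuple corresponds to one redis-cli line from the example; with redis-py, it could be sent using the client's execute_command(*command).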

Persisting published messages

Configuration item

Open the configuration file and configure the Backend rules. The topic parameter supports message filtering; here the # wildcard is used to store messages on any topic:

    ## hook: message.publish
    ## action/function: on_message_publish
    backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

Usage example

On the WebSocket page of the EMQ X management console, establish a connection with clientid sub_client and publish several messages to the topic upstream_topic. For each message, EMQ X persists two records: an entry in the message list and the message details.
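The topic parameter in the rule decides which published messages are stored. Assuming it follows standard MQTT topic filter semantics (an assumption, not something this article states), matching can be sketched as below. The function name topic_matches is hypothetical, and the sketch ignores the special handling of topics that begin with $.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic name matches a topic filter.

    '+' matches exactly one level; '#' matches all remaining levels
    (including none) and is only valid as the last level of the filter.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # '#' anywhere but the last position is an invalid filter
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            return False  # the filter has more levels than the topic
        if level != "+" and level != topic_levels[index]:
            return False
    # without '#', filter and topic must have the same number of levels
    return len(filter_levels) == len(topic_levels)


stores_everything = topic_matches("#", "upstream_topic")
queue_only = topic_matches("queue/#", "pubsub/news")
```

With the "#" filter used in the rule above, every topic matches, which is why all published messages are stored.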

Message list

EMQ X persists the message list, as message ids, to the Redis sorted set mqtt:msg:{topic}:

    ## get all message ids in the upstream_topic topic collection
    127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1
    1) "2VFsyhDm0cPIQvnY9osj"
    2) "2VFszTClyjpVtLDLrn1u"
    3) "2VFszozkwkYOcbEy8QN9"
    4) "2VFszpEc7DfbEqC97I3g"
    5) "2VFszpSzRviADmcOeuXd"
    6) "2VFszpm3kvvLkJTcdmGU"
    7) "2VFt0kuNrOktefX6m4nP"

Message details

The details of each message are stored in a Redis Hash under a key in the format mqtt:msg:{message_id}:

    ## get the message details
    127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
    1) "id"
    2) "2VFt0kuNrOktefX6m4nP" ## message id
    3) "from"
    4) "sub_client" ## client id
    5) "qos"
    6) "2"
    7) "topic"
    8) "up/upstream_topic"
    9) "payload"
    10) "{\"cmd\": \"reboot\"}"
    11) "ts"
    12) "1542338754" ## pub timestamp
    13) "retain"
    14) "false"

Fetching offline messages

Configuration item

Open the configuration file and configure the Backend rules:

    ## hook: session.subscribed
    ## action/function: on_message_fetch_for_queue, on_message_fetch_for_pubsub

    ## one-to-one offline messages
    backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}

    ## one-to-many offline messages
    backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}

Usage example

For MQTT offline messages to be delivered, all of the following conditions must be met:

Connect with clean_session = false

Subscribe with QoS > 0

Publish with QoS > 0
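The three conditions above must hold together. As a small sketch, they can be expressed as a single check; the function name qualifies_for_offline_delivery and its parameter names are hypothetical.

```python
def qualifies_for_offline_delivery(clean_session, subscribe_qos, publish_qos):
    """Return True only if all three offline-message conditions are met."""
    return (not clean_session) and subscribe_qos > 0 and publish_qos > 0


persistent_qos1 = qualifies_for_offline_delivery(False, 1, 1)
clean_session_qos1 = qualifies_for_offline_delivery(True, 1, 1)
```

A client that connects with clean_session = true fails the check regardless of the QoS levels used.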

Establish a connection in the EMQ X Management console with the following configuration

Persisting retained messages

Configuration item

Open the configuration file and configure the Backend rules:

    ## hook: message.publish
    ## action/function: on_client_connected, on_message_retain
    backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
    backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

Message list

EMQ X persists the message list, as message ids, to the mqtt:retain:{topic} Redis Hash:

    ## get all message ids in the upstream_topic topic collection
    127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1
    1) "2VFsyhDm0cPIQvnY9osj"

Message details

The details of each message are stored in a Redis Hash under a key in the format mqtt:msg:{message_id}:

    ## get the message details
    127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
    1) "id"
    2) "2VFt0kuNrOktefX6m4nP" ## message id
    3) "from"
    4) "sub_client" ## client id
    5) "qos"
    6) "2"
    7) "topic"
    8) "up/upstream_topic"
    9) "payload"
    10) "{\"cmd\": \"reboot\"}"
    11) "ts"
    12) "1542338754" ## pub timestamp
    13) "retain"
    14) "false"