Example Analysis of data bridging to message queue Kafka in EMQ X

2025-04-02 Update From: SLTechnology News&Howtos

This article walks through an example of bridging data from EMQ X to the Kafka message queue using the rule engine.

Scene introduction

This scenario requires bridging messages that are published under a specified EMQ X topic and that meet certain criteria to Kafka. To make later analysis and retrieval easier, the message content needs to be split into fields.

In this scenario, the information reported by the device end is as follows:

Report topic: cmd/state/:id, where id is the vehicle client identification code

Message body:

{
  "id": "NXP-058659730253-963945118132721-22",  // client identification code
  "speed": 32.12,                               // vehicle speed
  "direction": 198.33212,                       // driving direction
  "tachometer": 3211,                           // engine speed; the message must be stored when this value exceeds 8000
  "dynamical": 8.93,                            // instantaneous fuel consumption
  "location": {                                 // GPS longitude and latitude
    "lng": 116.296011,
    "lat": 40.005091
  },
  "ts": 1563268202                              // reporting time
}

When the reported engine speed exceeds 8000, the current message is stored for later analysis of how the user drives the vehicle.
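As a rough illustration of the message format and the storage condition described above, the reported body can be modeled with a small Python class. This is only a sketch: the names VehicleState, from_json, and needs_storage are hypothetical and are not part of EMQ X. It assumes the body is plain JSON, so the explanatory comments shown in the listing above are not present in the real payload.

```python
import json
from dataclasses import dataclass

# Engine speed above which the message should be stored (from the scenario).
TACHOMETER_THRESHOLD = 8000


@dataclass
class VehicleState:
    """Hypothetical model of one message reported on cmd/state/:id."""
    client_id: str
    speed: float
    direction: float
    tachometer: float
    dynamical: float
    lng: float
    lat: float
    ts: int

    @classmethod
    def from_json(cls, body: str) -> "VehicleState":
        # Parse the JSON body and flatten the nested "location" object.
        data = json.loads(body)
        return cls(
            client_id=data["id"],
            speed=data["speed"],
            direction=data["direction"],
            tachometer=data["tachometer"],
            dynamical=data["dynamical"],
            lng=data["location"]["lng"],
            lat=data["location"]["lat"],
            ts=data["ts"],
        )

    @property
    def needs_storage(self) -> bool:
        # True only when the engine speed is strictly greater than the threshold.
        return self.tachometer > TACHOMETER_THRESHOLD
```

With the sample body above (tachometer 3211), needs_storage is False, so that message would not be stored.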

Preparation

Create a Kafka topic:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic 'emqx_rule_engine_output' --partitions 1 --replication-factor 1

The topic must exist in Kafka before the Kafka rule is created; otherwise, creating the rule fails.

Configuration instructions

Create resources

Open the EMQ X Dashboard, go to the Resources page in the left menu, click the New button, and enter the Kafka server information to create the resource.

The network environment of the nodes in the EMQ X cluster may be different from each other. Click the status button in the list after resource creation to view the connection status of each node resource. If the resources on the node are not available, check whether the configuration and network connectivity are correct, and click the reconnect button to reconnect manually.

Create a rule

Go to the Rules page in the left menu and click the New button to create a rule. Here we choose the message publish trigger event, so the rule runs its data processing whenever a message is published.

After the trigger event is selected, we can see the optional fields and sample SQL on the interface:

Filter the required fields

The rule engine uses SQL statements to process and reshape device messages, connection events, and so on. In this scenario we only need to extract the key fields from the payload. Use the payload.<field> format to select fields in the payload. Besides the payload content, the message id must also be saved. The SQL can then be written as follows:

SELECT
  payload.id as client_id, payload.speed as speed,
  payload.tachometer as tachometer,
  payload.ts as ts, id
FROM
  "message.publish"
WHERE
  topic =~ 't/#'

Establish the filter conditions

Use the WHERE clause of the SQL statement for conditional filtering. In this scenario we need to define two conditions:

Only process the cmd/state/:id topics, filtering topic with the topic wildcard operator =~: topic =~ 'cmd/state/+'

Only process messages with tachometer greater than 8000, filtering tachometer with a comparison operator: payload.tachometer > 8000
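To make the first condition concrete, the sketch below shows how an MQTT topic filter such as 'cmd/state/+' is matched against a concrete topic. It is an illustration of standard MQTT wildcard semantics, not the rule engine's own code. The function name topic_matches is hypothetical, and the special handling of topics beginning with $ is deliberately left out.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if topic matches topic_filter using MQTT '+' and '#' wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' matches the parent level and any number of child levels,
            # and is only valid as the last level of the filter.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # '+' matches exactly one level; anything else must be equal.
            return False

    # Without a trailing '#', both sides must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

For example, 'cmd/state/+' matches 'cmd/state/NXP-058659730253-963945118132721-22' but not 'cmd/state/a/b', because '+' covers a single level only.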

The SQL obtained from the previous step is as follows:

SELECT
  payload.id as client_id, payload.speed as speed,
  payload.tachometer as tachometer,
  payload.ts as ts, id
FROM
  "message.publish"
WHERE
  topic =~ 'cmd/state/+'
  AND payload.tachometer > 8000

Use the SQL test function for output testing

With the SQL test function, we can view the output of the current SQL in real time. It requires us to supply simulated raw data such as the payload.

The payload data is as follows. Make sure the tachometer value is large enough to satisfy the SQL condition:

{"id": "NXP-058659730253-963945118132721-22", "speed": 32.12, "direction": 198.33212, "tachometer": 9001, "dynamical": 8.93, "location": {"lng": 116.296011, "lat": 40.005091}, "ts": 1563268202}

Turn on the SQL test switch, change topic and payload to the values from this scenario, and click the Test button to view the data output:

The test output data is:

{"client_id": "NXP-058659730253-963945118132721-22", "id": "589A429E9572FB44B0000057C0001", "speed": 32.12, "tachometer": 9001, "ts": 1563268202}

The test output is as expected, and we can take the next steps.
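For readers who want to reason about the rule outside the Dashboard, the following Python sketch mimics what the SQL does to one message: it checks the topic and the tachometer condition, then projects the selected fields. It is an approximation for illustration only, not how the rule engine is implemented. The function name apply_rule is hypothetical, and the message id is passed in by hand because in EMQ X it is generated by the broker.

```python
import json
from typing import Optional


def apply_rule(topic: str, message_id: str, payload: str) -> Optional[dict]:
    """Return the rule output for one message, or None if the rule is not hit."""
    # WHERE topic =~ 'cmd/state/+': exactly three levels, the first two fixed.
    levels = topic.split("/")
    if len(levels) != 3 or levels[:2] != ["cmd", "state"]:
        return None

    data = json.loads(payload)

    # AND payload.tachometer > 8000 (a missing field is treated as not matching).
    if not data.get("tachometer", 0) > 8000:
        return None

    # SELECT payload.id as client_id, payload.speed, payload.tachometer, payload.ts, id
    return {
        "client_id": data["id"],
        "id": message_id,
        "speed": data["speed"],
        "tachometer": data["tachometer"],
        "ts": data["ts"],
    }
```

Calling apply_rule with the test payload above and the message id from the test output yields the same five fields as the Dashboard test, while a payload with tachometer 3211 yields None.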

Add response action to bridge messages to Kafka

Once the SQL input and output are confirmed to be correct, we add the response action that bridges the filtered results to Kafka.

Click the Add button under Response Actions, select the bridge data to Kafka action, select the resource just created, and enter emqx_rule_engine_output, the topic created above, as the Kafka topic.

Test expected results

We successfully created a rule that contains a processing action, and the desired effect of the action is as follows:

When a device reports a message to a cmd/state/:id topic and the tachometer value in the message exceeds 8000, the SQL is hit and the hit count in the rule list increases by 1.

A message whose values match the current message is added to the emqx_rule_engine_output topic in Kafka.

Test using the Websocket tool in Dashboard

Switch to the Tools -> Websocket page, connect to EMQ X with any client information, and after the connection succeeds send the following message from the message card:

Topic: cmd/state/NXP-058659730253-963945118132721-22

Message body:

{"id": "NXP-058659730253-963945118132721-22", "speed": 32.12, "direction": 198.33212, "tachometer": 8081, "dynamical": 8.93, "location": {"lng": 116.296011, "lat": 40.005091}, "ts": 1563268202}

Click the Send button. The hit count of the current rule should now be 1.

Then use the Kafka command to check whether the message was produced successfully:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic emqx_rule_engine_output --from-beginning

{"client_id": "NXP-058659730253-963945118132721-22", "id": "58DEE9D97711EF440000017B30002", "speed": 32.12, "tachometer": 8081, "ts": 1563268202}
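When checking many consumed lines, it can help to validate each one automatically. The sketch below is one possible check, assuming every bridged record is a single JSON line containing exactly the five fields selected by the rule. The names EXPECTED_FIELDS and is_valid_bridged_record are hypothetical.

```python
import json

# Fields the rule SQL selects; a bridged record should contain exactly these.
EXPECTED_FIELDS = {"client_id", "id", "speed", "tachometer", "ts"}


def is_valid_bridged_record(line: str) -> bool:
    """Check that one consumer output line looks like the rule's output."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        # Not JSON at all, for example a blank or log line.
        return False
    if not isinstance(record, dict):
        return False
    # The field set must match, and the rule only lets tachometer > 8000 through.
    return set(record) == EXPECTED_FIELDS and record["tachometer"] > 8000
```

Each line printed by the console consumer can be passed to this function; the sample output above passes, whereas a raw device payload (which still carries direction, dynamical, and location) does not.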

At this point, we have used the rule engine to bridge messages to Kafka.

Note that the open source rule engine only supports forwarding to a Web Server; forwarding to Kafka is available only in the enterprise version.
