2025-01-15 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article mainly explains how to use Schema Registry. The method introduced here is simple, fast, and practical; let's walk through the tutorial together.
There are many kinds of IoT device terminals, and different manufacturers use different encoding formats, so a unified data format is needed when devices access an IoT platform, so that applications on the platform can manage them.
EMQ X Enterprise Edition 3.4.0 provides Schema Registry functionality and codec capabilities. Schema Registry manages the Schema used for encoding and decoding, processes encoding or decoding requests, and returns results. Schema Registry works with the rule engine to adapt to device access and rule design in various scenarios.
Data format
The following figure shows an application case of Schema Registry. Multiple devices report data in different formats; Schema Registry decodes them into a unified internal format, which is then forwarded to the backend application.
[figure 1: using Schema Registry to encode and decode device data]
Binary format support
The Schema Registry data formats built into EMQ X 3.4.0 include Avro and Protobuf. Avro and Protobuf are Schema-dependent data formats whose encoded data is binary. The internal data format produced by Schema Registry decoding (a Map, explained later) can be used directly by the rules engine and other plug-ins. In addition, Schema Registry supports user-defined (3rd-party) codec services, which can implement encoding and decoding closer to business needs via HTTP or TCP callbacks.
Architecture design
Schema Registry maintains the Schema text for the built-in encoding formats such as Avro and Protobuf; for custom codec (3rd-party) formats, the Schema text is maintained by the codec service itself. Schema Registry creates a Schema ID for each Schema, and the Schema API provides add, query, and delete operations by Schema ID.
Schema Registry can perform both decoding and encoding. The Schema ID must be specified when encoding or decoding.
[figure 2: schematic diagram of Schema Registry architecture]
Example of an encoding call (the first parameter is the Schema ID):
schema_encode(SchemaID, Data) -> RawData
Example of a decoding call:
schema_decode(SchemaID, RawData) -> Data
A common use case is to use the rules engine to invoke the encoding and decoding interface provided by Schema Registry, and then use the encoded or decoded data as input to subsequent actions.
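As a rough mental model, this dispatch can be sketched in Python. This is a toy for illustration only: the REGISTRY dict and the "json_user:1.0" Schema ID are made up for this sketch, and the real registry lives inside EMQ X in Erlang.

```python
import json

# Toy model: each Schema ID maps to an (encoder, decoder) pair;
# schema_encode/schema_decode look the pair up by Schema ID.
REGISTRY = {
    "json_user:1.0": (lambda data: json.dumps(data).encode(),
                      lambda raw: json.loads(raw.decode())),
}

def schema_encode(schema_id, data):
    encode, _ = REGISTRY[schema_id]
    return encode(data)

def schema_decode(schema_id, raw_data):
    _, decode = REGISTRY[schema_id]
    return decode(raw_data)

# Round trip: encoding then decoding restores the original value.
raw = schema_encode("json_user:1.0", {"id": 1})
assert schema_decode("json_user:1.0", raw) == {"id": 1}
```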
Codec + rule engine
The message processing level of EMQ X can be divided into three parts: message routing (Messaging), rules engine (Rule Engine) and data format conversion (Data Conversion).
The PUB/SUB system of EMQ X routes messages to the specified topic. The rules engine can flexibly configure business rules for the data: it matches messages by rule and then executes the corresponding actions. Data format conversion happens before rule matching, converting the data to a Map format that can participate in rule matching.
[figure 3: Messaging, Rule Engine and Schema Registry]
Rules engine internal data format (Map)
The data format used inside the rules engine is the Erlang Map, so if the original payload is binary or some other format, it must first be converted to a Map with the codec functions (such as the schema_decode and json_decode functions described below).
A Map is a Key-Value data structure of the form #{key => value}. For example, user = #{id => 1, name => "Steve"} defines a user Map whose id is 1 and whose name is "Steve".
SQL statements provide the "." operator to extract and add nested Map fields. Here is an example of this Map operation in an SQL statement:
SELECT user.id AS my_id
The filter result of this SQL statement is #{my_id => 1}.
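The behaviour of the "." operator can be mimicked in Python, modeling the rule-engine Map as a nested dict. select_as is a hypothetical helper written for this sketch, not part of EMQ X:

```python
# Hypothetical helper: walk a dotted path through nested dicts and
# bind the extracted value to a new top-level key (the AS alias).
def select_as(data, path, alias):
    value = data
    for key in path.split("."):
        value = value[key]
    return {alias: value}

# Mirrors: SELECT user.id AS my_id  ->  #{my_id => 1}
event = {"user": {"id": 1, "name": "Steve"}}
assert select_as(event, "user.id", "my_id") == {"my_id": 1}
```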
JSON codec
The rules-engine SQL statements provide support for encoding and decoding JSON strings. The SQL functions for converting between JSON strings and the Map format are json_decode() and json_encode():
SELECT json_decode(payload) AS p FROM "message.publish" WHERE p.x = p.y, topic =~ 't/a'
The above SQL statement matches an MQTT message whose payload is the JSON string {"x": 1, "y": 1} and whose topic is "t/a".
json_decode(payload) AS p decodes the JSON string into the following Map data structure, so fields of the Map can be accessed as p.x and p.y in the WHERE clause:
#{p => #{x => 1, y => 1}}
Note: the AS clause is required; the decoded data must be assigned to a key before subsequent operations can reference it.
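Conceptually, the rule above behaves like the following Python sketch. match_rule is a hypothetical helper written for illustration; it is not how EMQ X actually evaluates SQL:

```python
import json

# Hypothetical model of the rule: decode the payload, bind it to key "p"
# (the AS clause), then apply WHERE p.x = p.y and topic =~ 't/a'.
def match_rule(topic, payload):
    p = json.loads(payload)
    selected = {"p": p}  # analogue of #{p => #{x => 1, y => 1}}
    if topic == "t/a" and p.get("x") == p.get("y"):
        return selected
    return None  # rule does not match; no action fires

assert match_rule("t/a", b'{"x": 1, "y": 1}') == {"p": {"x": 1, "y": 1}}
assert match_rule("t/a", b'{"x": 1, "y": 2}') is None
```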
Codec in practice
Protobuf data parsing example
Rule requirements
The device publishes binary messages encoded with Protobuf. The rules engine needs to match the decoded message and republish it to a topic containing the "name" field, in the format "person/${name}".
For example, republish a message with a "name" field of "Shawn" to the topic "person/Shawn".
Create Schema
In the Dashboard interface of EMQ X, create a Protobuf Schema with the following parameters:
Name: protobuf_person
Codec type: protobuf
Schema: the following protobuf schema defines a Person message.
message Person {
  required string name = 1;
  required int32 id = 2;
  optional string email = 3;
}
When the Schema is created, emqx assigns it a Schema ID and a Version. If this is the first time "protobuf_person" has been created, the Schema ID is "protobuf_person:1.0".
Create a rule
Use the Schema ID just created to write the rule SQL statement:
SELECT schema_decode('protobuf_person:1.0', payload, 'Person') as person, payload
FROM "message.publish"
WHERE topic =~ 't/#' and person.name = 'Shawn'
The key point here is schema_decode('protobuf_person:1.0', payload, 'Person'):
The schema_decode function decodes the contents of the payload field according to the Schema 'protobuf_person:1.0'.
as person saves the decoded value to the variable "person".
The last argument, 'Person', indicates that the message in the payload is of the 'Person' type defined in the Protobuf Schema.
Then add the action using the following parameters:
Action type: message republish
Destination topic: person/${person.name}
Message content template: ${person}
This action sends the decoded "person" in JSON format to the person/${person.name} topic. ${person.name} is a variable placeholder that is replaced at run time with the value of the "name" field from the message content.
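The placeholder substitution can be sketched in Python. render is a hypothetical helper for illustration, not EMQ X code:

```python
import re

# Hypothetical sketch of ${...} template substitution: each placeholder
# names a dotted path into the decoded Map (modeled as a nested dict).
def render(template, bindings):
    def lookup(match):
        value = bindings
        for key in match.group(1).split("."):
            value = value[key]
        return str(value)
    return re.sub(r"\$\{([\w.]+)\}", lookup, template)

topic = render("person/${person.name}", {"person": {"name": "Shawn"}})
assert topic == "person/Shawn"
```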
Device side code
Once the rules are created, you can simulate the data for testing.
The following code populates a Person message in Python, encodes it as binary data, and then sends it to the "t/1" topic. See the complete code for details.
def publish_msg(client):
    p = person_pb2.Person()
    p.id = 1
    p.name = "Shawn"
    p.email = "liuxy@emqx.io"
    message = p.SerializeToString()
    topic = "t/1"
    print("publish to topic: t/1, payload:", message)
    client.publish(topic, payload=message, qos=0, retain=False)
Check the result of rule execution
In Dashboard's Websocket tool, log in as an MQTT Client and subscribe to "person/#".
Install the Python dependencies and run the device-side code:
$ pip3 install protobuf
$ pip3 install paho-mqtt
$ python3 ./pb2_mqtt.py
Connected with result code 0
publish to topic: t/1, payload: b'\n\x05Shawn\x10\x01\x1a\rliuxy@emqx.io'
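As a sanity check, that payload can be reproduced by hand from the Protobuf wire format using only the Python standard library. This is an illustrative sketch with a hypothetical encode_person helper; real clients should use the generated person_pb2 module:

```python
# Protobuf wire format: each field is a tag byte (field_number << 3 | wire_type)
# followed by its value. Strings use wire type 2 (length-delimited);
# int32 uses wire type 0 (varint).
def encode_person(name, id_, email):
    out = b""
    out += b"\x0a" + bytes([len(name)]) + name.encode()    # field 1 (name), string
    out += b"\x10" + bytes([id_])                          # field 2 (id), varint; single-byte ids only
    out += b"\x1a" + bytes([len(email)]) + email.encode()  # field 3 (email), string
    return out

payload = encode_person("Shawn", 1, "liuxy@emqx.io")
assert payload == b"\n\x05Shawn\x10\x01\x1a\rliuxy@emqx.io"
```

Note that b"\n" and b"\r" are just the tag byte 0x0a (field 1) and the length 13 of "liuxy@emqx.io" rendered as escape characters.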
Check that the Websocket side receives a message on the topic person/Shawn:
{"email": "liuxy@emqx.io", "id": 1, "name": "Shawn"}
Avro data parsing example
Rule requirements
The device publishes binary messages encoded with Avro. The rules engine needs to match the decoded message and republish it to a topic containing the "name" field, in the format "avro_user/${name}".
For example, republish a message with a "name" field of "Shawn" to the topic "avro_user/Shawn".
Create Schema
In the Dashboard interface of EMQ X, create an Avro Schema with the following parameters:
Name: avro_user
Codec type: avro
Schema:
{
  "type": "record",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}
When the Schema is created, emqx assigns it a Schema ID and a Version. If this is the first time "avro_user" has been created, the Schema ID is "avro_user:1.0".
Create a rule
Use the Schema ID just created to write the rule SQL statement:
SELECT schema_decode('avro_user:1.0', payload) as avro_user, payload
FROM "message.publish"
WHERE topic =~ 't/#' and avro_user.name = 'Shawn'
The key point here is schema_decode('avro_user:1.0', payload):
The schema_decode function decodes the contents of the payload field according to the Schema 'avro_user:1.0'.
as avro_user saves the decoded value to the variable "avro_user".
Then add the action using the following parameters:
Action type: message republish
Destination topic: avro_user/${avro_user.name}
Message content template: ${avro_user}
This action sends the decoded "avro_user" in JSON format to the avro_user/${avro_user.name} topic. ${avro_user.name} is a variable placeholder that is replaced at run time with the value of the "name" field from the message content.
Device side code
Once the rules are created, you can simulate the data for testing.
The following code populates a User message in Python, encodes it as binary data, and then sends it to the "t/1" topic. See the complete code for details.
def publish_msg(client):
    datum_w = avro.io.DatumWriter(SCHEMA)
    buf = io.BytesIO()
    encoder = avro.io.BinaryEncoder(buf)
    datum_w.write({"name": "Shawn", "favorite_number": 666, "favorite_color": "red"}, encoder)
    message = buf.getvalue()
    topic = "t/1"
    print("publish to topic: t/1, payload:", message)
    client.publish(topic, payload=message, qos=0, retain=False)
Check the result of rule execution
In Dashboard's Websocket tool, log in as an MQTT Client and subscribe to "avro_user/#".
Install the Python dependencies and run the device-side code:
$ pip3 install avro
$ pip3 install paho-mqtt
$ python3 avro_mqtt.py
Connected with result code 0
publish to topic: t/1, payload: b'\nShawn\x00\xb4\n\x00\x06red'
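Likewise, the Avro payload above can be rebuilt by hand, a stdlib-only sketch with hypothetical helpers: Avro writes longs as zig-zag varints, strings as a length followed by UTF-8 bytes, and unions as a branch index followed by the branch value:

```python
def zigzag(n):
    # Zig-zag encoding maps signed longs to unsigned: 0,-1,1,-2,... -> 0,1,2,3,...
    return (n << 1) ^ (n >> 63)

def varint(n):
    # Little-endian base-128 varint: low 7 bits first, high bit = continuation.
    out = b""
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out += bytes([byte | 0x80])
        else:
            return out + bytes([byte])

def avro_long(n):
    return varint(zigzag(n))

def avro_string(s):
    return avro_long(len(s)) + s.encode()

payload = (avro_string("Shawn")              # name
           + avro_long(0) + avro_long(666)   # union branch 0 ("int"), then 666
           + avro_long(0) + avro_string("red"))  # union branch 0 ("string"), then "red"
assert payload == b"\nShawn\x00\xb4\n\x00\x06red"
```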
Check that the Websocket side receives a message on the topic avro_user/Shawn:
{"favorite_color": "red", "favorite_number": 666, "name": "Shawn"}
Custom codec example
Rule requirements
The device publishes an arbitrary message to verify that the self-deployed codec service works properly.
Create Schema
In the Dashboard interface of EMQ X, create a 3rd-Party Schema with the following parameters:
Name: my_parser
Codec type: 3rd-party
Third party type: HTTP
URL: http://127.0.0.1:9003/parser
Codec configuration: xor
Other configurations keep their defaults. emqx assigns the Schema ID "my_parser"; custom codecs are not managed by Version.
Item 5 above, the codec configuration, is an optional string whose content is defined by the business logic of the codec service.
Create a rule
Use the Schema ID just created to write the rule SQL statement:
SELECT schema_encode('my_parser', payload) as encoded_data, schema_decode('my_parser', encoded_data) as decoded_data
FROM "message.publish"
WHERE topic =~ 't/#'
This SQL statement first encodes the data and then decodes it, to verify that the codec round trip is correct:
The schema_encode function encodes the contents of the payload field according to the Schema 'my_parser'; the result is stored in the variable encoded_data.
The schema_decode function decodes the contents of the encoded_data field according to the Schema 'my_parser'; the result is stored in the variable decoded_data.
In the end, the filter results of this SQL statement are encoded_data and decoded_data.
Then add the action using the following parameters:
Action type: check (debug)
This check action prints the results filtered by the SQL statement to the emqx console (erlang shell).
If the service was started with emqx console, the output appears directly in the console; if it was started with emqx start, the output is written to the erlang.log.N files in the log directory, where "N" is an integer, such as "erlang.log.1" and "erlang.log.2".
Codec server code
Once the rules are created, you can simulate data for testing, so first you need to write your own codec service.
The following code implements an HTTP codec service in Python. For simplicity, the service provides two simple encoding/decoding (encryption/decryption) schemes. For more information, see the complete code:
Bitwise XOR
Character substitution
def xor(data):
    """
    >>> xor(xor(b'abc'))
    b'abc'
    >>> xor(xor(b'!}*'))
    b'!}*'
    """
    length = len(data)
    bdata = bytearray(data)
    bsecret = bytearray(secret * length)
    result = bytearray(length)
    for i in range(length):
        result[i] = bdata[i] ^ bsecret[i]
    return bytes(result)

def subst(dtype, data, n):
    """
    >>> subst('decode', b'abc', 3)
    b'def'
    >>> subst('decode', b'abc', 1)
    b'bcd'
    >>> subst('encode', b'def', 3)
    b'abc'
    >>> subst('encode', b'bcd', 1)
    b'abc'
    """
    adata = array.array('B', data)
    for i in range(len(adata)):
        if dtype == 'decode':
            adata[i] = shift(adata[i], n)
        elif dtype == 'encode':
            adata[i] = shift(adata[i], -n)
    return bytes(adata)
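As a quick sanity check of the XOR scheme, here is a self-contained variant using an assumed one-byte secret (the real service defines its own secret in the complete code). XOR with a fixed keystream is its own inverse, which is why a single function can serve as both encoder and decoder:

```python
# Assumed one-byte secret for this sketch; the real service chooses its own.
secret = b"\x5a"

def xor_codec(data, key=secret):
    # Repeat the key to the length of the data, then XOR byte-by-byte.
    keystream = (key * len(data))[:len(data)]
    return bytes(b ^ k for b, k in zip(data, keystream))

encoded = xor_codec(b"hello")
assert encoded != b"hello"            # encoding changes the bytes
assert xor_codec(encoded) == b"hello" # applying XOR again restores them
```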
Get this service running:
$ pip3 install flask
$ python3 http_parser_server.py
 * Serving Flask app "http_parser_server" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on http://127.0.0.1:9003/ (Press CTRL+C to quit)
Check the result of rule execution
Because this example is relatively simple, we use the MQTT Websocket client directly to simulate the device sending a message.
In Dashboard's Websocket tool, log in as an MQTT Client and publish the message "hello" to "t/1".
Check the output in the emqx console (erlang shell):
(emqx@127.0.0.1)1> [inspect]
        Selected Data: #{decoded_data => <<"hello">>,
                         encoded_data => <<...>>}
        Envs: #{event => 'message.publish',
                flags => #{dup => false, retain => false},
                from => <<...>>,
                headers => #{allow_publish => true,
                             peername => {{127,0,0,1},54753},
                             username => <<...>>},
                id => <<...>>,
                node => 'emqx@127.0.0.1',
                payload => <<...>>,
                qos => 0,
                timestamp => {1568, ..., 222929},
                topic => <<"t/1">>}
        Action Init Params: #{}
Selected Data is the data filtered by the SQL statement, Envs is the set of environment variables available inside the rules engine, and Action Init Params is the initialization parameters of the action. All three are in Map format.
The decoded_data and encoded_data fields in Selected Data correspond to the two AS clauses in the SELECT statement. Because decoded_data is the result of encoding and then decoding, it is restored to the content we sent, "hello", indicating that the codec service is working properly.
At this point, I believe you have a deeper understanding of how to use Schema Registry. You might as well try it out in practice.