A tutorial on using Schema Registry

2025-01-15 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/02 Report--

This article explains how to use Schema Registry. The method it introduces is simple, fast, and practical.

IoT device terminals come in many kinds, and different manufacturers use different encoding formats. An IoT platform therefore needs a unified data format that the applications on the platform can use for device management.

EMQ X Enterprise Edition 3.4.0 provides Schema Registry functionality and codec capabilities. Schema Registry manages the Schema used for encoding and decoding, processes encoding or decoding requests, and returns results. Schema Registry works with the rule engine to adapt to device access and rule design in various scenarios.

Data format

The following figure shows an application case of Schema Registry. Multiple devices report data in different formats. Schema Registry decodes the data into a unified internal format, which is then forwarded to the back-end application.

[figure 1: using Schema Registry to encode and decode device data]

Binary format support

The Schema Registry data formats built into EMQ X 3.4.0 are Avro and Protobuf. Both are data formats that rely on a Schema, and the encoded data is binary. The internal data format produced by Schema Registry decoding (Map, explained later) can be used directly by the rules engine and other plug-ins. In addition, Schema Registry supports user-defined (3rd-party) codec services, which can implement encoding and decoding closer to business needs through HTTP or TCP callbacks.

Architecture design

Schema Registry maintains the Schema text for built-in encoding formats such as Avro and Protobuf. For custom codec (3rd-party) formats, the Schema text must be maintained by the codec service itself. Schema Registry creates a Schema ID for each Schema, and the Schema API provides add, query, and delete operations by Schema ID.

Schema Registry can perform both decoding and encoding. A Schema ID must be specified for either operation.

[figure 2: schematic diagram of Schema Registry architecture]

Example of an encoding call, which takes the Schema ID as a parameter:

    schema_encode(SchemaID, Data) -> RawData

Example of a decoding call:

    schema_decode(SchemaID, RawData) -> Data

A common use case is to use the rules engine to invoke the encoding and decoding interface provided by Schema Registry, and then use the encoded or decoded data as input to subsequent actions.
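
The add, query, and delete operations and the two codec calls described above can be sketched as a small in-memory registry. This is only an illustration under stated assumptions: the class and method names are hypothetical and are not the EMQ X Schema API, and JSON stands in for a real binary codec such as Avro or Protobuf. The "name:version" form of the Schema ID follows the IDs used later in this article.

```python
import json
from typing import Any, Callable, Dict, Tuple

# Hypothetical types and names for illustration; not the EMQ X Schema API.
Codec = Tuple[Callable[[Any], bytes], Callable[[bytes], Any]]


class SchemaRegistry:
    def __init__(self) -> None:
        self._schemas: Dict[str, Codec] = {}

    def add(self, name: str, version: str, codec: Codec) -> str:
        """Register a codec and return its Schema ID ("name:version")."""
        schema_id = f"{name}:{version}"
        self._schemas[schema_id] = codec
        return schema_id

    def query(self, schema_id: str) -> bool:
        """Report whether a Schema ID is registered."""
        return schema_id in self._schemas

    def delete(self, schema_id: str) -> None:
        """Remove a Schema ID if it exists."""
        self._schemas.pop(schema_id, None)

    def schema_encode(self, schema_id: str, data: Any) -> bytes:
        encode, _ = self._schemas[schema_id]
        return encode(data)

    def schema_decode(self, schema_id: str, raw: bytes) -> Any:
        _, decode = self._schemas[schema_id]
        return decode(raw)


# JSON is a stand-in codec; a real registry would plug in Avro or Protobuf.
json_codec: Codec = (
    lambda data: json.dumps(data, sort_keys=True).encode("utf-8"),
    lambda raw: json.loads(raw.decode("utf-8")),
)

registry = SchemaRegistry()
sid = registry.add("demo", "1.0", json_codec)
raw = registry.schema_encode(sid, {"id": 1, "name": "Steve"})
data = registry.schema_decode(sid, raw)
print(sid, raw, data)
```

Keying every operation on the Schema ID keeps the caller independent of the codec behind it, which is the property the rules engine relies on.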

Codec + rule engine

The message processing level of EMQ X can be divided into three parts: message routing (Messaging), rules engine (Rule Engine) and data format conversion (Data Conversion).

The PUB/SUB system of EMQ X routes messages to the specified topic. The rules engine lets you flexibly configure business rules for the data: it matches messages against rules and then runs the corresponding actions. Data format conversion happens before rule matching. The data is first converted to a Map format that can take part in rule matching, and then matched.

[figure 3: Messaging, Rule Engine and Schema Registry]

Rules engine internal data format (Map)

The data format used within the rules engine is Erlang Map, so if the original data content is in binary or other format, you must use codec functions (such as the schema_decode and json_decode functions mentioned above) to convert it to Map.

Map is a Key-Value data structure of the form #{key => value}. For example, user = #{id => 1, name => "Steve"} defines a user Map with an id of 1 and a name of "Steve".

The SQL statement provides the "." operator to extract and add nested Map fields. The following is an example of using the SQL statement on this Map:

    SELECT user.id AS my_id

The filter result of the SQL statement is #{my_id => 1}.
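
The "." operator can be pictured as a walk through nested maps. The following minimal sketch uses Python dictionaries in place of Erlang Maps; the helper names get_path and select_as are hypothetical and only mimic what the SELECT ... AS clause does.

```python
from typing import Any, Dict


def get_path(data: Dict[str, Any], path: str) -> Any:
    """Follow a dotted path such as "user.id" through nested dicts."""
    current: Any = data
    for key in path.split("."):
        current = current[key]
    return current


def select_as(data: Dict[str, Any], path: str, alias: str) -> Dict[str, Any]:
    """Mimic SELECT <path> AS <alias>: return a new map with one key."""
    return {alias: get_path(data, path)}


event = {"user": {"id": 1, "name": "Steve"}}
result = select_as(event, "user.id", "my_id")
print(result)  # {'my_id': 1}
```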

JSON codec

The SQL statement of the rules engine supports encoding and decoding strings in JSON format. The SQL functions that convert between JSON strings and the Map format are json_decode() and json_encode():

    SELECT json_decode(payload) AS p FROM "message.publish" WHERE p.x = p.y and topic =~ 't/#'

The above SQL statement matches MQTT messages whose payload is a JSON string such as {"x": 1, "y": 1} and whose topic is t/a.

json_decode(payload) AS p decodes the JSON string into the following Map data structure, so the WHERE clause can refer to its fields as p.x and p.y:

    #{p => #{x => 1, y => 1}}

Note: the AS clause is required, and the decoded data is assigned to a Key before subsequent operations can be performed.
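
The decode, assign, and filter steps of such a rule can be sketched as follows. This is an approximation for illustration, not the rules engine itself: apply_rule and topic_matches are hypothetical helpers, and the topic matcher only covers the basic "+" and "#" wildcard behaviour.

```python
import json
from typing import Any, Dict, Optional


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Minimal MQTT filter match supporting the "+" and "#" wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # "#" matches the rest of the topic
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


def apply_rule(topic: str, payload: bytes) -> Optional[Dict[str, Any]]:
    """Decode the payload AS p, then apply the WHERE conditions."""
    selected = {"p": json.loads(payload)}  # json_decode(payload) AS p
    p = selected["p"]
    if p.get("x") == p.get("y") and topic_matches("t/#", topic):
        return selected
    return None


print(apply_rule("t/a", b'{"x": 1, "y": 1}'))
```

Because the decoded value is stored under the key p first, the conditions can refer to p.x and p.y, which is why the AS clause is required.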

Coding and decoding practice

Protobuf data parsing example

Rule requirements

The device publishes a binary message encoded in Protobuf, which needs to be matched by the rules engine and republished to a topic related to the "name" field. The format of the topic is "person/${name}".

For example, republish a message with a "name" field of "Shawn" to the topic "person/Shawn".

Create Schema

In the Dashboard interface of EMQ X, create a Protobuf Schema with the following parameters:

Name: protobuf_person

Codec type: protobuf

Schema: the following protobuf schema defines a Person message.

    message Person {
      required string name = 1;
      required int32 id = 2;
      optional string email = 3;
    }

When the Schema is created, EMQ X assigns a Schema ID and a Version. If "protobuf_person" is being created for the first time, the Schema ID is "protobuf_person:1.0".

Create a rule

Use the Schema ID you just created to write the rule SQL statement:

    SELECT schema_decode('protobuf_person:1.0', payload, 'Person') as person, payload FROM "message.publish" WHERE topic =~ 't/#' and person.name = 'Shawn'

The key point here is schema_decode('protobuf_person:1.0', payload, 'Person'):

The schema_decode function decodes the contents of the payload field according to the Schema 'protobuf_person:1.0'.

as person saves the decoded value to the variable "person".

The last parameter, Person, indicates that the message in payload is of the 'Person' type defined in the protobuf schema.

Then add the action using the following parameters:

Action type: message republish

Destination topic: person/${person.name}

Message content template: ${person}

This action sends the decoded "person" to the person/${person.name} topic in JSON format. Here ${person.name} is a variable placeholder that is replaced at run time with the value of the "name" field in the message content.
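
How the placeholders expand can be sketched with a small template renderer. This is an assumption-laden illustration rather than the way EMQ X implements the republish action: render and lookup are hypothetical helpers, and non-string values are serialized as JSON with sorted keys.

```python
import json
import re
from typing import Any, Dict

# Matches placeholders such as ${person} or ${person.name}.
PLACEHOLDER = re.compile(r"\$\{([A-Za-z0-9_.]+)\}")


def lookup(data: Dict[str, Any], path: str) -> Any:
    """Follow a dotted path through nested dicts."""
    current: Any = data
    for key in path.split("."):
        current = current[key]
    return current


def render(template: str, data: Dict[str, Any]) -> str:
    """Replace each ${path} with the value found in data."""
    def replace(match):
        value = lookup(data, match.group(1))
        if isinstance(value, str):
            return value
        return json.dumps(value, sort_keys=True)
    return PLACEHOLDER.sub(replace, template)


selected = {"person": {"name": "Shawn", "id": 1, "email": "liuxy@emqx.io"}}
topic = render("person/${person.name}", selected)
body = render("${person}", selected)
print(topic)  # person/Shawn
print(body)   # {"email": "liuxy@emqx.io", "id": 1, "name": "Shawn"}
```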

Device side code

Once the rules are created, you can simulate the data for testing.

The following Python code populates a Person message, encodes it as binary data, and then sends it to the "t/1" topic. See the complete code for details.

    def publish_msg(client):
        p = person_pb2.Person()
        p.id = 1
        p.name = "Shawn"
        p.email = "liuxy@emqx.io"
        message = p.SerializeToString()
        topic = "t/1"
        print("publish to topic: t/1, payload:", message)
        client.publish(topic, payload=message, qos=0, retain=False)

Check the result of rule execution

In Dashboard's Websocket tool, connect an MQTT client and subscribe to "person/#".

Install python dependencies and execute the device-side code:

    $ pip3 install protobuf
    $ pip3 install paho-mqtt
    $ python3 ./pb2_mqtt.py
    Connected with result code 0
    publish to topic: t/1, payload: b'\n\x05Shawn\x10\x01\x1a\rliuxy@emqx.io'
    t/1 b'\n\x05Shawn\x10\x01\x1a\rliuxy@emqx.io'
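
The payload bytes printed by the script follow directly from the Protobuf wire format, and can be reproduced by hand without the generated person_pb2 module. The sketch below is only an illustration: the helper names are hypothetical, and it handles just the two wire types this message needs (varint and length-delimited), with non-negative integers only.

```python
def varint(n: int) -> bytes:
    """Encode a non-negative integer as a Protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def string_field(number: int, value: str) -> bytes:
    """Length-delimited field (wire type 2): tag, length, UTF-8 bytes."""
    data = value.encode("utf-8")
    return varint((number << 3) | 2) + varint(len(data)) + data


def int_field(number: int, value: int) -> bytes:
    """Varint field (wire type 0); non-negative values only in this sketch."""
    return varint(number << 3) + varint(value)


# Fields are written in field-number order: name = 1, id = 2, email = 3.
message = (
    string_field(1, "Shawn")
    + int_field(2, 1)
    + string_field(3, "liuxy@emqx.io")
)
print(message)  # b'\n\x05Shawn\x10\x01\x1a\rliuxy@emqx.io'
```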

Check that the Websocket side receives a message with the topic person/Shawn:

    {"email": "liuxy@emqx.io", "id": 1, "name": "Shawn"}

Avro data parsing example

Rule requirements

The device publishes a binary message encoded in Avro, which needs to be matched by the rules engine and republished to a topic related to the "name" field. The format of the topic is "avro_user/${name}".

For example, republish a message with a "name" field of "Shawn" to the topic "avro_user/Shawn".

Create Schema

In the Dashboard interface of EMQ X, create an Avro Schema with the following parameters:

Name: avro_user

Codec type: avro

Schema:

    {
      "type": "record",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]}
      ]
    }

When the Schema is created, EMQ X assigns a Schema ID and a Version. If "avro_user" is being created for the first time, the Schema ID is "avro_user:1.0".

Create a rule

Use the Schema ID you just created to write the rule SQL statement:

    SELECT schema_decode('avro_user:1.0', payload) as avro_user, payload FROM "message.publish" WHERE topic =~ 't/#' and avro_user.name = 'Shawn'

The key point here is schema_decode('avro_user:1.0', payload):

The schema_decode function decodes the contents of the payload field according to the Schema 'avro_user:1.0'.

as avro_user saves the decoded value to the variable "avro_user".

Then add the action using the following parameters:

Action type: message republish

Destination topic: avro_user/${avro_user.name}

Message content template: ${avro_user}

This action sends the decoded "user" to the avro_user/${avro_user.name} topic in JSON format. Here ${avro_user.name} is a variable placeholder that is replaced at run time with the value of the "name" field in the message content.

Device side code

Once the rules are created, you can simulate the data for testing.

The following Python code populates a User message, encodes it as binary data, and then sends it to the "t/1" topic. See the complete code for details.

    def publish_msg(client):
        datum_w = avro.io.DatumWriter(SCHEMA)
        buf = io.BytesIO()
        encoder = avro.io.BinaryEncoder(buf)
        datum_w.write({"name": "Shawn", "favorite_number": 666, "favorite_color": "red"}, encoder)
        message = buf.getvalue()
        topic = "t/1"
        print("publish to topic: t/1, payload:", message)
        client.publish(topic, payload=message, qos=0, retain=False)

Check the result of rule execution

In Dashboard's Websocket tool, connect an MQTT client and subscribe to "avro_user/#".

Install python dependencies and execute the device-side code:

    $ pip3 install protobuf
    $ pip3 install paho-mqtt
    $ python3 avro_mqtt.py
    Connected with result code 0
    publish to topic: t/1, payload: b'\nShawn\x00\xb4\n\x00\x06red'
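
The Avro payload can likewise be reproduced by hand from the binary encoding rules: integers and lengths are zig-zag varints, strings are a length followed by UTF-8 bytes, and a union value is prefixed by the index of its branch. The sketch below is an illustration with hypothetical helper names; it does not use the avro library and covers only the types in this schema.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode a signed integer as Avro does for int and long values."""
    z = (n << 1) ^ (n >> 63)  # zig-zag maps small negatives to small positives
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def avro_string(value: str) -> bytes:
    """A string is its byte length followed by the UTF-8 bytes."""
    data = value.encode("utf-8")
    return zigzag_varint(len(data)) + data


def avro_union(branch_index: int, encoded_value: bytes) -> bytes:
    """A union value is the branch index followed by the encoded value."""
    return zigzag_varint(branch_index) + encoded_value


# Record fields are written in schema order with no field tags.
# Branch 0 is "int" in ["int", "null"] and "string" in ["string", "null"].
message = (
    avro_string("Shawn")
    + avro_union(0, zigzag_varint(666))
    + avro_union(0, avro_string("red"))
)
print(message)  # b'\nShawn\x00\xb4\n\x00\x06red'
```

Unlike Protobuf, the bytes carry no field numbers, so the decoder must have the same Schema to make sense of them. This is why the Schema ID has to be supplied to schema_decode.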

Check that the Websocket side receives a message with the topic avro_user/Shawn:

    {"favorite_color": "red", "favorite_number": 666, "name": "Shawn"}

Custom codec example

Rule requirements

The device publishes an arbitrary message to verify that the self-deployed codec service is working properly.

Create Schema

In the Dashboard interface of EMQ X, create a 3rd-Party Schema with the following parameters:

Name: my_parser

Codec type: 3rd-party

Third party type: HTTP

URL: http://127.0.0.1:9003/parser

Codec configuration: xor

Other configurations keep their defaults. EMQ X assigns the Schema ID "my_parser". Custom codecs are not managed by Version.

The fifth parameter above, the codec configuration, is optional. It is a string whose content depends on the business logic of the codec service.

Create a rule

Use the Schema ID you just created to write the rule SQL statement:

    SELECT schema_encode('my_parser', payload) as encoded_data, schema_decode('my_parser', encoded_data) as decoded_data FROM "message.publish" WHERE topic =~ 't/#'

This SQL statement first encodes the data and then decodes it, to verify that the codec process is correct:

The schema_encode function encodes the contents of the payload field according to the Schema 'my_parser', and the result is stored in the variable encoded_data.

The schema_decode function decodes the contents of encoded_data according to the Schema 'my_parser', and the result is stored in the variable decoded_data.

In the end, the filter result of this SQL statement consists of encoded_data and decoded_data.

Then add the action using the following parameters:

Action type: inspect (debug)

This inspect action prints the result filtered by the SQL statement to the emqx console (erlang shell).

If you start the service with emqx console, the output is displayed directly in the console. If you start the service with emqx start, the output goes to the erlang.log.N file in the log directory, where "N" is an integer, such as "erlang.log.1" and "erlang.log.2".

Codec server code

Once the rule is created, you can simulate data for testing. First, you need to write your own codec service.

The following code implements an HTTP codec service in Python. For simplicity, the service provides two simple ways to encode and decode (encrypt and decrypt). See the complete code for details:

Bitwise XOR

Character substitution

    import array

    # "secret" and "shift" are defined in the complete code.

    def xor(data):
        """
        >>> xor(xor(b'abc'))
        b'abc'
        """
        length = len(data)
        bdata = bytearray(data)
        bsecret = bytearray(secret * length)
        result = bytearray(length)
        for i in range(length):
            result[i] = bdata[i] ^ bsecret[i]
        return bytes(result)

    def subst(dtype, data, n):
        """
        >>> subst('decode', b'abc', 3)
        b'def'
        >>> subst('encode', b'def', 3)
        b'abc'
        """
        adata = array.array('B', data)
        for i in range(len(adata)):
            if dtype == 'decode':
                adata[i] = shift(adata[i], n)
            elif dtype == 'encode':
                adata[i] = shift(adata[i], -n)
        return bytes(adata)
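
The excerpt relies on secret and shift, which appear only in the complete code. To try the two transformations on their own, a self-contained sketch might look like the following. The single-byte key SECRET and the wrap-around behaviour of shift are assumptions made for illustration and may differ from the complete code.

```python
import array

SECRET = b"a"  # assumed single-byte key, hypothetical


def shift(byte: int, n: int) -> int:
    """Assumed helper: move a byte value by n, wrapping within 0..255."""
    return (byte + n) % 256


def xor(data: bytes) -> bytes:
    """XOR every byte with the key; applying it twice restores the input."""
    return bytes(b ^ SECRET[0] for b in data)


def subst(dtype: str, data: bytes, n: int) -> bytes:
    """Shift every byte by +n for 'decode' and by -n for 'encode'."""
    adata = array.array("B", data)
    for i in range(len(adata)):
        if dtype == "decode":
            adata[i] = shift(adata[i], n)
        elif dtype == "encode":
            adata[i] = shift(adata[i], -n)
    return bytes(adata)


encoded = xor(b"hello")
decoded = xor(encoded)
print(encoded, decoded)
print(subst("decode", b"abc", 3))  # b'def'
```

Both transformations are their own inverse pairs, which is what the encode-then-decode rule in this example depends on.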

Get this service running:

    $ pip3 install flask
    $ python3 http_parser_server.py
     * Serving Flask app "http_parser_server" (lazy loading)
     * Environment: production
       WARNING: This is a development server. Do not use it in a production deployment.
       Use a production WSGI server instead.
     * Debug mode: off
     * Running on http://127.0.0.1:9003/ (Press CTRL+C to quit)

Check the result of rule execution

Because this example is relatively simple, we use the MQTT Websocket client directly to simulate a device sending a message.

In Dashboard's Websocket tool, connect an MQTT client and publish the message "hello" to "t/1".

Check the printing in the emqx console (erlang shell):

    (emqx@127.0.0.1)1> [inspect]
        Selected Data: #{decoded_data => <<"hello">>, encoded_data => <<...>>}
        Envs: #{event => 'message.publish',
                flags => #{dup => false, retain => false},
                from => <<...>>,
                headers => #{allow_publish => true,
                             peername => {{127,0,0,1}, 54753},
                             username => <<...>>},
                id => <<...>>,
                node => 'emqx@127.0.0.1',
                payload => <<"hello">>,
                qos => 0,
                timestamp => {1568, ..., 222929},
                topic => <<"t/1">>}
        Action Init Params: #{}

Selected Data is the data filtered by the SQL statement, Envs holds the environment variables available within the rules engine, and Action Init Params holds the initialization parameters of the action. All three are in Map format.

The two fields decoded_data and encoded_data in Selected Data correspond to the two AS clauses in the SELECT statement. Because decoded_data is the result of encoding and then decoding, it is restored to the content we sent, "hello", which shows that the codec service is working properly.
