

How to use SQL to read Kafka and write to MySQL


Today we will walk through how to use SQL to read from Kafka and write to MySQL. Many readers may not be familiar with this topic, so the following summary is intended to make it easier to follow. I hope you get something useful out of this article.

The implementation of SqlSubmit

At first I wanted to run the whole demonstration through the SQL Client, but unfortunately the SQL CLI in version 1.9 does not support CREATE TABLE statements, so I had to write a simple submission script myself. In hindsight this is not a bad thing, since it also shows the audience how to use Flink SQL both through SQL files and programmatically.

The main job of SqlSubmit is to execute and submit a SQL file. The implementation is very simple: it uses regular expressions to split the file into statement blocks by matching their prefixes. If a block starts with CREATE TABLE or INSERT INTO, tEnv.sqlUpdate(...) is called; if it starts with SET, the configuration is applied to the TableConfig. The core code is as follows:

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
// create a TableEnvironment that uses the Blink planner and works in streaming mode
TableEnvironment tEnv = TableEnvironment.create(settings);
// read the SQL file
List<String> sql = Files.readAllLines(path);
// distinguish the different SQL statements by matching prefixes with regular expressions
List<SqlCommandCall> calls = SqlCommandParser.parse(sql);
// call TableEnvironment according to the type of each SQL statement
for (SqlCommandCall call : calls) {
  switch (call.command) {
    case SET:
      String key = call.operands[0];
      String value = call.operands[1];
      // set the parameter
      tEnv.getConfig().getConfiguration().setString(key, value);
      break;
    case CREATE_TABLE:
      String ddl = call.operands[0];
      tEnv.sqlUpdate(ddl);
      break;
    case INSERT_INTO:
      String dml = call.operands[0];
      tEnv.sqlUpdate(dml);
      break;
    default:
      throw new RuntimeException("Unsupported command: " + call.command);
  }
}
// submit the job
tEnv.execute("SQL Job");

Connect to the Kafka source table using DDL

In the flink-sql-submit project, we have prepared a test data set (taken from the Alibaba Cloud Tianchi public data set, with special thanks), located at src/main/resources/user_behavior.log. The data is encoded in JSON format and looks something like this:

{"user_id": "543462", "item_id": "1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662867", "item_id": "2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

In order to simulate a real Kafka data source, the author also wrote a source-generator.sh script (see the source code if you are interested) that automatically reads the user_behavior.log data and feeds it into the Kafka user_behavior topic at a default rate of one record per millisecond.

Once we have the data source, we can use DDL to create a table that connects to this Kafka topic (see src/main/resources/q1.sql for details).

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka',  -- use the kafka connector
    'connector.version' = 'universal',  -- kafka version, universal supports 0.11 and above
    'connector.topic' = 'user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
    'connector.properties.0.key' = 'zookeeper.connect',  -- connection information
    'connector.properties.0.value' = 'localhost:2181',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'localhost:9092',
    'update-mode' = 'append',
    'format.type' = 'json',  -- the data format is json
    'format.derive-schema' = 'true'  -- derive the json parsing rules from the DDL schema
)

Note: some readers may find parameters such as connector.properties.0.key strange; the community plans to improve and simplify the connector parameter configuration in the next release.
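For reference only, later Flink releases did introduce a much simpler set of connector options. A rough sketch of the newer-style DDL is shown below; this assumes Flink 1.11 or later and does not apply to the 1.9 setup used in this article.

-- hedged sketch: newer-style Kafka connector options (assumes Flink 1.11+, not part of this 1.9 demo)
CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
)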

Connect to the MySQL result table using DDL

The JDBC connector provided by Flink can be used to connect to MySQL. For example:

CREATE TABLE pvuv_sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'jdbc',  -- use the jdbc connector
    'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',  -- jdbc url
    'connector.table' = 'pvuv_sink',  -- table name
    'connector.username' = 'root',  -- user name
    'connector.password' = '123456',  -- password
    'connector.write.flush.max-rows' = '1'  -- default is 5000, changed to 1 for the demonstration
)

PV UV calculation

Suppose our requirement is to calculate the number of user visits (PV) and the number of unique users (UV) per hour. Many users may immediately think of using tumbling windows for this, but here we introduce another way: Group Aggregation.

INSERT INTO pvuv_sink
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

It uses the built-in DATE_FORMAT function to normalize the log timestamp into a "year-month-day-hour" string and groups by that string, i.e. by hour. It then computes the number of user visits (PV) with COUNT(*) and the number of unique users (UV) with COUNT(DISTINCT user_id). With this execution mode, every incoming record triggers an incremental update based on the previously computed value (for example, +1), and the latest result is emitted immediately. The latency is therefore very low, but the volume of output is also large.

We write the results of this query to the previously defined pvuv_sink MySQL table through the INSERT INTO statement.
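For comparison with the tumbling-window approach mentioned above, a minimal sketch of the windowed version could look like the following. This assumes ts has been declared as an event-time attribute with a watermark, which the DDL above does not do; it is only meant to illustrate the difference.

-- hedged sketch: the tumbling-window alternative, assuming ts is an event-time attribute
SELECT
  TUMBLE_START(ts, INTERVAL '1' HOUR) AS dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)

Unlike group aggregation, a window emits its result only once, when the window closes, so the output is much smaller but the results arrive later.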

Note: at the Shenzhen Meetup we gave an in-depth introduction to performance tuning for this kind of query.
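As a hedged illustration of what such tuning can look like in a SQL file consumed by SqlSubmit, the Blink planner exposes options such as mini-batch aggregation and split distinct aggregation. The exact keys below are assumptions based on the Flink 1.9 configuration and should be checked against the q1.sql shipped with the project.

-- hedged sketch: possible tuning options for the group aggregation above (keys assumed, verify against q1.sql)
SET table.exec.mini-batch.enabled=true;               -- enable mini-batch aggregation
SET table.exec.mini-batch.allow-latency=1s;           -- how long to buffer records before firing
SET table.exec.mini-batch.size=5000;                  -- maximum number of buffered records
SET table.optimizer.distinct-agg.split.enabled=true;  -- split COUNT(DISTINCT ...) to mitigate data skew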

Preparing the demo environment

This hands-on demonstration requires a few services to be installed locally:

Flink local cluster: used to run Flink SQL tasks.

Kafka local cluster: used as a data source.

MySQL database: used as a result table.

Flink local cluster installation

1. Download the Flink 1.9.0 installation package and extract it: https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz

2. Download the following dependency jar packages and copy them to the flink-1.9.0/lib/ directory, because we need the various connector implementations at run time.

flink-sql-connector-kafka_2.11-1.9.0.jar

http://central.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.9.0/flink-sql-connector-kafka_2.11-1.9.0.jar

flink-json-1.9.0-sql-jar.jar

http://central.maven.org/maven2/org/apache/flink/flink-json/1.9.0/flink-json-1.9.0-sql-jar.jar

flink-jdbc_2.11-1.9.0.jar

http://central.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.9.0/flink-jdbc_2.11-1.9.0.jar

mysql-connector-java-5.1.48.jar

https://dev.mysql.com/downloads/connector/j/5.1.html

3. Change the taskmanager.numberOfTaskSlots in flink-1.9.0/conf/flink-conf.yaml to 10, because our demo task may consume more than 1 slot.

4. Execute ./bin/start-cluster.sh under the flink-1.9.0 directory to start the cluster.

If the operation is successful, you can access Flink Web UI at http://localhost:8081.

In addition, you need to put the Flink installation path into the env.sh file of the flink-sql-submit project so that SQL tasks can be submitted later. For example, my path is:

FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0

Kafka local cluster installation

Download the Kafka 2.2.0 installation package and extract it: https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz

Put the installation path into the env.sh file of the flink-sql-submit project. For example, my path is:

KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0

Run ./start-kafka.sh under the flink-sql-submit directory to start the Kafka cluster.

Execute jps on the command line, and if you see the Kafka process and the QuorumPeerMain process, it indicates that the startup was successful.

MySQL installation

You can download and install MySQL from the official page:

https://dev.mysql.com/downloads/mysql/

If you have a Docker environment, you can also install it directly through Docker.

https://hub.docker.com/_/mysql

$ docker pull mysql
$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

Then create a database named flink-test in MySQL and create the pvuv_sink table according to the schema above.
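A minimal sketch of the MySQL statements is shown below; the column types mirror the Flink DDL above and are an assumption, since the article does not spell them out.

-- hedged sketch: create the result database and table in MySQL (column types assumed)
CREATE DATABASE IF NOT EXISTS `flink-test`;
USE `flink-test`;
CREATE TABLE pvuv_sink (
    dt VARCHAR(20),
    pv BIGINT,
    uv BIGINT
);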

Submit SQL task

Run ./source-generator.sh under the flink-sql-submit directory; it automatically creates the user_behavior topic and continuously pours data into it in real time.

Run ./run.sh q1 under the flink-sql-submit directory; after the submission succeeds, you can see the topology in the Flink Web UI.

In the MySQL client, we can also watch the pv and uv values for each hour being updated continuously in real time.
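To check the results, a query such as the following can be run in the MySQL client (just a convenience query, not part of the project scripts):

-- hedged sketch: inspect the hourly PV/UV results written by the Flink job
SELECT dt, pv, uv FROM pvuv_sink ORDER BY dt;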

After reading the above, do you have a better understanding of how to use SQL to read from Kafka and write to MySQL? Thank you for reading and for your support.
