How to Build a Real-Time Data Warehouse Based on Flink 1.11 SQL


This article explains how to build a real-time data warehouse based on Flink 1.11 SQL. The approach introduced here is simple, fast, and practical; interested readers are encouraged to follow along.

Case introduction

This article uses an e-commerce business as an example to show the data processing flow of a real-time data warehouse. Since the goal is to explain the construction process, it does not involve overly complex calculations. To keep the case complete and reproducible, detailed operation steps are given. For demonstration purposes, all operations in this article are performed in the Flink SQL CLI.

Architecture design

The overall architecture is shown in the figure: first, the MySQL binlog is parsed by canal and the change data is written to Kafka. Flink SQL then cleans and joins the raw data and writes the resulting detail wide table back to Kafka. Dimension table data is stored in MySQL; Flink SQL joins the detail wide table with the dimension tables and writes the aggregated results to MySQL, which are finally visualized in FineBI.

Business data preparation

Order table (order_info)

CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `consignee` varchar(100) DEFAULT NULL COMMENT 'consignee',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT 'consignee phone',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT 'total amount',
  `order_status` varchar(20) DEFAULT NULL COMMENT 'order status',
  `user_id` bigint(20) DEFAULT NULL COMMENT 'user id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT 'payment method',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT 'shipping address',
  `order_comment` varchar(200) DEFAULT NULL COMMENT 'order remarks',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT 'order transaction number (for third-party payment)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT 'order description (for third-party payment)',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  `operate_time` datetime DEFAULT NULL COMMENT 'operation time',
  `expire_time` datetime DEFAULT NULL COMMENT 'expiration time',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT 'logistics tracking number',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT 'parent order id',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'image path',
  `province_id` int(20) DEFAULT NULL COMMENT 'region',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='order table';

Order detail table (order_detail)

CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `order_id` bigint(20) DEFAULT NULL COMMENT 'order id',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku name (redundant)',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'image name (redundant)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT 'purchase price (sku price when the order was placed)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT 'number of items purchased',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='order detail table';

Goods table (sku_info)

CREATE TABLE `sku_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'sku id (item id)',
  `spu_id` bigint(20) DEFAULT NULL COMMENT 'spu id',
  `price` decimal(10,0) DEFAULT NULL COMMENT 'price',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku name',
  `sku_desc` varchar(2000) DEFAULT NULL COMMENT 'sku specification description',
  `weight` decimal(10,2) DEFAULT NULL COMMENT 'weight',
  `tm_id` bigint(20) DEFAULT NULL COMMENT 'brand id (redundant)',
  `category3_id` bigint(20) DEFAULT NULL COMMENT 'level-3 category id (redundant)',
  `sku_default_img` varchar(200) DEFAULT NULL COMMENT 'default display image (redundant)',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='goods table';

Level-1 category table (base_category1)

CREATE TABLE `base_category1` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(10) NOT NULL COMMENT 'category name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-1 category table';

Level-2 category table (base_category2)

CREATE TABLE `base_category2` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(200) NOT NULL COMMENT 'level-2 category name',
  `category1_id` bigint(20) DEFAULT NULL COMMENT 'level-1 category id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-2 category table';

Level-3 category table (base_category3)

CREATE TABLE `base_category3` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(200) NOT NULL COMMENT 'level-3 category name',
  `category2_id` bigint(20) DEFAULT NULL COMMENT 'level-2 category id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-3 category table';

Province table (base_province)

CREATE TABLE `base_province` (
  `id` int(20) DEFAULT NULL COMMENT 'id',
  `name` varchar(20) DEFAULT NULL COMMENT 'province name',
  `region_id` int(20) DEFAULT NULL COMMENT 'region id',
  `area_code` varchar(20) DEFAULT NULL COMMENT 'administrative area code'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Region table (base_region)

CREATE TABLE `base_region` (
  `id` int(20) NOT NULL COMMENT 'region id',
  `region_name` varchar(20) DEFAULT NULL COMMENT 'region name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Data processing flow

ODS layer data synchronization

For data synchronization in the ODS layer, see my other article on implementing real-time incremental data synchronization based on Canal and Flink (1). In short, canal parses the MySQL binlog and writes the change records to the corresponding Kafka topics. Due to space constraints, the details are not repeated here. The result after synchronization is shown in the following figure:
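To give a sense of what lands in Kafka, a canal-json change record looks roughly like the following. This is only a schematic sketch for the mydw.base_province topic: the field values are illustrative and some canal metadata fields are omitted.

{
  "data": [
    { "id": "1", "name": "Beijing", "region_id": "1", "area_code": "110000" }
  ],
  "database": "mydw",
  "table": "base_province",
  "isDdl": false,
  "type": "INSERT",
  "es": 1596000000000,
  "ts": 1596000000123
}

Flink's canal-json format reads the data, old, and type fields of each record and turns them into insert, update, or delete rows against the declared table schema.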

DIM layer dimension table data preparation

In this case, the dimension tables are stored in MySQL; in actual production, HBase would typically be used to store dimension table data. We mainly use two dimension tables: the area dimension table and the goods dimension table. The process is as follows:

Area dimension table

First, load the data from the mydw.base_province and mydw.base_region topics into MySQL, using Flink SQL Kafka source tables with the canal-json format. Note: before loading, the corresponding tables must be created in MySQL. The MySQL database used in this article is named dim and is used to store dimension table data. The statements are as follows:

-- provinces
-- Kafka Source
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT,
  `area_code` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_province',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- provinces
-- MySQL Sink
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT,
  `area_code` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_province', -- the MySQL table the data is inserted into
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- provinces
-- MySQL Sink Load Data
INSERT INTO base_province
SELECT *
FROM ods_base_province;

-- region
-- Kafka Source
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
  `id` INT,
  `region_name` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_region',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- region
-- MySQL Sink
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
  `id` INT,
  `region_name` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_region', -- the MySQL table the data is inserted into
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- region
-- MySQL Sink Load Data
INSERT INTO base_region
SELECT *
FROM ods_base_region;

After the above steps, the raw data needed to build the dimension table is available in MySQL. Next, create the dimension table itself in MySQL. We use the two tables above to create a view, dim_province, as the dimension table:

-- DIM layer, area dimension table
-- create the view in MySQL
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  br.id AS region_id,
  br.region_name AS region_name,
  bp.area_code AS area_code
FROM base_region br
JOIN base_province bp ON br.id = bp.region_id;

The dimension table we need, dim_province, is now created. When doing the dimension table join, we only need to declare a JDBC data source for it in Flink SQL and it can be used directly. We create the goods dimension table in the same way, as follows:

-- level-1 category table
-- Kafka Source
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
  `id` BIGINT,
  `name` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category1',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- level-1 category table
-- MySQL Sink
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
  `id` BIGINT,
  `name` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_category1', -- the MySQL table the data is inserted into
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- level-1 category table
-- MySQL Sink Load Data
INSERT INTO base_category1
SELECT *
FROM ods_base_category1;

-- level-2 category table
-- Kafka Source
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category2',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- level-2 category table
-- MySQL Sink
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_category2', -- the MySQL table the data is inserted into
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- level-2 category table
-- MySQL Sink Load Data
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;

-- level-3 category table
-- Kafka Source
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category3',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- level-3 category table
-- MySQL Sink
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_category3', -- the MySQL table the data is inserted into
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- level-3 category table
-- MySQL Sink Load Data
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;

-- goods table
-- Kafka Source
DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.sku_info',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- goods table
-- MySQL Sink
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'sku_info', -- the MySQL table the data is inserted into
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- goods table
-- MySQL Sink Load Data
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;

After the above steps, the base tables needed for the goods dimension table have been synchronized to MySQL (again, the corresponding MySQL tables must be created in advance). Next, use these base tables to create a view, dim_sku_info, in the dim database of MySQL; it will be used as a dimension table later.

-- DIM layer, goods dimension table
-- create the view in MySQL
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c1.id AS c1_id,
  c1.name AS c1_name
FROM sku_info si
JOIN base_category3 c3 ON si.category3_id = c3.id
JOIN base_category2 c2 ON c3.category2_id = c2.id
JOIN base_category1 c1 ON c2.category1_id = c1.id;

Now that the dimension table data is ready, let's start processing the data of the DWD layer.

DWD layer data processing

With the dimension tables prepared, the next step is to process the raw ODS data into a DWD-layer detail wide table. The specific process is as follows:

-- order detail
-- Kafka Source
DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail` (
  `id` BIGINT,
  `order_id` BIGINT,
  `sku_id` BIGINT,
  `sku_name` STRING,
  `img_url` STRING,
  `order_price` DECIMAL(10,2),
  `sku_num` INT,
  `create_time` TIMESTAMP(0)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.order_detail',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- order info
-- Kafka Source
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
  `id` BIGINT,
  `consignee` STRING,
  `consignee_tel` STRING,
  `total_amount` DECIMAL(10,2),
  `order_status` STRING,
  `user_id` BIGINT,
  `payment_way` STRING,
  `delivery_address` STRING,
  `order_comment` STRING,
  `out_trade_no` STRING,
  `trade_body` STRING,
  `create_time` TIMESTAMP(0),
  `operate_time` TIMESTAMP(0),
  `expire_time` TIMESTAMP(0),
  `tracking_no` STRING,
  `parent_order_id` BIGINT,
  `img_url` STRING,
  `province_id` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.order_info',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- DWD layer, paid order detail table: dwd_paid_order_detail
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_paid_order_detail',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);

-- DWD layer, paid order detail table
-- load data into dwd_paid_order_detail
INSERT INTO dwd_paid_order_detail
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
(
  SELECT *
  FROM ods_order_info
  WHERE order_status = '2' -- paid
) oi JOIN
(
  SELECT *
  FROM ods_order_detail
) od
ON oi.id = od.order_id;

ADS layer data

After the above steps, we have created the dwd_paid_order_detail detail wide table and stored it in Kafka. Next, this detail wide table is joined with the dimension tables to produce the ADS application-layer data.

ads_province_index

First, create the corresponding ADS target table, ads_province_index, in MySQL:

CREATE TABLE ads.ads_province_index (
  province_id INT(10),
  area_code VARCHAR(100),
  province_name VARCHAR(100),
  region_id INT(10),
  region_name VARCHAR(100),
  order_amount DECIMAL(10,2),
  order_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (province_id, dt)
);

Load the data into the ADS-layer target table in MySQL:

-- Flink SQL Cli operation
-- use DDL to declare the ADS-layer table in MySQL
-- Metrics: 1. number of orders per province per day
--          2. order amount per province per day
CREATE TABLE ads_province_index (
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/ads',
  'table-name' = 'ads_province_index',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe'
);

-- dwd_paid_order_detail, paid order detail table
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_paid_order_detail',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);

-- tmp_province_index
-- order summary temporary table
CREATE TABLE tmp_province_index (
  province_id INT,
  order_count BIGINT, -- number of orders
  order_amount DECIMAL(10,2), -- order amount
  pay_date DATE
) WITH (
  'connector' = 'kafka',
  'topic' = 'tmp_province_index',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);

-- tmp_province_index
-- load data into the order summary temporary table
INSERT INTO tmp_province_index
SELECT
  province_id,
  count(DISTINCT order_id) order_count, -- number of orders
  sum(order_price * sku_num) order_amount, -- order amount
  TO_DATE(pay_time, 'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY province_id, TO_DATE(pay_time, 'yyyy-MM-dd');

-- tmp_province_index_source
-- use this temporary summary table as a data source
CREATE TABLE tmp_province_index_source (
  province_id INT,
  order_count BIGINT, -- number of orders
  order_amount DECIMAL(10,2), -- order amount
  pay_date DATE,
  proctime AS PROCTIME() -- generate a processing-time attribute with a computed column
) WITH (
  'connector' = 'kafka',
  'topic' = 'tmp_province_index',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);

-- DIM layer, area dimension table
-- create the data source for the area dimension table
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING,
  PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'dim_province',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'scan.fetch-size' = '100'
);

-- load data into ads_province_index
-- dimension table JOIN
INSERT INTO ads_province_index
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date AS VARCHAR)
FROM
tmp_province_index_source pc
JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime AS dp
ON dp.province_id = pc.province_id;

After submitting the job, observe the Flink Web UI:

Check the data of the ads_province_index table in the ADS layer:
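As an extra sanity check (a hedged example; the database and table names follow this article's setup), the result table can also be queried directly in MySQL:

SELECT province_name, region_name, order_count, order_amount, dt
FROM ads.ads_province_index
ORDER BY dt, province_id;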

ads_sku_index

First, create the corresponding ADS target table, ads_sku_index, in MySQL:

CREATE TABLE ads_sku_index
(
  sku_id BIGINT(10),
  sku_name VARCHAR(100),
  weight DOUBLE,
  tm_id BIGINT(10),
  price DOUBLE,
  spu_id BIGINT(10),
  c3_id BIGINT(10),
  c3_name VARCHAR(100),
  c2_id BIGINT(10),
  c2_name VARCHAR(100),
  c1_id BIGINT(10),
  c1_name VARCHAR(100),
  order_amount DOUBLE,
  order_count BIGINT(10),
  sku_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (sku_id, dt)
);

Load the data into the ADS-layer target table in MySQL:

-- use DDL to declare the ADS-layer table in MySQL
-- Metrics: 1. number of orders per sku per day
--          2. order amount per sku per day
--          3. number of items sold per sku per day
CREATE TABLE ads_sku_index
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt VARCHAR,
  PRIMARY KEY (sku_id, dt) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/ads',
  'table-name' = 'ads_sku_index',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe'
);

-- dwd_paid_order_detail, paid order detail table
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_paid_order_detail',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);

-- tmp_sku_index
-- per-sku metric summary temporary table
CREATE TABLE tmp_sku_index (
  sku_id BIGINT,
  order_count BIGINT, -- number of orders
  order_amount DECIMAL(10,2), -- order amount
  order_sku_num BIGINT,
  pay_date DATE
) WITH (
  'connector' = 'kafka',
  'topic' = 'tmp_sku_index',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);

-- tmp_sku_index
-- data loading
INSERT INTO tmp_sku_index
SELECT
  sku_id,
  count(DISTINCT order_id) order_count, -- number of orders
  sum(order_price * sku_num) order_amount, -- order amount
  sum(sku_num) order_sku_num,
  TO_DATE(pay_time, 'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id, TO_DATE(pay_time, 'yyyy-MM-dd');

-- tmp_sku_index_source
-- use this temporary summary table as a data source
CREATE TABLE tmp_sku_index_source (
  sku_id BIGINT,
  order_count BIGINT, -- number of orders
  order_amount DECIMAL(10,2), -- order amount
  order_sku_num BIGINT,
  pay_date DATE,
  proctime AS PROCTIME() -- generate a processing-time attribute with a computed column
) WITH (
  'connector' = 'kafka',
  'topic' = 'tmp_sku_index',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);

-- DIM layer, goods dimension table
-- create the data source for the goods dimension table
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'dim_sku_info',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'scan.fetch-size' = '100'
);

-- load data into ads_sku_index
-- dimension table JOIN
INSERT INTO ads_sku_index
SELECT
  sku_id,
  sku_name,
  weight,
  tm_id,
  price,
  spu_id,
  c3_id,
  c3_name,
  c2_id,
  c2_name,
  c1_id,
  c1_name,
  sc.order_amount,
  sc.order_count,
  sc.order_sku_num,
  cast(sc.pay_date AS VARCHAR)
FROM
tmp_sku_index_source sc
JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime AS ds
ON ds.id = sc.sku_id;

After submitting the job, observe the Flink Web UI:

Check the data of the ads_sku_index table in the ADS layer:
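Likewise, a quick hedged check of the per-sku results can be run in MySQL (the table was created in the ads database above):

SELECT sku_name, order_count, order_amount, sku_count, dt
FROM ads.ads_sku_index
ORDER BY dt, order_amount DESC;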

FineBI result display

Other notes: a bug in Flink 1.11.0

When using Flink 1.11.0, inserting a changelog data source into an upsert sink raises the following exception:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])

This bug has been fixed in Flink 1.11.1, which can be used instead.

At this point, you should have a deeper understanding of how to build a real-time data warehouse based on Flink 1.11 SQL. Give it a try in practice!
