This article explains how to build a real-time data warehouse based on Flink 1.11 SQL. The approach is simple, fast and practical, and the sections below walk through it step by step.
Case introduction
This article takes an e-commerce business as an example to show the data processing flow of a real-time data warehouse. The goal is to explain the construction process, so the data computations are kept deliberately simple. To keep the case operable and complete, detailed steps are given, and for demonstration purposes all operations are performed in the Flink SQL CLI.
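As a reminder, all of the DDL and DML below can be entered interactively in the SQL CLI that ships with Flink. It can be started from the Flink installation directory roughly as follows (the exact path depends on your deployment, and a running Flink cluster is assumed):

# start the embedded Flink SQL CLI from the Flink installation directory
./bin/sql-client.sh embedded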
Architecture design
The architecture is shown in the figure: first, the MySQL binlog is parsed by canal and the change data is written to Kafka. Flink SQL then cleans and joins the raw data and writes the resulting detail (wide) table back to Kafka. The dimension table data is stored in MySQL; Flink SQL joins the detail table with the dimension tables, writes the aggregated results to MySQL, and finally the results are visualized with FineBI.
Business data preparation
Order table (order_info)
CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `consignee` varchar(100) DEFAULT NULL COMMENT 'consignee',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT 'consignee phone',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT 'total amount',
  `order_status` varchar(20) DEFAULT NULL COMMENT 'order status',
  `user_id` bigint(20) DEFAULT NULL COMMENT 'user id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT 'payment method',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT 'shipping address',
  `order_comment` varchar(200) DEFAULT NULL COMMENT 'order remarks',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT 'order transaction number (for third-party payment)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT 'order description (for third-party payment)',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  `operate_time` datetime DEFAULT NULL COMMENT 'operation time',
  `expire_time` datetime DEFAULT NULL COMMENT 'expiration time',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT 'logistics tracking number',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT 'parent order id',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'image path',
  `province_id` int(20) DEFAULT NULL COMMENT 'province id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='order table';
Order detail table (order_detail)
CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `order_id` bigint(20) DEFAULT NULL COMMENT 'order id',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku name (redundant)',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'image name (redundant)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT 'purchase price (sku price when the order was placed)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT 'number of items purchased',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='order detail table';
Goods table (sku_info)
CREATE TABLE `sku_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid (item id)',
  `spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',
  `price` decimal(10,0) DEFAULT NULL COMMENT 'price',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku name',
  `sku_desc` varchar(2000) DEFAULT NULL COMMENT 'commodity specification description',
  `weight` decimal(10,2) DEFAULT NULL COMMENT 'weight',
  `tm_id` bigint(20) DEFAULT NULL COMMENT 'brand (redundant)',
  `category3_id` bigint(20) DEFAULT NULL COMMENT 'level-3 category id (redundant)',
  `sku_default_img` varchar(200) DEFAULT NULL COMMENT 'default display image (redundant)',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='goods table';
Commodity level-1 category table (base_category1)
CREATE TABLE `base_category1` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(10) NOT NULL COMMENT 'category name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-1 category table';
Commodity level-2 category table (base_category2)
CREATE TABLE `base_category2` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(200) NOT NULL COMMENT 'level-2 category name',
  `category1_id` bigint(20) DEFAULT NULL COMMENT 'level-1 category id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-2 category table';
Commodity level-3 category table (base_category3)
CREATE TABLE `base_category3` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(200) NOT NULL COMMENT 'level-3 category name',
  `category2_id` bigint(20) DEFAULT NULL COMMENT 'level-2 category id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-3 category table';
Province table (base_province)
CREATE TABLE `base_province` (
  `id` int(20) DEFAULT NULL COMMENT 'id',
  `name` varchar(20) DEFAULT NULL COMMENT 'province name',
  `region_id` int(20) DEFAULT NULL COMMENT 'region id',
  `area_code` varchar(20) DEFAULT NULL COMMENT 'administrative area code'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Region table (base_region)
CREATE TABLE `base_region` (
  `id` int(20) NOT NULL COMMENT 'region id',
  `region_name` varchar(20) DEFAULT NULL COMMENT 'region name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Data processing flow
ODS layer data synchronization
For data synchronization into the ODS layer, see my other article, implementing real-time incremental data synchronization based on Canal and Flink (1). In short, canal parses the MySQL binlog and writes the change records to the corresponding Kafka topics. Due to space constraints, the details are not repeated here. The topics after synchronization are shown in the figure below.
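Each record in these topics is a canal-json change message. A simplified, illustrative example for an insert into order_info (field values are made up, and the exact payload depends on the canal configuration) looks roughly like this:

{
  "data": [{"id": "1001", "consignee": "Tom", "total_amount": "99.50", "order_status": "2", "province_id": "1", "create_time": "2020-06-29 13:54:11"}],
  "database": "mydw",
  "table": "order_info",
  "type": "INSERT",
  "ts": 1593410051000
}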
DIM layer dimension table data preparation
In this case the dimension tables are stored in MySQL; in actual production, HBase would typically be used to store dimension table data. We mainly use two dimension tables: an area dimension table and a commodity dimension table. The process is as follows:
Area dimension table
First, the data from the mydw.base_province and mydw.base_region topics is extracted into MySQL, mainly using Flink SQL Kafka sources with the canal-json format. Note: before loading, the corresponding tables must be created in MySQL. In this article the MySQL database used to store dimension table data is named dim. The statements are as follows:
-- provinces
--   kafka Source
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT,
  `area_code` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_province',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- provinces
--   MySQL Sink
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT,
  `area_code` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_province', -- the MySQL table the data is inserted into
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);
-- provinces
--   MySQL Sink Load Data
INSERT INTO base_province
SELECT *
FROM ods_base_province;
-- region
--   kafka Source
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
  `id` INT,
  `region_name` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_region',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- region
--   MySQL Sink
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
  `id` INT,
  `region_name` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_region', -- the MySQL table the data is inserted into
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);
-- region
--   MySQL Sink Load Data
INSERT INTO base_region
SELECT *
FROM ods_base_region;
After the above steps, the raw data needed for the dimension table is stored in MySQL. Next, create the dimension table itself in MySQL. Here the two tables above are combined into a view, dim_province, which serves as the dimension table:
-- DIM layer, area dimension table
-- create the view in MySQL
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  br.id AS region_id,
  br.region_name AS region_name,
  bp.area_code AS area_code
FROM base_region br
JOIN base_province bp ON br.id = bp.region_id;
The dimension table we need, dim_province, is now created. To use it in a dimension table join, we only need to declare a JDBC source for it in Flink SQL when performing the join. The commodity dimension table is created in the same way, as follows:
-- level-1 category table
--   kafka Source
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
  `id` BIGINT,
  `name` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category1',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- level-1 category table
--   MySQL Sink
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
  `id` BIGINT,
  `name` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_category1', -- the MySQL table the data is inserted into
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);
-- level-1 category table
--   MySQL Sink Load Data
INSERT INTO base_category1
SELECT *
FROM ods_base_category1;
-- level-2 category table
--   kafka Source
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category2',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- level-2 category table
--   MySQL Sink
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_category2', -- the MySQL table the data is inserted into
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);
-- level-2 category table
--   MySQL Sink Load Data
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;
-- level-3 category table
--   kafka Source
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category3',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- level-3 category table
--   MySQL Sink
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_category3', -- the MySQL table the data is inserted into
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);
-- level-3 category table
--   MySQL Sink Load Data
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;
-- goods table
--   Kafka Source
DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.sku_info',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- goods table
--   MySQL Sink
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'sku_info', -- the MySQL table the data is inserted into
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);
-- goods table
--   MySQL Sink Load Data
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;
After the above steps, the base tables needed for the commodity dimension table have been synchronized to MySQL (the corresponding tables must also be created there in advance). Next, we use these base tables to create a view, dim_sku_info, in the dim database of MySQL, which will be used as the commodity dimension table later.
-- DIM layer, commodity dimension table
-- create the view in MySQL
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c1.id AS c1_id,
  c1.name AS c1_name
FROM
(
  sku_info si
  JOIN base_category3 c3 ON si.category3_id = c3.id
  JOIN base_category2 c2 ON c3.category2_id = c2.id
  JOIN base_category1 c1 ON c2.category1_id = c1.id
);
Now that the dimension table data we need is ready, let's start working on the data in the DWD layer.
DWD layer data processing
After the above steps, the dimension tables we need are in place. Next, the raw ODS data is processed into the DWD-layer detail (wide) table. The specific process is as follows:
-- order detail
--   Kafka Source
DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail` (
  `id` BIGINT,
  `order_id` BIGINT,
  `sku_id` BIGINT,
  `sku_name` STRING,
  `img_url` STRING,
  `order_price` DECIMAL(10,2),
  `sku_num` INT,
  `create_time` TIMESTAMP(0)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.order_detail',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- order info
--   Kafka Source
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
  `id` BIGINT,
  `consignee` STRING,
  `consignee_tel` STRING,
  `total_amount` DECIMAL(10,2),
  `order_status` STRING,
  `user_id` BIGINT,
  `payment_way` STRING,
  `delivery_address` STRING,
  `order_comment` STRING,
  `out_trade_no` STRING,
  `trade_body` STRING,
  `create_time` TIMESTAMP(0),
  `operate_time` TIMESTAMP(0),
  `expire_time` TIMESTAMP(0),
  `tracking_no` STRING,
  `parent_order_id` BIGINT,
  `img_url` STRING,
  `province_id` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.order_info',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- DWD layer, paid order detail table dwd_paid_order_detail
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_paid_order_detail',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);
-- DWD layer, paid order detail table
--   load data into dwd_paid_order_detail
INSERT INTO dwd_paid_order_detail
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
(
  SELECT *
  FROM ods_order_info
  WHERE order_status = '2' -- paid
) oi JOIN
(
  SELECT *
  FROM ods_order_detail
) od
ON oi.id = od.order_id;
ADS layer data
After the above steps, the dwd_paid_order_detail detail (wide) table has been created and stored in Kafka. Next, this detail table is joined with the dimension tables (as processing-time temporal lookup joins) to produce the ADS application-layer data.
ads_province_index
First, create the corresponding ADS target table in MySQL: ads_province_index
CREATE TABLE ads.ads_province_index (
  province_id INT(10),
  area_code VARCHAR(100),
  province_name VARCHAR(100),
  region_id INT(10),
  region_name VARCHAR(100),
  order_amount DECIMAL(10,2),
  order_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (province_id, dt)
);
Load the data into the ADS-layer target table in MySQL:
-- Flink SQL Cli operation
-- use DDL to map the ADS layer table in MySQL
-- Metrics: 1. the number of orders per province per day
--          2. the order amount per province per day
CREATE TABLE ads_province_index (
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/ads',
  'table-name' = 'ads_province_index',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe'
);
-- dwd_paid_order_detail: paid order detail table
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_paid_order_detail',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);
-- tmp_province_index
--   order summary temporary table
CREATE TABLE tmp_province_index (
  province_id INT,
  order_count BIGINT, -- number of orders
  order_amount DECIMAL(10,2), -- order amount
  pay_date DATE
) WITH (
  'connector' = 'kafka',
  'topic' = 'tmp_province_index',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);
-- tmp_province_index
--   load data into the order summary temporary table
INSERT INTO tmp_province_index
SELECT
  province_id,
  count(DISTINCT order_id) order_count, -- number of orders
  sum(order_price * sku_num) order_amount, -- order amount
  TO_DATE(pay_time, 'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY province_id, TO_DATE(pay_time, 'yyyy-MM-dd');
-- tmp_province_index_source
--   use this temporary summary table as a data source
CREATE TABLE tmp_province_index_source (
  province_id INT,
  order_count BIGINT, -- number of orders
  order_amount DECIMAL(10,2), -- order amount
  pay_date DATE,
  proctime AS PROCTIME() -- generate a processing-time attribute via a computed column
) WITH (
  'connector' = 'kafka',
  'topic' = 'tmp_province_index',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);
-- DIM layer, area dimension table
--   create the data source for the area dimension table
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING,
  PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'dim_province',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'scan.fetch-size' = '100'
);
-- load data into ads_province_index
--   dimension table JOIN
INSERT INTO ads_province_index
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM
  tmp_province_index_source pc
  JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp
  ON dp.province_id = pc.province_id;
After submitting the task, observe the job in the Flink Web UI:
View the data of the ads_province_index table in the ADS layer:
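As a quick spot check, a query like the following against the target table in MySQL (the database and table created above; the ordering and limit are only illustrative) shows the latest aggregates:

-- illustrative verification query against the ADS result in MySQL
SELECT province_name, region_name, order_count, order_amount, dt
FROM ads.ads_province_index
ORDER BY dt DESC, order_amount DESC
LIMIT 10;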
ads_sku_index
First, create the corresponding ADS target table in MySQL: ads_sku_index
CREATE TABLE ads_sku_index
(
  sku_id BIGINT(10),
  sku_name VARCHAR(100),
  weight DOUBLE,
  tm_id BIGINT(10),
  price DOUBLE,
  spu_id BIGINT(10),
  c3_id BIGINT(10),
  c3_name VARCHAR(100),
  c2_id BIGINT(10),
  c2_name VARCHAR(100),
  c1_id BIGINT(10),
  c1_name VARCHAR(100),
  order_amount DOUBLE,
  order_count BIGINT(10),
  sku_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (sku_id, dt)
);
Load the data into the ADS-layer target table in MySQL:
-- use DDL to map the ADS layer table in MySQL
-- Metrics: 1. the number of orders per commodity per day
--          2. the order amount per commodity per day
--          3. the quantity sold per commodity per day
CREATE TABLE ads_sku_index
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt VARCHAR,
  PRIMARY KEY (sku_id, dt) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/ads',
  'table-name' = 'ads_sku_index',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe'
);
-- dwd_paid_order_detail: paid order detail table
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_paid_order_detail',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);
-- tmp_sku_index
--   commodity metric statistics
CREATE TABLE tmp_sku_index (
  sku_id BIGINT,
  order_count BIGINT, -- number of orders
  order_amount DECIMAL(10,2), -- order amount
  order_sku_num BIGINT,
  pay_date DATE
) WITH (
  'connector' = 'kafka',
  'topic' = 'tmp_sku_index',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);
-- tmp_sku_index
--   data loading
INSERT INTO tmp_sku_index
SELECT
  sku_id,
  count(DISTINCT order_id) order_count, -- number of orders
  sum(order_price * sku_num) order_amount, -- order amount
  sum(sku_num) order_sku_num,
  TO_DATE(pay_time, 'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id, TO_DATE(pay_time, 'yyyy-MM-dd');
-- tmp_sku_index_source
--   use this temporary summary table as a data source
CREATE TABLE tmp_sku_index_source (
  sku_id BIGINT,
  order_count BIGINT, -- number of orders
  order_amount DECIMAL(10,2), -- order amount
  order_sku_num BIGINT,
  pay_date DATE,
  proctime AS PROCTIME() -- generate a processing-time attribute via a computed column
) WITH (
  'connector' = 'kafka',
  'topic' = 'tmp_sku_index',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);
-- DIM layer, commodity dimension table
--   create the data source for the commodity dimension table
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'dim_sku_info',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'scan.fetch-size' = '100'
);
-- load data into ads_sku_index
--   dimension table JOIN
INSERT INTO ads_sku_index
SELECT
  sku_id,
  sku_name,
  weight,
  tm_id,
  price,
  spu_id,
  c3_id,
  c3_name,
  c2_id,
  c2_name,
  c1_id,
  c1_name,
  sc.order_amount,
  sc.order_count,
  sc.order_sku_num,
  cast(sc.pay_date as VARCHAR)
FROM
  tmp_sku_index_source sc
  JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
  ON ds.id = sc.sku_id;
After submitting the task, observe the job in the Flink Web UI:
View the data of the ads_sku_index table in the ADS layer:
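As with the province metrics, a quick spot check against the MySQL target table (an illustrative query only) confirms the per-sku aggregates:

-- illustrative verification query against the ADS result in MySQL
SELECT sku_name, c1_name, order_count, order_amount, sku_count, dt
FROM ads.ads_sku_index
ORDER BY dt DESC, order_amount DESC
LIMIT 10;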
FineBI result display
Other notes: a bug in Flink 1.11.0
When using Flink 1.11.0, if a changelog data source is inserted into an upsert sink, the following exception is reported:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])
The bug has been fixed in Flink 1.11.1, so use 1.11.1 or later.
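For reference, a minimal sketch of the pattern that triggers this exception on 1.11.0 is shown below; the table t_pick_order and its fields come from the error message above, while the sink table, topic name and connection settings are hypothetical placeholders:

-- changelog source: canal-json emits UPDATE_BEFORE/UPDATE_AFTER change records
CREATE TABLE t_pick_order (
  order_no STRING,
  status INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.t_pick_order',            -- assumed topic name
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'canal-json'
);
-- upsert sink: a JDBC table with a primary key only accepts UPDATE_AFTER records
CREATE TABLE sink_pick_order (
  order_no STRING,
  status INT,
  PRIMARY KEY (order_no) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',    -- assumed database
  'table-name' = 'sink_pick_order',         -- assumed target table
  'username' = 'root',
  'password' = '123qwe'
);
-- on Flink 1.11.0 this statement fails with the planner exception above; on 1.11.1+ it runs
INSERT INTO sink_pick_order
SELECT order_no, status FROM t_pick_order;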
At this point, you should have a deeper understanding of how to build a real-time data warehouse based on Flink 1.11 SQL; the best way to consolidate it is to try the steps yourself. Follow us for more related content and keep learning!