本文基于Flink SQL与hudi构建准实时数仓,在Flink从kafka接入数据之后,即将所有数据存于hudi中,包括所有中间处理数据以及最终数据。文章《实时数仓|基于Flink1.11的SQL构建实时数仓探索实践》描述了基于Flink SQL与kafka构建的实时数仓,本文以上述文章为基础。
在完成本文实践的同时可以同步参考上述文章。
最终结果:
本文以电商业务为例,展示准实时数仓的数据处理流程。
组件与配置说明
Flink 1.13.3
flink cdc 2.0.2
hudi 0.10.0 (2021.12.08最新发布版本,地址:https://github.com/apache/hudi/releases/tag/release-0.10.0)
hadoop 3.2.0
zeppelin 0.10.0
mysql 5.7(开启binlog)
kafka 2.5.0
由于zeppelin的便捷性,本文全部基于zeppelin进行任务提交,如果您还不会用zeppelin,那么您可以参考:https://lrting-top.blog.csdn.net/article/details/120681666。当然,如果您不想用zeppelin,用Flink SQL Client提交也是完全没有问题的。
本实验Flink开启checkpoint,设置为60s。
在完成以下任务之前,请确保您已经
-
部署好Flink 1.13.3,并将hudi对应的Jar包已经正确打包并且放置到Flink的lib目录下,将flink cdc对应的jar包放置到lib目录下。
-
部署并启动zeppelin 0.10.0,在zeppelin的Flink interpreter上指定了Flink_HOME以及HADOOP_CLASSPATH
-
同时还有启动hadoop、mysql、kafka
首先从以下地址获取mysql建表语句以及模拟数据:
https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/hudi/flink-hudi-realtime/realtime_table.zip
下载上述建表语句之后,进入 MySQL,新建数据库 realtime_dw_demo_1,切换到该数据库并执行脚本完成初始化:
mysql -u root -p
create database realtime_dw_demo_1;
use realtime_dw_demo_1;
source realtime_table.sql;
将mysql表数据同步到kafka
使用flink cdc将mysql数据同步到kafka中,以下为相关sql语句:
读取mysql源表数据
%flink.ssql drop table if exists base_category1; drop table if exists base_category2; drop table if exists base_category3; drop table if exists base_province; drop table if exists base_region; drop table if exists base_trademark; drop table if exists date_info; drop table if exists holiday_info; drop table if exists holiday_year; drop table if exists order_detail; drop table if exists order_info; drop table if exists order_status_log; drop table if exists payment_info; drop table if exists sku_info; drop table if exists user_info; ---mysql table CREATE TABLE `base_category1` ( `id` bigint NOT NULL, `name` string NOT NULL, PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306', 'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'base_category1' ); CREATE TABLE `base_category2` ( `id` bigint NOT NULL COMMENT '编号', `name` string NOT NULL COMMENT '二级分类名称', `category1_id` bigint NULL COMMENT '一级分类编号', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306', 'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'base_category2' ); CREATE TABLE `base_category3` ( `id` bigint NOT NULL COMMENT '编号', `name` string NOT NULL COMMENT '三级分类名称', `category2_id` bigint NULL COMMENT '二级分类编号', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306', 'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'base_category3' ); CREATE TABLE `base_province` ( `id` int NULL COMMENT 'id', `name` string NULL COMMENT '省名称', `region_id` int NULL COMMENT '大区id', 
`area_code` string NULL COMMENT '行政区位码', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306', 'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'base_province' ); CREATE TABLE `base_region` ( `id` int NOT NULL COMMENT '大区id', `region_name` string NULL COMMENT '大区名称', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306', 'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'base_region' ); CREATE TABLE `base_trademark` ( `tm_id` string NULL COMMENT '品牌id', `tm_name` string NULL COMMENT '品牌名称', PRIMARY KEY (`tm_id`) NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306', 'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'base_trademark' ); CREATE TABLE `date_info` ( `date_id` int NOT NULL, `week_id` int NULL, `week_day` int NULL, `day` int NULL, `month` int NULL, `quarter` int NULL, `year` int NULL, `is_workday` int NULL, `holiday_id` int NULL, PRIMARY KEY (`date_id`) NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306', 'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'date_info' ); CREATE TABLE `holiday_info` ( `holiday_id` int NOT NULL, `holiday_name` string NULL, PRIMARY KEY (`holiday_id`) NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306', 'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone'= 'Asia/Shanghai', 
'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'holiday_info' ); CREATE TABLE `holiday_year` ( `year_id` int NULL, `holiday_id` int NULL, `start_date_id` int NULL, `end_date_id` int NULL, PRIMARY KEY (`end_date_id`) NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306', 'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'holiday_year' ); CREATE TABLE `order_detail` ( `id` bigint NOT NULL COMMENT '编号', `order_id` bigint NULL COMMENT '订单编号', `sku_id` bigint NULL COMMENT 'sku_id', `sku_name` string NULL COMMENT 'sku名称(冗余)', `img_url` string NULL COMMENT '图片名称(冗余)', `order_price` decimal(10,2) NULL COMMENT '购买价格(下单时sku价格)', `sku_num` string NULL COMMENT '购买个数', `create_time` timestamp(3) NULL COMMENT '创建时间', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306', 'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'order_detail' ); CREATE TABLE `order_info` ( `id` bigint NOT NULL COMMENT '编号', `consignee` string NULL COMMENT '收货人', `consignee_tel` string NULL COMMENT '收件人电话', `total_amount` decimal(10,2) NULL COMMENT '总金额', `order_status` string NULL COMMENT '订单状态', `user_id` bigint NULL COMMENT '用户id', `payment_way` string NULL COMMENT '付款方式', `delivery_address` string NULL COMMENT '送货地址', `order_comment` string NULL COMMENT '订单备注', `out_trade_no` string NULL COMMENT '订单交易编号(第三方支付用)', `trade_body` string NULL COMMENT '订单描述(第三方支付用)', `create_time` timestamp(3) NULL COMMENT '创建时间', `operate_time` timestamp(3) NULL COMMENT ' *** 作时间', `expire_time` timestamp(3) NULL COMMENT '失效时间', `tracking_no` string NULL COMMENT '物流单编号', `parent_order_id` bigint NULL COMMENT '父订单编号', `img_url` string NULL 
COMMENT '图片路径', `province_id` int NULL COMMENT '地区', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306', 'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'order_info' ); CREATE TABLE `order_status_log` ( `id` int NOT NULL, `order_id` int NULL, `order_status` int NULL, `operate_time` timestamp(3) NULL, PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306', 'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'order_status_log' ); CREATE TABLE `payment_info` ( `id` bigint NOT NULL COMMENT '编号', `out_trade_no` string NULL COMMENT '对外业务编号', `order_id` string NULL COMMENT '订单编号', `user_id` string NULL COMMENT '用户编号', `alipay_trade_no` string NULL COMMENT '支付宝交易流水编号', `total_amount` decimal(16,2) NULL COMMENT '支付金额', `subject` string NULL COMMENT '交易内容', `payment_type` string NULL COMMENT '支付方式', `payment_time` string NULL COMMENT '支付时间', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'vhost-118-23', 'port' = '3306', 'username' = 'root', 'password' = 'hostname', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'payment_info' ); CREATE TABLE `sku_info` ( `id` bigint NOT NULL COMMENT 'skuid(itemID)', `spu_id` bigint NULL COMMENT 'spuid', `price` decimal(10,0) NULL COMMENT '价格', `sku_name` string NULL COMMENT 'sku名称', `sku_desc` string NULL COMMENT '商品规格描述', `weight` decimal(10,2) NULL COMMENT '重量', `tm_id` bigint NULL COMMENT '品牌(冗余)', `category3_id` bigint NULL COMMENT '三级分类id(冗余)', `sku_default_img` string NULL COMMENT '默认显示图片(冗余)', `create_time` timestamp(3) NULL COMMENT '创建时间', PRIMARY KEY (`id`) 
NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306', 'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'sku_info' ); CREATE TABLE `user_info` ( `id` bigint NOT NULL COMMENT '编号', `login_name` string NULL COMMENT '用户名称', `nick_name` string NULL COMMENT '用户昵称', `passwd` string NULL COMMENT '用户密码', `name` string NULL COMMENT '用户姓名', `phone_num` string NULL COMMENT '手机号', `email` string NULL COMMENT '邮箱', `head_img` string NULL COMMENT '头像', `user_level` string NULL COMMENT '用户级别', `birthday` date NULL COMMENT '用户生日', `gender` string NULL COMMENT '性别 M男,F女', `create_time` timestamp(3) NULL COMMENT '创建时间', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'mysql-cdc', 'hostname' = 'hostname', 'port' = '3306', 'username' = 'root', 'password' = 'Pass-123-root', 'server-time-zone'= 'Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name' = 'realtime_dw_demo_1', 'table-name' = 'user_info' );
kafka sink表建表语句
%flink.ssql drop table if exists base_category1_topic; drop table if exists base_category2_topic; drop table if exists base_category3_topic; drop table if exists base_province_topic; drop table if exists base_region_topic; drop table if exists base_trademark_topic; drop table if exists date_info_topic; drop table if exists holiday_info_topic; drop table if exists holiday_year_topic; drop table if exists order_detail_topic; drop table if exists order_info_topic; drop table if exists order_status_log_topic; drop table if exists payment_info_topic; drop table if exists sku_info_topic; drop table if exists user_info_topic; CREATE TABLE `base_category1_topic` ( `id` bigint NOT NULL COMMENT '编号', `name` string NOT NULL COMMENT '分类名称', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'kafka' ,'topic' = 'my5.base_category1' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 'debezium-json' ); CREATE TABLE `base_category2_topic` ( `id` bigint NOT NULL COMMENT '编号', `name` string NOT NULL COMMENT '二级分类名称', `category1_id` bigint NULL COMMENT '一级分类编号', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'kafka' ,'topic' = 'my5.base_category2' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 'debezium-json' ); CREATE TABLE `base_category3_topic` ( `id` bigint NOT NULL COMMENT '编号', `name` string NOT NULL COMMENT '三级分类名称', `category2_id` bigint NULL COMMENT '二级分类编号', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'kafka' ,'topic' = 'my5.base_category3' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 'debezium-json' ); CREATE TABLE `base_province_topic` ( `id` int NULL COMMENT 'id', `name` string NULL COMMENT '省名称', `region_id` int NULL COMMENT '大区id', `area_code` string NULL COMMENT '行政区位码' ) with( 'connector' = 'kafka' ,'topic' = 'my5.base_province' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 'debezium-json' ); CREATE TABLE `base_region_topic` ( `id` int NOT NULL COMMENT '大区id', 
`region_name` string NULL COMMENT '大区名称', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'kafka' ,'topic' = 'my5.base_region' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 'debezium-json' ); CREATE TABLE `base_trademark_topic` ( `tm_id` string NULL COMMENT '品牌id', `tm_name` string NULL COMMENT '品牌名称', PRIMARY KEY (`tm_id`) NOT ENFORCED ) with( 'connector' = 'kafka' ,'topic' = 'my5.base_trademark' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 'debezium-json' ); CREATE TABLE `date_info_topic` ( `date_id` int NOT NULL, `week_id` int NULL, `week_day` int NULL, `day` int NULL, `month` int NULL, `quarter` int NULL, `year` int NULL, `is_workday` int NULL, `holiday_id` int NULL, PRIMARY KEY (`date_id`) NOT ENFORCED ) with( 'connector' = 'kafka' ,'topic' = 'my5.date_info' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 'debezium-json' ); CREATE TABLE `holiday_info_topic` ( `holiday_id` int NOT NULL, `holiday_name` string NULL, PRIMARY KEY (`holiday_id`) NOT ENFORCED ) with( 'connector' = 'kafka' ,'topic' = 'my5.holiday_info' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 'debezium-json' ); CREATE TABLE `holiday_year_topic` ( `year_id` int NULL, `holiday_id` int NULL, `start_date_id` int NULL, `end_date_id` int NULL ) with( 'connector' = 'kafka' ,'topic' = 'my5.holiday_year' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 'debezium-json' ); CREATE TABLE `order_detail_topic` ( `id` bigint NOT NULL COMMENT '编号', `order_id` bigint NULL COMMENT '订单编号', `sku_id` bigint NULL COMMENT 'sku_id', `sku_name` string NULL COMMENT 'sku名称(冗余)', `img_url` string NULL COMMENT '图片名称(冗余)', `order_price` decimal(10,2) NULL COMMENT '购买价格(下单时sku价格)', `sku_num` string NULL COMMENT '购买个数', `create_time` timestamp(3) NULL COMMENT '创建时间', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'kafka' ,'topic' = 'my5.order_detail' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 
'debezium-json' ); CREATE TABLE `order_info_topic` ( `id` bigint NOT NULL COMMENT '编号', `consignee` string NULL COMMENT '收货人', `consignee_tel` string NULL COMMENT '收件人电话', `total_amount` decimal(10,2) NULL COMMENT '总金额', `order_status` string NULL COMMENT '订单状态', `user_id` bigint NULL COMMENT '用户id', `payment_way` string NULL COMMENT '付款方式', `delivery_address` string NULL COMMENT '送货地址', `order_comment` string NULL COMMENT '订单备注', `out_trade_no` string NULL COMMENT '订单交易编号(第三方支付用)', `trade_body` string NULL COMMENT '订单描述(第三方支付用)', `create_time` timestamp(3) NULL COMMENT '创建时间', `operate_time` timestamp(3) NULL COMMENT ' *** 作时间', `expire_time` timestamp(3) NULL COMMENT '失效时间', `tracking_no` string NULL COMMENT '物流单编号', `parent_order_id` bigint NULL COMMENT '父订单编号', `img_url` string NULL COMMENT '图片路径', `province_id` int NULL COMMENT '地区', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'kafka' ,'topic' = 'my5.order_info' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 'debezium-json' ); CREATE TABLE `order_status_log_topic` ( `id` int NOT NULL , `order_id` int NULL, `order_status` int NULL, `operate_time` timestamp(3) NULL, PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'kafka' ,'topic' = 'my5.order_status_log' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 'debezium-json' ); CREATE TABLE `payment_info_topic` ( `id` bigint NOT NULL COMMENT '编号', `out_trade_no` string NULL COMMENT '对外业务编号', `order_id` string NULL COMMENT '订单编号', `user_id` string NULL COMMENT '用户编号', `alipay_trade_no` string NULL COMMENT '支付宝交易流水编号', `total_amount` decimal(16,2) NULL COMMENT '支付金额', `subject` string NULL COMMENT '交易内容', `payment_type` string NULL COMMENT '支付方式', `payment_time` string NULL COMMENT '支付时间', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'kafka' ,'topic' = 'my5.payment_info' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 'debezium-json' ); CREATE TABLE `sku_info_topic` ( `id` bigint NOT NULL 
COMMENT 'skuid(itemID)', `spu_id` bigint NULL COMMENT 'spuid', `price` decimal(10,0) NULL COMMENT '价格', `sku_name` string NULL COMMENT 'sku名称', `sku_desc` string NULL COMMENT '商品规格描述', `weight` decimal(10,2) NULL COMMENT '重量', `tm_id` bigint NULL COMMENT '品牌(冗余)', `category3_id` bigint NULL COMMENT '三级分类id(冗余)', `sku_default_img` string NULL COMMENT '默认显示图片(冗余)', `create_time` timestamp(3) NULL COMMENT '创建时间', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'kafka' ,'topic' = 'my5.sku_info' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 'debezium-json' ); CREATE TABLE `user_info_topic` ( `id` bigint NOT NULL COMMENT '编号', `login_name` string NULL COMMENT '用户名称', `nick_name` string NULL COMMENT '用户昵称', `passwd` string NULL COMMENT '用户密码', `name` string NULL COMMENT '用户姓名', `phone_num` string NULL COMMENT '手机号', `email` string NULL COMMENT '邮箱', `head_img` string NULL COMMENT '头像', `user_level` string NULL COMMENT '用户级别', `birthday` date NULL COMMENT '用户生日', `gender` varchar(1) NULL COMMENT '性别 M男,F女', `create_time` timestamp(3) NULL COMMENT '创建时间', PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector' = 'kafka' ,'topic' = 'my5.user_info' ,'properties.bootstrap.servers' = 'kafka-cluster:6667' ,'format' = 'debezium-json' );
insert语句,将mysql binlog数据导入kafka对应的topic
%flink.ssql(runAsOne=true)
-- Fan the MySQL binlog streams out to their Kafka topics (single job via runAsOne).
insert into base_category1_topic   select * from base_category1;
insert into base_category2_topic   select * from base_category2;
insert into base_category3_topic   select * from base_category3;
insert into base_province_topic    select * from base_province;
insert into base_region_topic      select * from base_region;
insert into base_trademark_topic   select * from base_trademark;
insert into date_info_topic        select * from date_info;
insert into holiday_info_topic     select * from holiday_info;
insert into holiday_year_topic     select * from holiday_year;
insert into order_detail_topic     select * from order_detail;
insert into order_info_topic       select * from order_info;
insert into order_status_log_topic select * from order_status_log;
insert into payment_info_topic     select * from payment_info;
insert into sku_info_topic         select * from sku_info;
insert into user_info_topic        select * from user_info;
将维表数据导入hudi
将my5.base_province和my5.base_region两张区域维表信息写入hudi COW表中
%flink.ssql
drop table if exists base_province_topic_source;
drop table if exists base_province_hudi;
-- Kafka source that re-reads the my5.base_province changelog from the beginning.
CREATE TABLE `base_province_topic_source` (
  `id` int NULL COMMENT 'id',
  `name` string NULL COMMENT '省名称',
  `region_id` int NULL COMMENT '大区id',
  `area_code` string NULL COMMENT '行政区位码'
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_province',
  'properties.bootstrap.servers' = 'kafka-cluster:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);
-- Hudi COPY_ON_WRITE mirror of base_province with streaming reads enabled.
CREATE TABLE `base_province_hudi` (
  `id` int NULL COMMENT 'id',
  `name` string NULL COMMENT '省名称',
  `region_id` int NULL COMMENT '大区id',
  `area_code` string NULL COMMENT '行政区位码',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:8020/realtime-demo-2/base_province_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
-- Stream the province changelog from Kafka into the Hudi COW table.
insert into base_province_hudi
select * from base_province_topic_source;
%flink.ssql
drop table if exists base_region_topic_source;
drop table if exists base_region_hudi;
-- Kafka source that re-reads the my5.base_region changelog from the beginning.
CREATE TABLE `base_region_topic_source` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_region',
  'properties.bootstrap.servers' = 'kafka-cluster:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);
-- Hudi COPY_ON_WRITE mirror of base_region with streaming reads enabled.
CREATE TABLE `base_region_hudi` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:8020/realtime-demo-2/base_region_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
-- Stream the region changelog from Kafka into the Hudi COW table.
insert into base_region_hudi
select * from base_region_topic_source;
使用上述两张维表创建dim_province表
%flink.ssql
DROP TABLE IF EXISTS dim_province_hudi;
-- Province dimension (province joined with its region), stored as Hudi COW.
create table dim_province_hudi (
  province_id   INT,
  province_name STRING,
  area_code     STRING,
  region_id     INT,
  region_name   STRING,
  PRIMARY KEY (province_id) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:8020/realtime-demo-2/dim_province_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'province_id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
-- Build the province dimension by joining region info onto each province.
insert into dim_province_hudi
SELECT
  bp.id          AS province_id,
  bp.name        AS province_name,
  bp.area_code   AS area_code,
  br.id          AS region_id,
  br.region_name AS region_name
FROM base_region_hudi br
JOIN base_province_hudi bp
  ON br.id = bp.region_id;
将商品维表my5.base_category1和my5.base_category2两张商品维表信息写入hudi COW表
%flink.ssql
drop table if exists base_category1_topic_source;
drop table if exists base_category1_hudi;
-- Kafka source that re-reads the my5.base_category1 changelog from the beginning.
CREATE TABLE `base_category1_topic_source` (
  `id` bigint NOT NULL COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_category1',
  'properties.bootstrap.servers' = 'kafka-cluster:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);
-- Hudi COPY_ON_WRITE mirror of base_category1 with streaming reads enabled.
CREATE TABLE `base_category1_hudi` (
  `id` bigint NOT NULL COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:8020/realtime-demo-2/base_category1_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
-- Stream the category-1 changelog from Kafka into the Hudi COW table.
insert into base_category1_hudi
select * from base_category1_topic_source;
%flink.ssql
drop table if exists base_category2_topic_source;
drop table if exists base_category2_hudi;
-- Kafka source that re-reads the my5.base_category2 changelog from the beginning.
CREATE TABLE `base_category2_topic_source` (
  `id` bigint NOT NULL COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category1_id` bigint NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_category2',
  'properties.bootstrap.servers' = 'kafka-cluster:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);
-- Hudi COPY_ON_WRITE mirror of base_category2 with streaming reads enabled.
CREATE TABLE `base_category2_hudi` (
  `id` bigint NOT NULL COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category1_id` bigint NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:8020/realtime-demo-2/base_category2_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
-- Stream the category-2 changelog from Kafka into the Hudi COW table.
insert into base_category2_hudi
select * from base_category2_topic_source;
%flink.ssql
drop table if exists base_category3_topic_source;
drop table if exists base_category3_hudi;
-- Kafka source that re-reads the my5.base_category3 changelog from the beginning.
CREATE TABLE `base_category3_topic_source` (
  `id` bigint NOT NULL COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category2_id` bigint NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_category3',
  'properties.bootstrap.servers' = 'kafka-cluster:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);
-- Hudi COPY_ON_WRITE mirror of base_category3 with streaming reads enabled.
CREATE TABLE `base_category3_hudi` (
  `id` bigint NOT NULL COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category2_id` bigint NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:8020/realtime-demo-2/base_category3_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
-- Stream the category-3 changelog from Kafka into the Hudi COW table.
insert into base_category3_hudi
select * from base_category3_topic_source;
将商品表导入hudi
%flink.ssql
drop table if exists sku_info_topic_source;
drop table if exists sku_info_topic_hudi;
-- Kafka source that re-reads the my5.sku_info changelog from the beginning.
CREATE TABLE `sku_info_topic_source` (
  `id` bigint NOT NULL COMMENT 'skuid(itemID)',
  `spu_id` bigint NULL COMMENT 'spuid',
  `price` decimal(10,0) NULL COMMENT '价格',
  `sku_name` string NULL COMMENT 'sku名称',
  `sku_desc` string NULL COMMENT '商品规格描述',
  `weight` decimal(10,2) NULL COMMENT '重量',
  `tm_id` bigint NULL COMMENT '品牌(冗余)',
  `category3_id` bigint NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3) NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.sku_info',
  'properties.bootstrap.servers' = 'kafka-cluster:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);
-- Hudi COPY_ON_WRITE mirror of sku_info with streaming reads enabled.
CREATE TABLE `sku_info_topic_hudi` (
  `id` bigint NOT NULL COMMENT 'skuid(itemID)',
  `spu_id` bigint NULL COMMENT 'spuid',
  `price` decimal(10,0) NULL COMMENT '价格',
  `sku_name` string NULL COMMENT 'sku名称',
  `sku_desc` string NULL COMMENT '商品规格描述',
  `weight` decimal(10,2) NULL COMMENT '重量',
  `tm_id` bigint NULL COMMENT '品牌(冗余)',
  `category3_id` bigint NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3) NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:8020/realtime-demo-2/sku_info_topic_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
-- Stream the sku_info changelog from Kafka into the Hudi COW table.
insert into sku_info_topic_hudi
select * from sku_info_topic_source;
基于上述步骤,我们把商品维表的基础数据同步到hudi中,同样我们使用商品维表创建
dim_sku_info视图
%flink.ssql
drop view if exists dim_sku_info;
-- SKU dimension view: each SKU enriched with its category-3/2/1 ids and names.
-- Fix: c1_id / c1_name were previously selected from alias c3 instead of c1,
-- so the level-1 columns repeated the level-3 values.
CREATE VIEW dim_sku_info AS
SELECT
  si.id           AS id,
  si.sku_name     AS sku_name,
  si.category3_id AS c3_id,
  si.weight       AS weight,
  si.tm_id        AS tm_id,
  si.price        AS price,
  si.spu_id       AS spu_id,
  c3.name         AS c3_name,
  c2.id           AS c2_id,
  c2.name         AS c2_name,
  c1.id           AS c1_id,
  c1.name         AS c1_name
FROM sku_info_topic_hudi si
JOIN base_category3_hudi c3 ON si.category3_id = c3.id
JOIN base_category2_hudi c2 ON c3.category2_id = c2.id
JOIN base_category1_hudi c1 ON c2.category1_id = c1.id;
DWD层数据处理
经过上面的步骤,我们已经将所用的维表已经准备好了。接下来我们将对ODS的原始数据进行处理,加工成DWD层的明细宽表。
%flink.ssql
-- DWD layer DDL: re-read the order topics from Kafka and define the paid-order
-- detail wide table in Hudi (MERGE_ON_READ, streaming reads).
-- Fixes: order_price was DECIMAL(10,0), silently dropping the cents of the
-- decimal(10,2) source column; the Kafka-only option 'scan.startup.mode' was
-- removed from the Hudi WITH clause (it is not a Hudi connector option).
drop table if exists ods_order_detail_topic;
drop table if exists ods_order_info_topic;
drop table if exists dwd_paid_order_detail_hudi;

CREATE TABLE `ods_order_detail_topic` (
  `id` bigint NOT NULL COMMENT '编号',
  `order_id` bigint NULL COMMENT '订单编号',
  `sku_id` bigint NULL COMMENT 'sku_id',
  `sku_name` string NULL COMMENT 'sku名称(冗余)',
  `img_url` string NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2) NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` int NULL COMMENT '购买个数',
  `create_time` timestamp(3) NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.order_detail',
  'properties.bootstrap.servers' = 'kafka-cluster:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `ods_order_info_topic` (
  `id` bigint NOT NULL COMMENT '编号',
  `consignee` string NULL COMMENT '收货人',
  `consignee_tel` string NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2) NULL COMMENT '总金额',
  `order_status` string NULL COMMENT '订单状态',
  `user_id` bigint NULL COMMENT '用户id',
  `payment_way` string NULL COMMENT '付款方式',
  `delivery_address` string NULL COMMENT '送货地址',
  `order_comment` string NULL COMMENT '订单备注',
  `out_trade_no` string NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` string NULL COMMENT '订单描述(第三方支付用)',
  `create_time` timestamp(3) NULL COMMENT '创建时间',
  `operate_time` timestamp(3) NULL COMMENT '操作时间',
  `expire_time` timestamp(3) NULL COMMENT '失效时间',
  `tracking_no` string NULL COMMENT '物流单编号',
  `parent_order_id` bigint NULL COMMENT '父订单编号',
  `img_url` string NULL COMMENT '图片路径',
  `province_id` int NULL COMMENT '地区',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.order_info',
  'properties.bootstrap.servers' = 'kafka-cluster:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);

CREATE TABLE dwd_paid_order_detail_hudi (
  detail_id   BIGINT,
  order_id    BIGINT,
  user_id     BIGINT,
  province_id INT,
  sku_id      BIGINT,
  sku_name    STRING,
  sku_num     INT,
  -- fixed: was DECIMAL(10,0), which truncated the 2-decimal source price
  order_price DECIMAL(10,2),
  create_time TIMESTAMP(3),
  pay_time    TIMESTAMP(3),
  primary key (detail_id) not enforced
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:8020/realtime-demo-2/dwd_paid_order_detail_hudi',
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enabled' = 'false',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
-- DWD load: join paid orders (order_status = '2') with their detail rows.
-- operate_time of a paid order is written into the pay_time column.
insert into dwd_paid_order_detail_hudi
SELECT
  od.id,
  oi.id AS order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM (
  SELECT * FROM ods_order_info_topic WHERE order_status = '2' -- 已支付 (paid)
) oi
JOIN (
  SELECT * FROM ods_order_detail_topic
) od
  ON oi.id = od.order_id;
ADS层数据
经过上面的步骤,我们创建了一张dwd_paid_order_detail明细宽表,并将该表存储在了hudi中。接下来我们将使用这张明细宽表与维表进行JOIN,得到我们ADS应用层数据。
ads_province_index_hudi
%flink.ssql
drop table if exists ads_province_index_hudi;
drop table if exists tmp_province_index_hudi;
-- ---------------------------------
-- ADS table: per-province, per-day order count and order amount.
-- ---------------------------------
CREATE TABLE ads_province_index_hudi (
  province_id   INT,
  area_code     STRING,
  province_name STRING,
  region_id     INT,
  region_name   STRING,
  order_amount  DECIMAL(10,2),
  order_count   BIGINT,
  dt            STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:8020/realtime-demo-2/ads_province_index_hudi',
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enabled' = 'false',
  'read.streaming.enabled' = 'true'
);
-- ---------------------------------
-- tmp_province_index: per-province, per-day order aggregates.
-- Fix: the primary key was (province_id) alone, although the loading query
-- groups by (province_id, pay_date) — each new day's upsert would overwrite
-- the previous day's row. The key now includes pay_date.
-- ---------------------------------
CREATE TABLE tmp_province_index_hudi (
  province_id  INT,
  order_count  BIGINT,        -- number of orders
  order_amount DECIMAL(10,2), -- order amount
  pay_date     DATE,
  primary key (province_id, pay_date) not enforced
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:8020/realtime-demo-2/tmp_province_index_hudi',
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enabled' = 'false',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
-- ---------------------------------
-- Load tmp_province_index: aggregate paid-order details per province per day.
-- ---------------------------------
INSERT INTO tmp_province_index_hudi
SELECT
  province_id,
  count(distinct order_id)                      order_count,  -- number of orders
  sum(order_price * sku_num)                    order_amount, -- order amount
  TO_DATE(DATE_FORMAT(pay_time, 'yyyy-MM-dd'))  pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY
  province_id,
  TO_DATE(DATE_FORMAT(pay_time, 'yyyy-MM-dd'))
%flink.ssql
-- ADS load: join the daily province aggregates onto the province dimension.
INSERT INTO ads_province_index_hudi
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM tmp_province_index_hudi pc
JOIN dim_province_hudi as dp
  ON dp.province_id = pc.province_id;
查看ADS层的ads_province_index_hudi表数据:
ads_sku_index_hudi
%flink.ssql
-- ---------------------------------
-- ADS table: per-SKU, per-day order count, order amount and sku quantity.
-- NOTE(review): DOUBLE is used for the money columns (price, order_amount);
-- DECIMAL would avoid float rounding — kept as-is to match downstream use,
-- confirm before changing.
-- ---------------------------------
drop table if exists ads_sku_index_hudi;
CREATE TABLE ads_sku_index_hudi (
  sku_id       BIGINT,
  sku_name     VARCHAR,
  weight       DOUBLE,
  tm_id        BIGINT,
  price        DOUBLE,
  spu_id       BIGINT,
  c3_id        BIGINT,
  c3_name      VARCHAR,
  c2_id        BIGINT,
  c2_name      VARCHAR,
  c1_id        BIGINT,
  c1_name      VARCHAR,
  order_amount DOUBLE,
  order_count  BIGINT,
  sku_count    BIGINT,
  dt           varchar,
  PRIMARY KEY (sku_id, dt) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:8020/realtime-demo-2/ads_sku_index_hudi',
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enabled' = 'false',
  'read.streaming.enabled' = 'true'
);
-- ---------------------------------
-- tmp_sku_index: per-SKU, per-day order aggregates.
-- Fix: the primary key was (sku_id) alone, although the loading query groups
-- by (sku_id, pay_date) — each new day's upsert would overwrite the previous
-- day's row. The key now includes pay_date.
-- ---------------------------------
drop table if exists tmp_sku_index_hudi;
CREATE TABLE tmp_sku_index_hudi (
  sku_id        BIGINT,
  order_count   BIGINT,        -- number of orders
  order_amount  DECIMAL(10,2), -- order amount
  order_sku_num BIGINT,
  pay_date      DATE,
  PRIMARY KEY (sku_id, pay_date) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:8020/realtime-demo-2/tmp_sku_index_hudi',
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enabled' = 'false',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
-- ---------------------------------
-- Load tmp_sku_index: aggregate paid-order details per SKU per day.
-- ---------------------------------
INSERT INTO tmp_sku_index_hudi
SELECT
  sku_id,
  count(distinct order_id)                      order_count,   -- number of orders
  sum(order_price * sku_num)                    order_amount,  -- order amount
  sum(sku_num)                                  order_sku_num,
  TO_DATE(DATE_FORMAT(pay_time, 'yyyy-MM-dd'))  pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY
  sku_id,
  TO_DATE(DATE_FORMAT(pay_time, 'yyyy-MM-dd'))
%flink.ssql
-- ADS load: enrich the daily SKU aggregates with the SKU dimension view.
INSERT INTO ads_sku_index_hudi
SELECT
  sku_id,
  sku_name,
  weight,
  tm_id,
  price,
  spu_id,
  c3_id,
  c3_name,
  c2_id,
  c2_name,
  c1_id,
  c1_name,
  sc.order_amount,
  sc.order_count,
  sc.order_sku_num,
  cast(sc.pay_date as VARCHAR)
FROM tmp_sku_index_hudi sc
JOIN dim_sku_info ds
  ON ds.id = sc.sku_id
%flink.ssql
-- Inspect the final ADS-layer SKU table.
select * from ads_sku_index_hudi;
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)