使用DLI Flink SQL进行电商实时业务数据分析

业务场景介绍

场景描述

当前线上购物无疑是最火热的购物方式,而电商平台则又可以以多种方式接入,例如通过web方式访问、通过app的方式访问、通过微信小程序的方式访问等等。而电商平台则需要每天统计各平台的实时访问数据量、订单数、访问人数等等指标,从而能在显示大屏上实时展示相关数据,方便及时了解数据变化,有针对性地调整营销策略。而如何高效快捷地统计这些指标呢?

假设平台已经将每个商品的订单信息实时写入Kafka中,这些信息包括订单ID、订单生成的渠道(web方式、app方式等)、订单时间、订单金额、折扣后实际支付金额、支付时间、用户ID、用户姓名、订单地区ID等信息。而我们需要做的,就是根据当前可以获取到的业务数据,实时统计每种渠道的相关指标,输出存储到数据库中,并进行大屏展示。

场景方案

1-1.png

场景任务

使用DLI Flink完成电商业务实时数据的分析处理,获取各个渠道的销售汇总数据。

数据说明

- 数据源表:电商业务订单详情宽表

字段名

字段类型

说明

order_id

string

订单ID

order_channel

string

订单生成的渠道(web方式、app方式等)

order_time

string

订单时间

pay_amount

double

订单金额

real_pay

double

实际支付金额

pay_time

string

支付时间

user_id

string

用户ID

user_name

string

用户姓名

area_id

string

订单地区ID


- 结果表:各渠道的销售总额实时统计表。

字段名

字段类型

说明

begin_time

varchar(32)

开始统计指标的时间

channel_code

varchar(32)

渠道编号

channel_name

varchar(32)

渠道名

cur_gmv

double

当天GMV

cur_order_user_count

bigint

当天付款人数

cur_order_count

bigint

当天付款订单数

last_pay_time

varchar(32)

最近结算时间

flink_current_time

varchar(32)

Flink数据处理时间


流程介绍

使用DLI Flink进行电商实时业务数据分析的操作过程主要包括7个步骤:

步骤1:注册账号。使用DLI对数据进行分析之前,需要注册华为云账号并进行实名认证。

步骤2:创建资源。在您的账户下创建作业需要的相关资源,涉及VPCDMSDLIRDS

步骤3:创建DMS topic并获取连接地址。

步骤4:创建RDS数据库表。

步骤5:创建增强型跨源打通网络。

步骤6:创建并提交Flink作业。

步骤7:查询结果,使用DLV进行大屏展示。


步骤1:注册账号

注册华为云账号并进行实名认证。

注册账号具体步骤可参考账号注册可参考账号注册

实名认证具体步骤可参考实名认证

如果您已完成华为云账号注册和实名认证,可跳过该步骤。


步骤2:创建资源

  1. 创建VPC,具体步骤可参考:创建VPC和子网
  2. 创建DMS Kafka实例,具体步骤可参考:DMS Kafka入门指引
  3. 创建RDS MySQL实例,具体步骤可参考:RDS MySQL快速入门
  4. 创建DLI CCE队列,具体步骤可参考:DLI 创建队列
  5. 创建DLV大屏,具体步骤可参考:DLV 创建大屏


创建资源时注意以下几点:

  1. Kafka与MySQL实例创建时需指定VPC,该VPC需提前创建好,且网段不与后续创建的DLI队列网段冲突
  2. DLI Flink Opensource语法目前仅支持容器化队列(目前仍在封闭测试阶段),因此创建队列前需在官网提工单申请开通CCE队列使用权限后,再创建DLI队列。
  3. 请创建DLI队列时请创建【包年包月】或者【按需-专属资源】模式的通用队列。

2-1.png

步骤3:创建DMS topic并获取连接地址

  1. 点击【服务列表】,搜索【DMS】,找到【分布式消息服务DMS】,点击进入DMS页面。在【Kafka专享版】页面找到您所创建的Kafka实例。

    3-1.png

  2. 进入实例详情页面。点击【基本信息】,获取【连接地址】。


    3-2.png

  3. 点击【Topic管理】,创建一个topictrade_order_detail_info

    3-3.png

topic配置如下:

分区数:1

副本数:1

老化时间:72h

同步落盘:否


步骤4:创建RDS数据库表

  1. 点击【服务列表】,搜索【RDS】,找到【云数据库RDS】,点击进入RDS页面。在【实例管理页面】,会看到您已经创建的RDS实例,获取其内网地址。

    4-1.png

  2. 点击所创建RDS实例的登录按钮,会跳转到数据管理服务-DAS。输入相关账户信息,点击【测试连接】。显示连接成功后,点击【登录】。

    4-2.png

    4-3.png

  3. 登录RDS实例后,点击【新建数据库】按钮,创建名称为【dli-demo】的数据库。

    4-4.png

  4. 点击【SQL操作】- SQL查询】,执行如下SQL创建测试用MySQL表,表相关字段含义在【业务场景介绍】-【数据说明】部分有详细介绍。
    DROP TABLE `dli-demo`.`trade_channel_collect`;
    CREATE TABLE `dli-demo`.`trade_channel_collect` (
        `begin_time` VARCHAR(32) NOT NULL,
        `channel_code` VARCHAR(32) NOT NULL,
        `channel_name` VARCHAR(32) NULL,
        `cur_gmv` DOUBLE UNSIGNED NULL,
        `cur_order_user_count` BIGINT UNSIGNED NULL,
        `cur_order_count` BIGINT UNSIGNED NULL,
        `last_pay_time` VARCHAR(32) NULL,
        `flink_current_time` VARCHAR(32) NULL,
        PRIMARY KEY (`begin_time`, `channel_code`)
    )   ENGINE = InnoDB
        DEFAULT CHARACTER SET = utf8mb4
        COLLATE = utf8mb4_general_ci
        COMMENT = '各渠道的销售总额实时统计';


步骤5:创建增强型跨源打通网络

  1. 点击【服务列表】,搜索【DLI】,找到【数据湖探索】,点击进入DLI服务页面。点击【队列管理】,会看到队列列表里您所创建的通用队列。

    5-1.png

  2. 点击【全局配置】-【服务授权】,选中【VPC Administrator】,点击更新委托权限。此举目的是赋予DLI操作用户VPC资源的权限,用于创建VPC【对等连接】。

    5-2.png

  3. 点击【跨源连接】-【增强型跨源】-【创建】。配置如下:【绑定队列】选择您所创建的通用队列,【虚拟私有云】和【子网】选择 Kafka MySQL 实例所在的 VPC 与子网。 点击确定。

    5-3.png

  4. 创建完成后,在跨源列表中,对应的跨源连接状态会显示为【已激活】。点击跨源连接名称,详情页面显示连接状态为【ACTIVE】。

    5-4.png

    5-5.png

  5. 测试队列与RDSDMS实例连通性。点击【队列管理】,选择您所使用的队列,点击【更多】-【测试地址连通性】。输入前序步骤3-2获取的DMS Kafka实例连接地址和步骤4-2获取的RDS MySQL实例内网地址,进行网络连通性测试。测试结果显示可达,则DLI队列与KafkaMySQL实例的网络已经联通。5-7.png
  6. 如果测试结果不可达,需要修改实例所在VPC的安全组规则,放开9092(Kafka连接端口)3306(MySQL连接端口)DLI队列的限制,DLI队列网段信息可以在队列【下拉详情页】获取。

    5-8.png

步骤6:创建并提交Flink作业

  1. 点击DLI控制台左侧栏【作业管理】,选择【Flink作业】。点击“创建作业”,选择作业类型为:Flink OpenSource SQL,名称自定义。

    6-1.png

  2. 点击“确定”后进入编辑作业页面,具体SQL示例如下所示,部分参数值需要根据RDSDMS对应的信息进行修改。
    --********************************************************************--
    -- 数据源:trade_order_detail_info (订单详情宽表)
    --********************************************************************--
    create table trade_order_detail (
      order_id string,      -- 订单ID
      order_channel string,   -- 渠道
      order_time string,     -- 订单创建时间
      pay_amount double,     -- 订单金额
      real_pay double,      -- 实际付费金额
      pay_time string,      -- 付费时间
      user_id string,      -- 用户ID
      user_name string,     -- 用户名
      area_id string       -- 地区ID
    ) with (
      "connector.type" = "kafka",
      "connector.version" = "0.10",
      "connector.properties.bootstrap.servers" = "xxxx:9092,xxxx:9092,xxxx:9092", -- Kafka连接地址
      "connector.properties.group.id" = "trade_order",   -- Kafka groupID
      "connector.topic" = "trade_order_detail_info",     -- Kafka topic
      "format.type" = "json",
      "connector.startup-mode" = "latest-offset"
    );
     
    --********************************************************************--
    -- 结果表:trade_channel_collect (各渠道的销售总额实时统计)
    --********************************************************************--
    create table trade_channel_collect(
      begin_time string,       --统计数据的开始时间
      channel_code string,      -- 渠道编号
      channel_name string,      -- 渠道名
      cur_gmv double,         -- 当天GMV
      cur_order_user_count bigint, -- 当天付款人数
      cur_order_count bigint,    -- 当天付款订单数
      last_pay_time string,     -- 最近结算时间
      flink_current_time string,
      primary key (begin_time, channel_code) not enforced
    ) with (
      "connector.type" = "jdbc",
      "connector.url" = "jdbc:mysql://xxxx:3306/xxxx",    -- mysql连接地址,jdbc格式
      "connector.table" = "xxxx",            -- mysql表名
      "connector.driver" = "com.mysql.jdbc.Driver",
      "connector.username" = "xxx",                    -- mysql用户名
      "connector.password" = "xxxx",                   -- mysql密码
      "connector.write.flush.max-rows" = "1000",
      "connector.write.flush.interval" = "1s"
    );
     
    --********************************************************************--
    -- 临时中间表
    --********************************************************************--
    create view tmp_order_detail
    as
    select *
        , case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other"
               else t.order_channel end as channel_code --重新定义统计渠道 只有四个枚举值[webShop、appShop、miniAppShop、other]
        , case when t.order_channel = "webShop" then _UTF16"网页商城"
               when t.order_channel = "appShop" then _UTF16"app商城"
               when t.order_channel = "miniAppShop" then _UTF16"小程序商城"
               else _UTF16"其他" end as channel_name --渠道名称
    from (
        select *
            , row_number() over(partition by order_id order by order_time desc ) as rn --去除重复订单数据
            , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time
            , concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time
        from trade_order_detail
        where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") --取今天数据,为了方便运行,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string)
        and real_pay is not null
    ) t
    where t.rn = 1;
     
    -- 按渠道统计各个指标
    insert into trade_channel_collect
    select
          begin_time  --统计数据的开始时间
        , channel_code
        , channel_name
        , cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv --当天GMV
        , count(distinct user_id) as cur_order_user_count --当天付款人数
        , count(1) as cur_order_count --当天付款订单数
        , max(pay_time) as last_pay_time --最近结算时间
        , cast(LOCALTIMESTAMP as string) as flink_current_time --flink任务中的当前时间
    from tmp_order_detail
    where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00")
    group by begin_time, channel_code, channel_name;

    作业逻辑说明:

    首先,我们先定义一个Kafka源表,用来从Kafka指定topic中读取消费数据;再定义一个结果表,用来通过JDBC向MySQL中写入结果数据。

    创建源表和结果表以后,需要实现相应的处理逻辑,以实现各个指标的统计。

    为了简化最终的处理逻辑,使用创建视图进行数据预处理。首先利用over窗口条件和过滤条件结合以去除重复数据(该方式是利用了top N的方法),同时利用相应的内置函数concat和substr将当天的00:00:00作为统计的开始时间,当天的23:59:59作为统计结束时间,并筛选出支付时间在当天凌晨00:00:00后的订单数据进行统计(为了方便模拟数据的构造,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string),请注意)。然后根据这些数据的订单渠道利用内置的条件函数设置channel_code和channel_name的值,从而获取了源表中的字段信息,以及begin_time、end_time和channel_code、channel_name的值。

    最后,我们根据需要对相应指标进行统计和筛选,并将结果写入到结果表中。

  3. 选择所创建的DLI 通用队列提交作业。

    6-2.png

  4. 等待片刻,作业状态会变为【运行中】。点击作业名,可以查看作业详细运行情况。

    6-3.png

  5. 使用Kafka客户端向指定topic发送数据,模拟实时数据流。具体方法可参考:DMS - 连接实例生产消费信息
    发送命令如下:
    sh kafka_2.11-2.3.0/bin/kafka-console-producer.sh --broker-list kafka连接地址 --topic 指定topic
    
    示例数据如下:
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
    {"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20", "pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
    {"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08", "pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
    {"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
    {"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    {"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    {"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
    {"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}

    6-6.png

  6. 访问DLI【作业管理页面】-Flink作业】,点击前序步骤提交的Flink作业。在作业详情页面,可以看到处理的数据记录数。

    6-7.png

步骤7:查询结果

  1. 参考前序步骤4-2,登录MySQL实例,执行如下SQL语句,即可查询到经过Flink作业处理后的结果数据。

    7-1.png

  2. 配置DLV大屏,执行SQL查询RDS MySQL,即可以实现大屏实时展示。具体配置方法可参考:DLV开发大屏

    7-2.png






(完)