资讯详情

指标统计:基于流计算 Oceanus(Flink) 实现实时 UVPV 统计

作者:吴云涛,腾讯 CSIG 高级工程师

导语 | 最近梳理了如何使用。 Flink 实现实时 UV、PV 该指标的统计数据,并与公司微视部门的同事进行了沟通。然后简化了场景, Flink SQL 来 统计这些指标会更方便。

本方案结合当地自建 Kafka 腾讯云流集群计算 Oceanus(Flink)、云数据库 Redis 博客、购物等网站 UV、PV 实时可视化分析指标。分析指标包括网站独立访问者的数量(UV )、点击产品(PV)、转化率(转化率) = 成交次数 / 点击量)等。

相关概念介绍:UV(Unique Visitor):独立访客数量。访问您网站的客户端是访问者,如用户访问同一页面 5 第二,页面 UV 只加 1,因为 UV 统计数据是用户数量,而不是访问次数。PV(Page View):点击量或页面浏览量。如果用户访问同一页面 5 第二,页面 PV 会加 5。

d91fabcd58c6a7b8655eb3c920fe2fb1.png

根据上述实时指标统计场景,设计了以下架构图:

涉及产品列表:

  • 本地数据中心(IDC)的自建 Kafka 集群

  • 私有网络 VPC

  • 专线接入/云网/VPN连接/对等连接

  • 流计算 Oceanus (Flink)

  • 云数据库 Redis

购买所需的腾讯云资源,开放网络。 Kafka 集群需要根据集群所在区域采用 VPN 实现网络互专线连接或对等连接,实现网络互联。

私有网络(VPC)在腾讯云上构建自定义的逻辑隔离网络空间 Oceanus 集群、Redis 建议在组件和其他服务中选择相同的网络 VPC,网络可以互通。否则需要使用对等连接,NAT 网关、VPN 通过网络等方式。创建私有网络的步骤请参考帮助文档(https://cloud.tencent.com/document/product/215/36515)。

流计算 Oceanus 它是基于大数据产品生态系统的实时分析工具 Apache Flink 企业级实时大数据分析平台具有一站式开发、无缝连接、亚秒延迟、成本低、安全稳定等特点。流量计算 Oceanus 加快企业实时数字化建设进程,实现企业数据价值最大化。

在 Oceanus 控制台【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、设置初始密码设置初始密码等。VPC 和子网使用刚刚创建的网络。 Flink 集群如下:

在Redis 控制台(https://console.cloud.tencent.com/redis#/)页面创建集群,选择与其他组件同一区域、同一区域的同一私人网络 VPC,同一子网也在这里选择。

2.4.1 修改自建 Kafka 集群配置

自建 Kafka 集群连接时 bootstrap-servers 经常使用参数 hostname 而不是 ip 来连接。但用自建 Kafka 集群连接腾讯云 Oceanus 集群为全托管集群, Oceanus 自建集群的节点无法分析 hostname 与 ip 因此,监听器地址需要改变 hostname 为 ip 地址连接的形式。

将 config/server.properties 配置文件中 advertised.listeners 参数配置为IP地址。

# 0.10.X及以后版本 advertised.listeners=PLAINTEXT://10.1.0.10:9092 # 0.10.X之前版本 advertised.host.name=PLAINTEXT://10.1.0.10:9092

修改后重启 Kafka 集群。

! 如果在云上使用自建的,zookeeper还需要地址zk配置中的hostname修改IP地址形式。

2.4.2 模拟数据发送到topic

本案例使用topic为topic为 uvpv-demo。

1)Kafka 客户端

进入自建 Kafka 集群节点,启动 Kafka 模拟发送数据的客户端。

./bin/kafka-console-producer.sh--broker-list10.1.0.10:9092--topicuvpv-demo >{"record_type":0,"user_id":2,"client_ip":"100.0.0.2","product_id":101,"create_time":"2021-09-08 16:20:00"} >{"record_type":0,"user_id":3,"client_ip":"100.0.0.3","product_id":101,"create_time":"2021-09-08 16:20:00"} >{"record_type":1,"user_id":2,"client_ip":"100.0.0.1","product_id":101,"create_time":"2021-09-08 16:20:00"}

2)使用脚本发送

脚本一:Java 代码参考:https://cloud.tencent.com/document/product/597/54834

脚本二:Python 脚本。参考前一个案例。 python 适当修改脚本即可:

视频直播:实时数据可视化分析

自建 Kafka 腾讯云网络可通过以下方式集群连接 3 以各种方式打通自建 IDC 网络通信到腾讯云。

  • 专线接入

    https://cloud.tencent.com/document/product/216

    适用于本地数据中心 IDC 连接腾讯云网络。

  • 云联网

    https://cloud.tencent.com/document/product/877

    适用于本地数据中心 IDC 连接腾讯云网络,也可用于云上不同地区的私人网络 VPC 打通。

  • VPN连接

    https://cloud.tencent.com/document/product/554

    适用于本地数据中心 IDC 连接腾讯云网络。

  • 对等连接 NAT网关

    对等连接:https://cloud.tencent.com/document/product/553

    NAT网关:https://cloud.tencent.com/document/product/552

    适用于云上不同地区的私有网络 VPC 不适合本地 IDC 到腾讯云网络。

本方案已使用 VPN 连接方式,实现本地化 IDC 与云网络通信。参考链接:

建立 VPC 到 IDC 连接(路由表)(https://cloud.tencent.com/document/product/554/52854)

根据方案绘制以下网络架构图:

利用流计算 Oceanus 实现网站 UV、PV、转化率指标的实时统计只列出以下三个统计指标:

  • 独立访客数量的网站 UV。Oceanus 处理后在 Redis 中通过 set 类型存储独立访客的数量,也达到了重视同一访客数据的目的。

  • 网站商品页面的点击量 PV。Oceanus 处理后在 Redis 中使 list 类型存储页面点击量。

  • 转化率(转化率 = 成交次数 / 点击量)。Oceanus 处理后在 Redis 中用 String 存储即可。

Kafka topic:uvpv-demo(浏览记录)

record_type

int

客户号

user_id

varchar

客户ip地址

client_ip

varchar

房间号

product_id

Int

进入房间时间

create_time

timestamp

创建时间

Kafka 内部采用 json 格式存储,数据格式如下:

# 浏览记录
{
 "record_type":0,  # 0 表示浏览记录
 "user_id": 6,
 "client_ip": "100.0.0.6",
 "product_id": 101,
 "create_time": "2021-09-06 16:00:00"
}

# 购买记录
{
 "record_type":1, # 1 表示购买记录
 "user_id": 6,
 "client_ip": "100.0.0.8",
 "product_id": 101,
 "create_time": "2021-09-08 18:00:00"
}

示例中实现了 UV、PV 和转化率3个指标的获取逻辑,并写入 Sink 端。

1、定义 Source

CREATE TABLE `input_web_record` (
  `record_type` INT,
  `user_id` INT,
  `client_ip` VARCHAR,
  `product_id` INT,
  `create_time` TIMESTAMP,
  `times` AS create_time,
  WATERMARK FOR times AS times - INTERVAL '10' MINUTE
) WITH (
    'connector' = 'kafka',   -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置 Connector
    'topic' = 'uvpv-demo',  
    'scan.startup.mode' = 'earliest-offset',
    --'properties.bootstrap.servers' = '82.157.27.147:9092',
    'properties.bootstrap.servers' = '10.1.0.10:9092',  
    'properties.group.id' = 'WebRecordGroup',  -- 必选参数, 一定要指定 Group ID
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',     -- 忽略 JSON 结构解析异常
    'json.fail-on-missing-field' = 'false'   -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
);

2、定义 Sink

-- UV sink
CREATE TABLE `output_uv` (  
`userids`   STRING,
`user_id` STRING
) WITH (
 'connector' = 'redis',          
 'command' = 'sadd',              -- 使用集合保存uv(支持命令:set、lpush、sadd、hset、zadd)
 'nodes' = '192.28.28.217:6379',  -- redis连接地址,集群模式多个节点使用'',''分隔。
 -- 'additional-key' = '<key>',   -- 用于指定hset和zadd的key。hset、zadd必须设置。
 'password' = 'yourpassword'  
);

-- PV sink
CREATE TABLE `output_pv` (  
`pagevisits`   STRING,
`product_id` STRING,
`hour_count` BIGINT
) WITH (
 'connector' = 'redis',          
 'command' = 'lpush',              -- 使用列表保存pv(支持命令:set、lpush、sadd、hset、zadd)
 'nodes' = '192.28.28.217:6379',   -- redis连接地址,集群模式多个节点使用'',''分隔。
 -- 'additional-key' = '<key>',   -- 用于指定hset和zadd的key。hset、zadd必须设置。
 'password' = 'yourpassword'  
);

-- 转化率 sink
CREATE TABLE `output_conversion_rate` (  
`conversion_rate`   STRING,
`rate` STRING
) WITH (
 'connector' = 'redis',        
 'command' = 'set',              -- 使用列表保存pv(支持命令:set、lpush、sadd、hset、zadd)
 'nodes' = '192.28.28.217:6379', -- redis连接地址,集群模式多个节点使用'',''分隔。
 -- 'additional-key' = '<key>', -- 用于指定hset和zadd的key。hset、zadd必须设置。
 'password' = 'yourpassword'  
);

3、业务逻辑

-- 加工得到 UV 指标,统计所有时间内的 UV
INSERT INTO output_uv
SELECT
 'userids' AS `userids`,
CAST(user_id AS string) AS user_id
FROM input_web_record ;

-- 加工并得到 PV 指标,统计每 10 分钟内的 PV
INSERT INTO output_pv
SELECT
 'pagevisits' AS pagevisits,
CAST(product_id AS string) AS product_id,
SUM(product_id) AS hour_count
FROM input_web_record WHERE record_type = 0
GROUP BY
HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
product_id,
user_id;

-- 加工并得到转化率指标,统计每 10 分钟内的转化率
INSERT INTO output_conversion_rate
SELECT
 'conversion_rate' AS conversion_rate,
CAST( (((SELECT COUNT(1) FROM input_web_record WHERE record_type=0)*1.0)/SUM(a.product_id)) as string)
FROM (SELECT * FROM input_web_record where record_type = 1) AS a
GROUP BY  
HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
product_id;

通常情况,会通过 Web 网站来展示统计到的 UV、PV 指标,这里为了简单直接在Redis 控制台(https://console.cloud.tencent.com/redis#/)登录进行查询:

userids: 存储 UV

pagevisits: 存储 PV

conversion_rate: 存储转化率,即购买商品次数/总页面点击量。

通过自建 Kafka 集群采集数据,在流计算 Oceanus (Flink) 中实时进行字段累加、窗口聚合等操作,将加工后的数据存储在云数据库Redis,统计到实时刷新的 UV、PV 等指标。这个方案在 Kafka json 格式设计时为了简便易懂做了简化处理,将浏览记录和产品购买记录都放在了同一个 topic 中,重点通过打通自建 IDC 和腾讯云产品间的网络来展现整个方案。针对超大规模的 UV 去重,微视的同事采用了 Redis hyperloglog 方式来实现 UV 统计。相比直接使用 set 类型方式有极小的内存空间占用的优点,详情见链接:https://cloud.tencent.com/developer/article/1889162。

点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~

腾讯云大数据

长按二维码关注我们

标签: uv连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台