Design and Implementation of an E-commerce User Behavior Analysis System Based on Spark
Project Architecture
Flume -> Kafka -> Spark Streaming -> MySQL -> FineReport 10
Data visualization is provided by the third-party tool FineReport.
1. Data Collection: a Java writer thread appends simulated behavior data to the monitored log file
Simulated e-commerce website user behavior data (alternatively, the open Alibaba Cloud Tianchi datasets of Taobao or Tmall user behavior can be used)
- user_id: user ID, serialized integer
- item_id: item ID, serialized integer
- category_id: ID of the category the item belongs to, serialized integer
- behavior_type: behavior type, string enum ('pv', 'buy', 'cart', 'fav')
- ddate: time the behavior occurred
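An illustrative simulated record under this schema (hypothetical values; the downstream Spark Streaming job splits each incoming line on commas):
1,30,3,pv,2021-03-19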
- Flume monitors the data file in real time, collecting newly written user behavior records and forwarding them to Kafka
Start Flume:
flume-ng agent -c ./softwares/flume/conf/ -f ./flume2kafka.conf -n a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# An exec source that runs a Linux command; the command outputs the monitored file and its real-time changes
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/admin/bigdata.log
# Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
a1.sinks.k1.brokerList=172.17.33.37:9092
# Kafka topic
a1.sinks.k1.topic=henry
# Serializer
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
# Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
- Kafka receives the data: records forwarded by Flume are buffered in the message queue until Spark Streaming consumes them
Start ZooKeeper:
zkServer.sh start
Start Kafka (-daemon runs it in the background):
kafka-server-start.sh -daemon ./softwares/kafka/config/server.properties
Create the topic (3 partitions, 1 replica, topic name henry):
kafka-topics.sh --create --zookeeper 172.17.33.37:2181 --replication-factor 1 --partitions 3 --topic henry
# List all topics
kafka-topics.sh --list --zookeeper 172.17.33.37:2181
# Start a Kafka console producer
kafka-console-producer.sh --broker-list 172.17.33.37:9092 --topic henry
# Start a Kafka console consumer
kafka-console-consumer.sh --bootstrap-server 172.17.33.37:9092 --topic henry
This completes the data collection layer. No data cleaning is performed here; all upstream data is passed to the data processing module and handled there in a unified way.
2. Data Processing
- The Spark Streaming program waits on the stream in real time and consumes data from the specified Kafka topic
UserBehavior.scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * Spark Streaming consumes Kafka data in real time and persists it to MySQL
 *
 * @author huangxuan
 * @date 2021.03.19
 */
object UserBehavior {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Spark configuration
    val sc = new SparkConf().setAppName("To MySql").setMaster("local[5]")
    // StreamingContext, one batch every 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))

    // Read data from Kafka
    val topics = "henry" // topic name
    val numThreads = 3   // consumer threads per topic (the topic has 3 partitions)
    val topicMap = topics.split(" ").map((_, numThreads.toInt)).toMap
    val kafkaStream = KafkaUtils.createStream(
      ssc,                 // Spark Streaming context
      "172.17.33.37:2181", // ZooKeeper quorum
      "test1",             // consumer group
      topicMap
    )

    // Obtain a JDBC connection from the project's connection pool
    val conn = ConnectionPool.getConnection
    val stmt = conn.createStatement

    // Consume the stream: one record per line, e.g. (1,1,pv),(1,2,pv)
    val lines = kafkaStream.map(_._2).flatMap(_.split("\n"))
    lines.foreachRDD(line => {
      // Collect each batch to the driver as an array of strings
      val datas: Array[String] = line.collect()
      datas.foreach(x => {
        val parms = x.split(",")
        // Explicit column list so the three incoming fields match the user_behavior table
        val sql = "insert into user_behavior(user_id, good_id, behavior_type) values(" +
          parms(0) + "," + parms(1) + ",'" + parms(2) + "');"
        stmt.executeUpdate(sql)
        println(sql)
      })
    })
    ConnectionPool.returnConnection(conn)

    ssc.start()           // start and wait on the stream
    ssc.awaitTermination()
  }
}
Package the program into a jar, upload it to the server, and submit the Spark Streaming job:
spark-submit --class "saprkStreaming.UserBehavior" --master local[2] Spark_Study-1.0-SNAPSHOT.jar
3. Data Analysis
- Models and framework used in this analysis
E-commerce analysis is usually carried out from four angles: process/conversion efficiency, traffic and user analysis, goods analysis, and product analysis. User stickiness, value, and satisfaction are examined; goods are graded through life-cycle and correlation analysis; and product analysis is used to improve the browse-to-purchase experience.
This project applies the widely used AARRR funnel model to break down each step of user behavior after entering the app, and the RFM model to evaluate user value, identify the most valuable user segments, and target them with differentiated marketing.
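A minimal sketch of how the funnel stages can be computed once the data has landed in MySQL, assuming the user_behavior table defined in section 4 below is populated (the queries are illustrative examples, not the project's fixed report definitions):
-- Events and distinct users at each funnel stage (pv -> cart/fav -> buy)
SELECT behavior_type,
       COUNT(*)                AS events,
       COUNT(DISTINCT user_id) AS users
FROM user_behavior
GROUP BY behavior_type;

-- Overall click-to-purchase conversion rate
SELECT COUNT(DISTINCT CASE WHEN behavior_type = 'buy' THEN user_id END)
     / COUNT(DISTINCT CASE WHEN behavior_type = 'pv'  THEN user_id END) AS pv_to_buy_rate
FROM user_behavior;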
Users:
- Age: shopping needs of different age groups
- Region: shopping needs of different regions
- Gender: shopping needs of different genders
- Other combined analyses
Goods:
- Type
- Price
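For these breakdowns, the kind of query FineReport could run against the MySQL schema in section 4 looks roughly like the following (illustrative sketch joining user_behavior with the dimension tables defined below):
-- Purchases by gender and province
SELECT u.u_sex, u.u_province, COUNT(*) AS buy_cnt
FROM user_behavior b
JOIN user_info u ON b.user_id = u.u_id
WHERE b.behavior_type = 'buy'
GROUP BY u.u_sex, u.u_province;

-- Purchases by goods category
SELECT t.t_name, COUNT(*) AS buy_cnt
FROM user_behavior b
JOIN goods_info g ON b.good_id = g.g_id
JOIN goods_type t ON g.g_type = t.t_id
WHERE b.behavior_type = 'buy'
GROUP BY t.t_name;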
4. Database Design
- MySQL 8.0.23 is used as the persistence layer downstream of Spark Streaming
- User information table:
create table user_info(
u_id int primary key COMMENT '用户ID',
u_name varchar(50) COMMENT '用户姓名',
u_sex varchar(10) COMMENT '用户性别',
u_province varchar(50) COMMENT '用户所在省',
u_city varchar(50) COMMENT '用户所在市',
u_birthday date COMMENT '用户出生日期'
)character set = utf8;
insert into user_info values(1,'张三','男','北京','北京','1999-12-26');
insert into user_info values(2,'李四','男','湖北','武汉','1979-04-21');
insert into user_info values(3,'王五','男','河南','郑州','1984-10-16');
insert into user_info values(4,'李大壮','男','山东','青岛','2002-09-09');
insert into user_info values(5,'肖站','男','江西','南昌','1968-11-06');
insert into user_info values(6,'王伊勃','男','新疆','乌鲁木齐','1996-07-09');
insert into user_info values(7,'王冰冰','女','辽宁','长春','1990-09-28');
insert into user_info values(8,'李冰冰','女','黑龙江','哈尔滨','1983-04-11');
insert into user_info values(9,'范冰冰','女','陕西','西安','1977-12-20');
insert into user_info values(10,'安吉拉','女','上海','上海','2002-03-23');
insert into user_info values(11,'罗玉凤','女','海南','海口','1965-03-08');
insert into user_info values(12,'肖龙女','女','四川','成都','1992-05-06');
insert into user_info values(13,'洛雪梅','女','云南','昆明','1983-01-06');
insert into user_info values(14,'马冬梅','女','湖南','长沙','1974-11-06');
insert into user_info values(15,'苏妲己','女','贵州','贵阳','1974-11-06');
insert into user_info values(16,'王建国','男','重庆','重庆','1999-11-06');
insert into user_info values(17,'李旦','男','内蒙古','呼和浩特','1968-11-06');
insert into user_info values(18,'张晓飞','男','吉林','长春','1968-11-06');
insert into user_info values(19,'古力娜扎','女','福建','福州','1994-01-06');
insert into user_info values(20,'允允朴','男','浙江','杭州','2006-11-06');
- Goods information table:
create table goods_info(
g_id int primary key COMMENT '商品ID',
g_name varchar(50) COMMENT '商品名称',
g_type int COMMENT '商品类型编号',
g_price float COMMENT '商品价格'
)character set = utf8;
insert into goods_info values(1,'安踏运动鞋',1,198);
insert into goods_info values(2,'李宁板鞋',1,214.99);
insert into goods_info values(3,'李宁韦德之道9篮球鞋',1,1499);
insert into goods_info values(4,'Jordan印花套头连帽衫 黑色',1,189);
insert into goods_info values(5,'遮阳帽',1,49);
insert into goods_info values(6,'牛仔裤',1,324);
insert into goods_info values(7,'休闲衬衫',1,88.88);
insert into goods_info values(8,'皮鞋',1,499);
insert into goods_info values(9,'T恤',1,79);
insert into goods_info values(10,'潮流工装裤',1,209);
insert into goods_info values(11,'AD钙奶',2,45);
insert into goods_info values(12,'手撕面包',2,36);
insert into goods_info values(13,'卫龙辣条',2,8);
insert into goods_info values(14,'螺蛳粉',2,66.66);
insert into goods_info values(15,'可乐',2,6);
insert into goods_info values(16,'蛋黄酥',2,28);
insert into goods_info values(17,'自热火锅',2,60);
insert into goods_info values(18,'大益普洱茶',2,999);
insert into goods_info values(19,'飞天茅台酱香型',2,1499);
insert into goods_info values(20,'零食大礼包',2,120);
insert into goods_info values(21,'华为Mate40 Pro',3,6499);
insert into goods_info values(22,'小米11',3,3999);
insert into goods_info values(23,'自拍杆',3,30);
insert into goods_info values(24,'数据线',3,25);
insert into goods_info values(25,'充电宝',3,120);
insert into goods_info values(26,'平板电脑',3,4299);
insert into goods_info values(27,'无人机',3,4999);
insert into goods_info values(28,'游戏机',3,2549);
insert into goods_info values(29,'笔记本电脑',3,9999);
insert into goods_info values(30,'数码相机',3,43800);
insert into goods_info values(31,'小米电视机',4,1499);
insert into goods_info values(32,'美的洗衣机',4,729);
insert into goods_info values(33,'电吹风',4,59);
insert into goods_info values(34,'电饭煲',4,159);
insert into goods_info values(35,'热水器',4,569);
insert into goods_info values(36,'空调',4,2399);
insert into goods_info values(37,'微波炉',4,489);
insert into goods_info values(38,'净水器',4,259);
insert into goods_info values(39,'咖啡机',4,4690);
insert into goods_info values(40,'电扇',4,42);
insert into goods_info values(41,'纸尿裤',5,99);
insert into goods_info values(42,'奶瓶',5,39);
insert into goods_info values(43,'益智玩具',5,26);
insert into goods_info values(44,'毛绒抱枕',5,35);
insert into goods_info values(45,'涂色绘画',5,31.5);
insert into goods_info values(46,'手办',5,199);
insert into goods_info values(47,'儿童肚兜',5,19.9);
insert into goods_info values(48,'儿童牙刷',5,14.9);
insert into goods_info values(49,'儿童餐椅',5,119);
insert into goods_info values(50,'妈咪包',5,60);
insert into goods_info values(51,'行李包',6,44.5);
insert into goods_info values(52,'女生斜挎包',6,139);
insert into goods_info values(53,'双肩包',6,69.6);
insert into goods_info values(54,'链条包',6,469);
insert into goods_info values(55,'旅行箱',6,334);
insert into goods_info values(56,'帆布包',6,62);
insert into goods_info values(57,'单肩包',6,199);
insert into goods_info values(58,'男士胸包',6,62);
insert into goods_info values(59,'男士真皮包',6,439);
insert into goods_info values(60,'卡包',6,54);
insert into goods_info values(61,'口红',7,319);
insert into goods_info values(62,'面膜',7,99);
insert into goods_info values(63,'雅思兰黛粉底',7,410);
insert into goods_info values(64,'OLAY抗老精华',7,229);
insert into goods_info values(65,'大宝眼霜',7,99.9);
insert into goods_info values(66,'安耐晒',7,79);
insert into goods_info values(67,'清扬男士洗发水',7,69.9);
insert into goods_info values(68,'舒肤佳沐浴露',7,34.9);
insert into goods_info values(69,'发胶',7,69);
insert into goods_info values(70,'香皂',7,10);
insert into goods_info values(71,'六味地黄丸',8,30);
insert into goods_info values(72,'皮炎宁',8,44);
insert into goods_info values(73,'999感冒灵',8,99);
insert into goods_info values(74,'九芝堂阿胶补血颗粒',8,436);
insert into goods_info values(75,'维C含片',8,34);
insert into goods_info values(76,'珍视明滴眼液',8,42);
insert into goods_info values(77,'汤臣倍健复合维A',8,144);
insert into goods_info values(78,'云南白药创可贴',8,21);
insert into goods_info values(79,'万通筋骨贴',8,29);
insert into goods_info values(80,'安眠药',8,169);
insert into goods_info values(81,'鞋架',9,9.8);
insert into goods_info values(82,'衣架',9,46);
insert into goods_info values(83,'折叠床',9,129);
insert into goods_info values(84,'真皮沙发',9,16999);
insert into goods_info values(85,'浴缸',9,420);
insert into goods_info values(86,'台灯',9,66);
insert into goods_info values(87,'椅子',9,44);
insert into goods_info values(88,'桌子',9,130);
insert into goods_info values(89,'傲风电竞椅',9,444);
insert into goods_info values(90,'床垫',9,239);
insert into goods_info values(91,'平衡车',10,1629);
insert into goods_info values(92,'路亚套杆',10,429);
insert into goods_info values(93,'跑步机',10,599);
insert into goods_info values(94,'哑铃套件',10,236);
insert into goods_info values(95,'健腹轮',10,29);
insert into goods_info values(96,'呼啦圈',10,76);
insert into goods_info values(97,'轮滑鞋',10,232);
insert into goods_info values(98,'篮球',10,199);
insert into goods_info values(99,'羽毛球拍',10,142);
insert into goods_info values(100,'网球拍',10,64);
- Goods category table:
create table goods_type(
t_id int primary key COMMENT '商品类型ID',
t_name varchar(50) COMMENT '商品类型名称'
)character set = utf8;
insert into goods_type values(1,'服饰鞋帽');
insert into goods_type values(2,'食品饮料');
insert into goods_type values(3,'数码3C');
insert into goods_type values(4,'家用电器');
insert into goods_type values(5,'母婴');
insert into goods_type values(6,'箱包');
insert into goods_type values(7,'个护美妆');
insert into goods_type values(8,'医药');
insert into goods_type values(9,'家具');
insert into goods_type values(10,'体育器械');
- Behavior type table:
create table behavior_type(
b_id varchar(5) primary key COMMENT '行为类型ID',
b_name varchar(50) COMMENT '行为类型名称'
)character set = utf8;
insert into behavior_type values('pv','点击');
insert into behavior_type values('buy','购买');
insert into behavior_type values('cart','加购物车');
insert into behavior_type values('fav','收藏');
- User behavior data table (populated by Spark Streaming):
create table user_behavior(
user_id int COMMENT '用户ID',
good_id int COMMENT '商品ID',
behavior_type varchar(5) COMMENT '行为类型ID',
ddate date COMMENT '行为时间'
)character set = utf8;
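Based on this table, the RFM metrics mentioned in section 3 can be sketched directly in SQL (illustrative only; CURDATE() is assumed as the reference date, and monetary value is approximated by joining goods_info prices):
-- Recency, frequency, and monetary value per user, derived from purchase records
SELECT b.user_id,
       DATEDIFF(CURDATE(), MAX(b.ddate)) AS recency_days,
       COUNT(*)                          AS frequency,
       SUM(g.g_price)                    AS monetary
FROM user_behavior b
JOIN goods_info g ON b.good_id = g.g_id
WHERE b.behavior_type = 'buy'
GROUP BY b.user_id;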
- System user table:
create table sys_user(
userName varchar(50) primary key COMMENT '用户名',
userPassword varchar(50) COMMENT '密码',
userEmail varchar(50) COMMENT '邮箱',
userIdentity int COMMENT '用户身份,0-超级管理员,1-普通用户'
)character set = utf8;
insert into sys_user values('henry','henry','1332822653@qq.com',0);
insert into sys_user values('tom','tom','9999999999@163.com',1);
insert into sys_user values('jack','jack','helloworld@163.com',1);