大数据之Spark完全使用案例实践
- 一、案例一
-
- 1、准备数据
- 2、需求 1:Top10 热门品类
- 3、需求说明
-
- 方案一、
- 实现方案二
- 实现方案三
- 二 、需求实现
-
- 1、需求 2:Top10 热门品类中每个品类的每个热门品类 Top10 活跃 Session 统计
- 2.页面单跳转换率统计
- 三、工程代码三层架构
-
- 1、三层介绍
- 2、结构图
- 3、代码
一、案例一
1、准备数据
以上数据图是从数据文件中截取的一部分,表示为电子商务网站的用户行为数据,主要包括用户 4 行为:搜索、点击、下单、付款。数据规则如下:
? 每行数据在数据文件中使用下划线分隔数据
? 每一行数据表示用户的行为,只能是 4 一种行为
? 若搜索关键字为 null,数据不是搜索数据
? 假如点击类别 ID 和产品 ID 为-1,表示数据不是点击数据
? 对于订单行为,您可以一次订购多个商品,因此类别 ID 和产品 ID 可以是多个,id 之间采用逗号分隔,如果本次不是下单行为,则数据采用 null 表示
? 支付行为与订单行为类似
详细的字段说明
编号 | 字段名称 | 字段类型 | 字段含义 |
---|---|---|---|
1 | date | String | 用户点击行为日期 |
2 | user_id | Long | 用户的 ID |
3 | session_id | String | Session 的 ID |
4 | page_id | Long | 某个页面的 ID |
5 | action_time | String | 动作时间点 |
6 | search_keyword | String | 用户搜索的关键字 |
7 | click_category_id | Long | 某一商品类别 ID |
8 | click_product_id | Long | 商品 ID |
9 | order_category_ids |
String | 一次订单中所有品类的 ID 集合 |
10 | order_product_ids |
String | 订单中所有商品的订单 ID 集合 |
11 | pay_category_ids |
String | 一次性支付中所有类别的所有类别 ID 集合 |
12 | pay_product_ids |
String | 一次性支付中所有商品的所有商品 ID 集合 |
13 | city_id | Long | 城市 id |
///用户访问动作表 case class UserVisitAction( date: String,//用户点击行为的日期 user_id: Long,//用户的 ID session_id: String,//Session 的 ID page_id: Long,///页面 ID action_time: String,//动作时间点 search_keyword: String,////用户搜索的关键词 click_category_id: Long,/////某一商品类别 ID click_product_id: Long,///商品 ID order_category_ids: String,//一次单中所有品类的 ID 集合
order_product_ids: String,//一次订单中所有商品的 ID 集合
pay_category_ids: String,//一次支付中所有品类的 ID 集合
pay_product_ids: String,//一次支付中所有商品的 ID 集合
city_id: Long
)//城市 id
2、需求 1:Top10 热门品类
3、需求说明
品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的 公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。
鞋 点击数 下单数 支付数 衣服 点击数 下单数 支付数 电脑 点击数 下单数 支付数 例如,综合排名 = 点击数20%+下单数30%+支付数*50% 本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下 单数;下单数再相同,就比较支付数。
方案一、
分别统计每个品类点击的次数,下单的次数和支付的次数:
(品类,点击总数)(品类,下单总数)(品类,支付总数)
package com.spack.bigdata.core.req
import org.apache.spark.rdd.RDD
import org.apache.spark.{
SparkConf, SparkContext}
object Spark01_Req1_HotCategoryTop10Analysis {
def main(args: Array[String]): Unit = {
/** * * TODO 热门类品类 */
val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(operator)
//TODO 1、读取原始日志数据
val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
//TODO 2、统计品类的点击数量:(品类ID,点击数量)
val clickActionRDD = actionRdd.filter(
action => {
val datas = action.split("_")
//获取索引6的、去除不是-1的数据
datas(6) != "-1"
}
)
val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
action => {
val datas = action.split("_")
//点击品类的ID就有了、数量就是1--(单独统计点击的品类)
(datas(6), 1)
}
).reduceByKey(_ + _)
//TODO 3、统计品类的下单数量:(品类ID,下单数量) ----下单的话一定不为null
val orderCountRDD = actionRdd.filter(
action => {
val datas = action.split("_")
//获取索引6的、去除不是-1的数据
datas(8) != "null"
}
)
val orderCount: RDD[(String, Int)] = orderCountRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(8)
val cids = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
// value".collect().foreach(println)
//TODO 4、统计品类的支付数量:(品类ID,支付数量)
val payCountRDD = actionRdd.filter(
action => {
val datas = action.split("_")
//获取索引6的、去除不是-1的数据
datas(10) != "null"
}
)
val payCount: RDD[(String, Int)] = payCountRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(10)
val cids = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
//TODO 5、将品类进行排序,并且提取前十名
//点击数量排序、下单数量排序,支付数量排序
//元组排序:先比较第一个,在比较第二个,在比较第三个,以此类推退
//(品类ID,(点击数量,下单数量,支付数量))
//会在自己的数据源建立一个分组、跟另外一个数据源做一个链接
//cogroup = connect + group
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = clickCountRDD.cogroup(orderCount, payCount)
val analysisRDD = cogroupRDD.mapValues {
case (clickIter, orderIter, payIter) => {
var clickCnt = 0
val iter1 = clickIter.iterator
if (iter1.hasNext) {
clickCnt = iter1.next()
}
var orderCnt = 0
val iter2 = orderIter.iterator
if (iter2.hasNext) {
orderCnt = iter2.next()
}
var payCnt = 0
val iter3 = payIter.iterator
if (iter3.hasNext) {
payCnt = iter3.next()
}
(clickCnt, orderCnt, payCnt)
}
}
val tuples = analysisRDD.sortBy(_._2, false).take(10)
tuples.foreach(println)
//TODO 6、将结果采集到控制台打印出来
sc.stop()
//TODO 7、统计品类的点击数量:(品类ID,点击数量)
}
}
结果: (15,(6120,1672,1259)) (2,(6119,1767,1196)) (20,(6098,1776,1244)) (12,(6095,1740,1218)) (11,(6093,1781,1202)) (17,(6079,1752,1231)) (7,(6074,1796,1252)) (9,(6045,1736,1230)) (19,(6044,1722,1158)) (13,(6036,1781,1161))
实现方案二
一次性统计每个品类点击的次数,下单的次数和支付的次数:
(品类,(点击总数,下单总数,支付总数))
package com.spack.bigdata.core.req
import org.apache.spark.rdd.RDD
import org.apache.spark.{
SparkConf, SparkContext}
/** * 第二种实现方式 */
object Spark02_Req1_HotCategoryTop10Analysis {
def main(args: Array[String]): Unit = {
/** * * TODO 热门类品类 */
val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(operator)
//Q: actionRdd重复使用 -使用缓存
//Q: cogroup性能可能较低
//TODO 1、读取原始日志数据
val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
actionRdd.cache()
//TODO 2、统计品类的点击数量:(品类ID,点击数量)
val clickActionRDD = actionRdd.filter(
action => {
val datas = action.split("_")
//获取索引6的、去除不是-1的数据
datas(6) != "-1"
}
)
val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
action => {
val datas = action.split("_")
//点击品类的ID就有了、数量就是1--(单独统计点击的品类)
(datas(6), 1)
}
).reduceByKey(_ + _)
//TODO 3、统计品类的下单数量:(品类ID,下单数量) ----下单的话一定不为null
val orderCountRDD = actionRdd.filter(
action => {
val datas = action.split("_")
//获取索引6的、去除不是-1的数据
datas(8) != "null"
}
)
val orderCount: RDD[(String, Int)] = orderCountRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(8)
val cids = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
// value".collect().foreach(println)
//TODO 4、统计品类的支付数量:(品类ID,支付数量)
val payCountRDD = actionRdd.filter(
action => {
val datas = action.split("_")
//获取索引6的、去除不是-1的数据
datas(10) != "null"
}
)
val payCount: RDD[(String, Int)] = payCountRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(10)
val cids = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
//TODO 5、将品类进行排序,并且提取前十名
//点击数量排序、下单数量排序,支付数量排序
//元组排序:先比较第一个,在比较第二个,在比较第三个,以此类推退
//(品类ID,(点击数量,下单数量,支付数量))
//会在自己的数据源建立一个分组、跟另外一个数据源做一个链接
//cogroup = connect + group
val rdd = clickCountRDD.map {
case (cid, cnt) => {
(cid, (cnt, 0, 0))
}
}
val rdd1 = orderCount.map {
case (cid, cnt) => {
(cid, (0, cnt, 0))
}
}
val rdd2 = payCount.map {
case (cid, cnt) => {
(cid, (0, 0, cnt))
}
}
//将三个数据源合并在一起、统一进行聚合计算
val sourceRDD: RDD[(String, (Int, Int, Int))] = rdd.union(rdd1).union(rdd2)
val analysisRDD = sourceRDD.reduceByKey {
(t1, t2) => {
(t1._1+t2._1,t1._2+ t2._2,t1._3+ t2._3)
}
}
// sourceRDD.collect().foreach(println)
val tuples = analysisRDD.sortBy(_._2, false).take(10)
tuples.foreach(println)
//TODO 6、将结果采集到控制台打印出来
sc.stop()
//TODO 7、统计品类的点击数量:(品类ID,点击数量)
}
}
结果: (15,(6120,1672,1259)) (2,(6119,1767,1196)) (20,(6098,1776,1244)) (12,(6095,1740,1218)) (11,(6093,1781,1202)) (17,(6079,1752,1231)) (7,(6074,1796,1252)) (9,(6045,1736,1230)) (19,(6044,1722,1158)) (13,(6036,1781,1161))
Process finished with exit code 0
实现方案三
使用累加器的方式聚合数据
package com.spack.bigdata.core.req import org.apache.spark.rdd.RDD import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{ SparkConf, SparkContext} import scala.collection.mutable /** * 使用累加器的方式聚合数据 * */ object Spark04_Req1_HotCategoryTop10Analysis { def main(args: Array[String]): Unit = { /** * * TODO 热门类品类 */ val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis") val sc = new SparkContext(operator) //TODO 1、读取原始日志数据 val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt") val acc = new HotCategoryAccumulator sc.register(acc, "HotCategory") //将数据转换结构 val flatMapRDD = actionRdd.foreach( action => { val datas = action.split("_") if (datas(6) != "-1") { //点击的场合 acc.add(datas(6), "click") } else if (datas(8) != "null") { //下单的场合 val ids = datas(8).split(",") ids.foreach( id => { acc.add(id, "order") } ) } else if (datas(10) != "null") { //支付的场合 val ids = datas(10).split(",") ids.foreach( id => { acc.add(id, "pay") } ) } } ) val accVal: mutable.Map[String, HotCategory] = acc.value val categories: mutable.Iterable[HotCategory] = accVal.map(_._2) val sort = categories.toList.sortWith( (left, right) => { if (left.clickCnt > right.clickCnt) { true } else if (left.clickCnt == right.clickCnt) { if (left.orderCnt > right.orderCnt) { true } else if (left.orderCnt == right.orderCnt) { left.payCnt > right.payCnt } else { false } } else { false } } ) sort.take(10).foreach(println) //TODO 6、将结果采集到控制台打印出来 sc.stop() //TODO 7、统计品类的点击数量:(品类ID,点击数量) } case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int) /** *
自定义累加器 * 1、继承AccumlatorV2,定义泛型 * IN :(品类ID,行为类型) * OUT: mutable.Map[String,HotCategory] * * 2、重写方法(6) */ class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] { private val hcMap = mutable.Map[String, HotCategory]() //是不是当前的初始状态 override def isZero: Boolean = { hcMap.isEmpty } override def copy(): AccumulatorV2[(String, String), mutable.Map[String,