资讯详情

大数据之Spark案例实操完整使用(第六章)

大数据之Spark完全使用案例实践

  • 一、案例一
    • 1、准备数据
    • 2、需求 1:Top10 热门品类
    • 3、需求说明
      • 方案一、
      • 实现方案二
      • 实现方案三
  • 二 、需求实现
    • 1、需求 2:Top10 热门品类中每个品类的每个热门品类 Top10 活跃 Session 统计
    • 2.页面单跳转换率统计
  • 三、工程代码三层架构
    • 1、三层介绍
    • 2、结构图
    • 3、代码

一、案例一

1、准备数据

在这里插入图片描述 以上数据图是从数据文件中截取的一部分,表示为电子商务网站的用户行为数据,主要包括用户 4 行为:搜索、点击、下单、付款。数据规则如下:

? 每行数据在数据文件中使用下划线分隔数据

? 每一行数据表示用户的行为,只能是 4 一种行为

? 若搜索关键字为 null,数据不是搜索数据

? 假如点击类别 ID 和产品 ID 为-1,表示数据不是点击数据

? 对于订单行为,您可以一次订购多个商品,因此类别 ID 和产品 ID 可以是多个,id 之间采用逗号分隔,如果本次不是下单行为,则数据采用 null 表示

? 支付行为与订单行为类似

详细的字段说明

编号 字段名称 字段类型 字段含义
1 date String 用户点击行为日期
2 user_id Long 用户的 ID
3 session_id String Session 的 ID
4 page_id Long 某个页面的 ID
5 action_time String 动作时间点
6 search_keyword String 用户搜索的关键字
7 click_category_id Long 某一商品类别 ID
8 click_product_id Long 商品 ID
9 order_category_ids String 一次订单中所有品类的 ID 集合
10 order_product_ids String 订单中所有商品的订单 ID 集合
11 pay_category_ids String 一次性支付中所有类别的所有类别 ID 集合
12 pay_product_ids String 一次性支付中所有商品的所有商品 ID 集合
13 city_id Long 城市 id
///用户访问动作表 case class UserVisitAction(  date: String,//用户点击行为的日期  user_id: Long,//用户的 ID  session_id: String,//Session 的 ID  page_id: Long,///页面 ID  action_time: String,//动作时间点  search_keyword: String,////用户搜索的关键词  click_category_id: Long,/////某一商品类别 ID  click_product_id: Long,///商品 ID  order_category_ids: String,//一次单中所有品类的 ID 集合
 order_product_ids: String,//一次订单中所有商品的 ID 集合
 pay_category_ids: String,//一次支付中所有品类的 ID 集合
 pay_product_ids: String,//一次支付中所有商品的 ID 集合
 city_id: Long
)//城市 id

2、需求 1:Top10 热门品类

3、需求说明

品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的 公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。

鞋 点击数 下单数 支付数 衣服 点击数 下单数 支付数 电脑 点击数 下单数 支付数 例如,综合排名 = 点击数20%+下单数30%+支付数*50% 本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下 单数;下单数再相同,就比较支付数。

方案一、

分别统计每个品类点击的次数,下单的次数和支付的次数:
(品类,点击总数)(品类,下单总数)(品类,支付总数)
package com.spack.bigdata.core.req

import org.apache.spark.rdd.RDD
import org.apache.spark.{ 
        SparkConf, SparkContext}

object Spark01_Req1_HotCategoryTop10Analysis { 
        

  def main(args: Array[String]): Unit = { 
        
    /** * * TODO 热门类品类 */

    val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    val sc = new SparkContext(operator)


    //TODO 1、读取原始日志数据
    val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")

    //TODO 2、统计品类的点击数量:(品类ID,点击数量)
    val clickActionRDD = actionRdd.filter(
      action => { 
        
        val datas = action.split("_")
        //获取索引6的、去除不是-1的数据
        datas(6) != "-1"

      }
    )


    val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
      action => { 
        
        val datas = action.split("_")
        //点击品类的ID就有了、数量就是1--(单独统计点击的品类)
        (datas(6), 1)
      }
    ).reduceByKey(_ + _)

    //TODO 3、统计品类的下单数量:(品类ID,下单数量) ----下单的话一定不为null

    val orderCountRDD = actionRdd.filter(
      action => { 
        
        val datas = action.split("_")
        //获取索引6的、去除不是-1的数据
        datas(8) != "null"

      }
    )

    val orderCount: RDD[(String, Int)] = orderCountRDD.flatMap(
      action => { 
        
        val datas = action.split("_")
        val cid = datas(8)
        val cids = cid.split(",")
        cids.map(id => (id, 1))

      }
    ).reduceByKey(_ + _)

    // value".collect().foreach(println)


    //TODO 4、统计品类的支付数量:(品类ID,支付数量)

    val payCountRDD = actionRdd.filter(
      action => { 
        
        val datas = action.split("_")
        //获取索引6的、去除不是-1的数据
        datas(10) != "null"

      }
    )

    val payCount: RDD[(String, Int)] = payCountRDD.flatMap(
      action => { 
        
        val datas = action.split("_")
        val cid = datas(10)
        val cids = cid.split(",")
        cids.map(id => (id, 1))
      }
    ).reduceByKey(_ + _)



    //TODO 5、将品类进行排序,并且提取前十名
    //点击数量排序、下单数量排序,支付数量排序
    //元组排序:先比较第一个,在比较第二个,在比较第三个,以此类推退
    //(品类ID,(点击数量,下单数量,支付数量))

    //会在自己的数据源建立一个分组、跟另外一个数据源做一个链接
    //cogroup = connect + group

    val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = clickCountRDD.cogroup(orderCount, payCount)

    val analysisRDD = cogroupRDD.mapValues { 
        
      case (clickIter, orderIter, payIter) => { 
        
        var clickCnt = 0
        val iter1 = clickIter.iterator
        if (iter1.hasNext) { 
        
          clickCnt = iter1.next()
        }

        var orderCnt = 0
        val iter2 = orderIter.iterator
        if (iter2.hasNext) { 
        
          orderCnt = iter2.next()
        }

        var payCnt = 0
        val iter3 = payIter.iterator
        if (iter3.hasNext) { 
        
          payCnt = iter3.next()
        }
        (clickCnt, orderCnt, payCnt)
      }
    }

    val tuples = analysisRDD.sortBy(_._2, false).take(10)
    tuples.foreach(println)
    //TODO 6、将结果采集到控制台打印出来

    sc.stop()

    //TODO 7、统计品类的点击数量:(品类ID,点击数量)

  }

}

结果: (15,(6120,1672,1259)) (2,(6119,1767,1196)) (20,(6098,1776,1244)) (12,(6095,1740,1218)) (11,(6093,1781,1202)) (17,(6079,1752,1231)) (7,(6074,1796,1252)) (9,(6045,1736,1230)) (19,(6044,1722,1158)) (13,(6036,1781,1161))

实现方案二

一次性统计每个品类点击的次数,下单的次数和支付的次数:
(品类,(点击总数,下单总数,支付总数))
package com.spack.bigdata.core.req

import org.apache.spark.rdd.RDD
import org.apache.spark.{ 
        SparkConf, SparkContext}


/** * 第二种实现方式 */
object Spark02_Req1_HotCategoryTop10Analysis { 
        

  def main(args: Array[String]): Unit = { 
        
    /** * * TODO 热门类品类 */

    val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    val sc = new SparkContext(operator)
    //Q: actionRdd重复使用 -使用缓存
    //Q: cogroup性能可能较低

    //TODO 1、读取原始日志数据
    val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
    actionRdd.cache()

    //TODO 2、统计品类的点击数量:(品类ID,点击数量)
    val clickActionRDD = actionRdd.filter(
      action => { 
        
        val datas = action.split("_")
        //获取索引6的、去除不是-1的数据
        datas(6) != "-1"
      }
    )

    val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
      action => { 
        
        val datas = action.split("_")
        //点击品类的ID就有了、数量就是1--(单独统计点击的品类)
        (datas(6), 1)
      }
    ).reduceByKey(_ + _)

    //TODO 3、统计品类的下单数量:(品类ID,下单数量) ----下单的话一定不为null

    val orderCountRDD = actionRdd.filter(
      action => { 
        
        val datas = action.split("_")
        //获取索引6的、去除不是-1的数据
        datas(8) != "null"

      }
    )

    val orderCount: RDD[(String, Int)] = orderCountRDD.flatMap(
      action => { 
        
        val datas = action.split("_")
        val cid = datas(8)
        val cids = cid.split(",")
        cids.map(id => (id, 1))

      }
    ).reduceByKey(_ + _)

    // value".collect().foreach(println)

    //TODO 4、统计品类的支付数量:(品类ID,支付数量)

    val payCountRDD = actionRdd.filter(
      action => { 
        
        val datas = action.split("_")
        //获取索引6的、去除不是-1的数据
        datas(10) != "null"

      }
    )

    val payCount: RDD[(String, Int)] = payCountRDD.flatMap(
      action => { 
        
        val datas = action.split("_")
        val cid = datas(10)
        val cids = cid.split(",")
        cids.map(id => (id, 1))
      }
    ).reduceByKey(_ + _)

    //TODO 5、将品类进行排序,并且提取前十名
    //点击数量排序、下单数量排序,支付数量排序
    //元组排序:先比较第一个,在比较第二个,在比较第三个,以此类推退
    //(品类ID,(点击数量,下单数量,支付数量))

    //会在自己的数据源建立一个分组、跟另外一个数据源做一个链接
    //cogroup = connect + group

    val rdd = clickCountRDD.map { 
        
      case (cid, cnt) => { 
        
        (cid, (cnt, 0, 0))
      }
    }

    val rdd1 = orderCount.map { 
        
      case (cid, cnt) => { 
        
        (cid, (0, cnt, 0))
      }
    }
    val rdd2 = payCount.map { 
        
      case (cid, cnt) => { 
        
        (cid, (0, 0, cnt))
      }
    }

    //将三个数据源合并在一起、统一进行聚合计算
    val sourceRDD: RDD[(String, (Int, Int, Int))] = rdd.union(rdd1).union(rdd2)
    val analysisRDD = sourceRDD.reduceByKey { 
        
      (t1, t2) => { 
        
        (t1._1+t2._1,t1._2+ t2._2,t1._3+ t2._3)
      }
    }
// sourceRDD.collect().foreach(println)
        val tuples = analysisRDD.sortBy(_._2, false).take(10)

    tuples.foreach(println)
    //TODO 6、将结果采集到控制台打印出来

    sc.stop()
    //TODO 7、统计品类的点击数量:(品类ID,点击数量)

  }

}

结果: (15,(6120,1672,1259)) (2,(6119,1767,1196)) (20,(6098,1776,1244)) (12,(6095,1740,1218)) (11,(6093,1781,1202)) (17,(6079,1752,1231)) (7,(6074,1796,1252)) (9,(6045,1736,1230)) (19,(6044,1722,1158)) (13,(6036,1781,1161))

Process finished with exit code 0

实现方案三

使用累加器的方式聚合数据

package com.spack.bigdata.core.req

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{ 
        SparkConf, SparkContext}

import scala.collection.mutable


/** * 使用累加器的方式聚合数据 * */
object Spark04_Req1_HotCategoryTop10Analysis { 
        
  def main(args: Array[String]): Unit = { 
        
    /** * * TODO 热门类品类 */

    val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    val sc = new SparkContext(operator)


    //TODO 1、读取原始日志数据
    val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
    val acc = new HotCategoryAccumulator
    sc.register(acc, "HotCategory")


    //将数据转换结构
    val flatMapRDD = actionRdd.foreach(
      action => { 
        
        val datas = action.split("_")
        if (datas(6) != "-1") { 
        
          //点击的场合
          acc.add(datas(6), "click")
        } else if (datas(8) != "null") { 
        
          //下单的场合
          val ids = datas(8).split(",")
          ids.foreach(
            id => { 
        
              acc.add(id, "order")
            }
          )

        } else if (datas(10) != "null") { 
        
          //支付的场合
          val ids = datas(10).split(",")
          ids.foreach(
            id => { 
        
              acc.add(id, "pay")
            }
          )
        }
      }
    )


    val accVal: mutable.Map[String, HotCategory] = acc.value
    val categories: mutable.Iterable[HotCategory] = accVal.map(_._2)


    val sort = categories.toList.sortWith(
      (left, right) => { 
        
        if (left.clickCnt > right.clickCnt) { 
        
          true
        } else if (left.clickCnt == right.clickCnt) { 
        
          if (left.orderCnt > right.orderCnt) { 
        
            true
          } else if (left.orderCnt == right.orderCnt) { 
        
            left.payCnt > right.payCnt
          } else { 
        
            false
          }
        } else { 
        
          false
        }
      }
    )


    sort.take(10).foreach(println)
    //TODO 6、将结果采集到控制台打印出来

    sc.stop()
    //TODO 7、统计品类的点击数量:(品类ID,点击数量)
  }


  case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int)

  /** * 自定义累加器 * 1、继承AccumlatorV2,定义泛型 * IN :(品类ID,行为类型) * OUT: mutable.Map[String,HotCategory] * * 2、重写方法(6) */
  class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] { 
        


    private val hcMap = mutable.Map[String, HotCategory]()

    //是不是当前的初始状态
    override def isZero: Boolean = { 
        
      hcMap.isEmpty
    }


    override def copy(): AccumulatorV2[(String, String), mutable.Map[String, 

标签: 6074连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台