资讯详情

Spark-核心编程(七)Spark案例实操即工程化代码

Spark案例实操

数据如下:

在这里插入图片描述

数据分析如下:

# 以第一行为例 2019-07-17  日期 95  用户ID 26070e87-1ad7-49a3-8fb3-cc741facaddf  sessionID 37  页面ID 2019-07-17 00:00:02  动作时间 手机  搜索-关键字,假如这个字段null说明目前是搜索操作 -1    点击-品类ID,假如这个字段-1说明当前操作是点击 -1    点击-产品ID,假如这个字段-1说明当前操作是点击 null  下单-品类ID,假如这个字段null说明目前的操作是下单操作,多个ID用,隔开 null  下单-产品ID,假如这个字段null说明目前的操作是下单操作,多个ID用,隔开 null  支付-品类ID,假如这个字段null说明当前操作是支付操作,多个ID用,隔开 null  支付-产品ID,假如这个字段null说明当前操作是支付操作,多个ID用,隔开 3     城市id 

以上数据图是从数据文件中截取的一部分,表示为电子商务网站的用户行为数据,主要包括用户 4 行为:搜索、点击、下单、付款。数据规则如下:

每行数据在数据文件中使用下划线分隔数据

每一行数据表示用户的一次行为,这个行为只能是 4 一种行为

若搜索关键字为 null,数据不是搜索数据

假如点击类别 ID 和产品 ID 为-1,表示数据不是点击数据

对于订单行为,您可以一次订购多个商品,因此类别 ID 和产品 ID 可以是多个,id 逗号分隔在两者之间。如果这不是订单行为,数据将被使用 null 表示

支付行为与订单行为类似

编号 字段名称 字段类型 字段含义
1 date String 用户点击行为日期
2 user_id Long 用户的 ID
3 session_id String Session 的 ID
4 page_id Long 某个页面的 ID
5 action_time String 动作时间点
6 search_keyword String 用户搜索的关键字
7 click_category_id Long 某一商品类别 ID
8 click_product_id Long 商品 ID
9 order_category_ids String 订单中所有类别的订单 ID 集合
10 order_product_ids String 订单中所有商品的订单 ID 集合
11 pay_category_ids String 一次性支付中所有类别的所有类别 ID 集合
12 pay_product_ids String 一次性支付中所有商品的所有商品 ID 集合
13 city_id Long 城市 id

需求一:TOP10热门品类

类别是指产品的分类。大型电子商务网站的类别分为多个层次。我们的项目只有一个类别。不同的公司可能对流行程度有不同的定义。我们根据每个类别的点击、订单和支付来统计流行类别。

鞋 点击数 下单数 支付数

例如, 综合排名 = 点击数20% 下单数30% 支付数*50%

本项目需求优化为:

第一种实现方法:

object TestHostCategoryTop10T1 { 
              def main(args: Array[String]): Unit = { 
                 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HostCategoryTop10")         val sc = new SparkContext(sparkConf)          // TOP10热门品类         // 1.阅读原始日志数据         val rdd: RDD[String] = sc.textFile("datas/spark-core/user_visit_action.txt")

        // 2、统计品类的点击数量:(品类ID,点击数量)
        val clickActionRdd: RDD[String] = rdd.filter(_.split("_")(6) != "-1")
        val clickCountRdd: RDD[(String, Int)] = clickActionRdd.map((action: String) => (action.split("_")(6), 1)).reduceByKey(_ + _)

        // 3、统计品类的下单数量:(品类ID,下单数量)
        val orderActionRdd: RDD[String] = rdd.filter(_.split("_")(8) != "null")
        val orderCountRdd: RDD[(String, Int)] = orderActionRdd.flatMap(action => { 
        
            action.split("_")(8).split(",").map((_, 1))
        }).reduceByKey(_ + _)

        // 4、统计品类的支付数量:(品类ID,支付数量)
        val payActionRdd: RDD[String] = rdd.filter(_.split("_")(10) != "null")
        val payCountRdd: RDD[(String, Int)] = payActionRdd.flatMap(action => { 
        
            action.split("_")(10).split(",").map((_, 1))
        }).reduceByKey(_ + _)

        // 5、讲品类进行排序,并且取前十名
        // 点击数量排序,下单数量排序,支付数量排序
        // 元祖排序:先比较第一个,再比较第二个,再比较第三个,一次类推
        // (品类ID, (点击数量, 下单数量, 支付数量))
        // 连接数据 cogroup = connect + group
        val cogrouprdd: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] =
            clickCountRdd.cogroup(orderCountRdd, payCountRdd)
        val analysisRDD: RDD[(String, (Int, Int, Int))] = cogrouprdd.mapValues { 
        
            case (clickIter, orderIter, payIter) => { 
        
                var clickCount = 0;
                if (clickIter.iterator.hasNext) { 
        
                    clickCount = clickIter.iterator.next()
                }

                var orderCount = 0;
                if (orderIter.iterator.hasNext) { 
        
                    orderCount = orderIter.iterator.next()
                }

                var payCount = 0;
                if (payIter.iterator.hasNext) { 
        
                    payCount = payIter.iterator.next()
                }

                (clickCount, orderCount, payCount)
            }
        }

        val resultRDD: Array[(String, (Int, Int, Int))] = analysisRDD.sortBy(_._2, false).take(10)

        // 6、采集
        resultRDD.foreach(println)
// (15,(6120,1672,1259))
// (2,(6119,1767,1196))
// (20,(6098,1776,1244))
// (12,(6095,1740,1218))
// (11,(6093,1781,1202))
// (17,(6079,1752,1231))
// (7,(6074,1796,1252))
// (9,(6045,1736,1230))
// (19,(6044,1722,1158))
// (13,(6036,1781,1161))

        sc.stop()
    }
}

第二种方法:

object TestHostCategoryTop10T2 { 
        

    def main(args: Array[String]): Unit = { 
        
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HostCategoryTop10")
        val sc = new SparkContext(sparkConf)

        // TOP10热门品类
        // 1、读取原始日志数据
        val rdd: RDD[String] = sc.textFile("datas/spark-core/user_visit_action.txt")
        rdd.cache()

        // Q1:数据源被重复使用的次数过多
        // Q2:cogroup有可能存在Shuffle,性能较低
        // (品类ID,点击数量) => (品类ID,(点击数量, 0, 0))
        // (品类ID,下单数量) => (品类ID,(0, 下单数量, 0))
        // (品类ID,支付数量) => (品类ID,(0, 0, 支付数量))
        // 然后两两聚合最终形成:(品类ID, (点击数量, 下单数量, 支付数量))

        // 2、统计品类的点击数量:(品类ID,点击数量)
        val clickActionRdd: RDD[String] = rdd.filter(_.split("_")(6) != "-1")
        val clickCountRdd: RDD[(String, (Int, Int, Int))] = clickActionRdd.map((action: String) => (action.split("_")(6), 1))
            .reduceByKey(_ + _)
            .mapValues((_, 0, 0))

        // 3、统计品类的下单数量:(品类ID,下单数量)
        val orderActionRdd: RDD[String] = rdd.filter(_.split("_")(8) != "null")
        val orderCountRdd: RDD[(String, (Int, Int, Int))] = orderActionRdd.flatMap(action => { 
        
            action.split("_")(8).split(",").map((_, 1))
        }).reduceByKey(_ + _).mapValues((0, _, 0))

        // 4、统计品类的支付数量:(品类ID,支付数量)
        val payActionRdd: RDD[String] = rdd.filter(_.split("_")(10) != "null")
        val payCountRdd: RDD[(String, (Int, Int, Int))] = payActionRdd.flatMap(action => { 
        
            action.split("_")(10).split(",").map((_, 1))
        }).reduceByKey(_ + _).mapValues((0, 0, _))

        // 5、讲品类进行排序,并且取前十名
        // 点击数量排序,下单数量排序,支付数量排序
        // 元祖排序:先比较第一个,再比较第二个,再比较第三个,一次类推
        // 将三个数据源合并在一起,统一进行聚合计算
        val analysisRdd: RDD[(String, (Int, Int, Int))] = clickCountRdd.union(orderCountRdd).union(payCountRdd).reduceByKey(
            (t1, t2) => { 
        
                (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
            }
        )
        val resultRDD: Array[(String, (Int, Int, Int))] = analysisRdd.sortBy(_._2, false).take(10)

        // 6、采集
        resultRDD.foreach(println)

        sc.stop()
    }
}

第三种方法实现:

object TestHostCategoryTop10T3 { 
        

    def main(args: Array[String]): Unit = { 
        
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HostCategoryTop10")
        val sc = new SparkContext(sparkConf)

        // TOP10热门品类
        // 1、读取原始日志数据
        val rdd: RDD[String] = sc.textFile("datas/spark-core/user_visit_action.txt")

        // Q1:存在大量的Shuffle操作(reduceByKey)
        // reduceByKey,聚合算子,spark会提供优化,缓存

        // 2、将数据转换结构
        // 点击场合:(品类, (1, 0, 0))
        // 下单场合:(品类, (0, 1, 0))
        // 支付场合:(品类, (0, 0, 1))
        val flatRDD: RDD[(String, (Int, Int, Int))] = rdd.flatMap(
            action => { 
        
                val datas = action.split("_")
                if (datas(6) != "-1") { 
        
                    // 点击场合
                    List((datas(6), (1, 0, 0)))
                } else if (datas(8) != "null") { 
        
                    // 下单场合
                    datas(8).split(",").map((_, (0, 1, 0)))
                } else if (datas(10) != "null") { 
        
                    // 支付场合
                    datas(10).split(",").map((_, (0, 0, 1)))
                } else { 
        
                    Nil
                }
            }
        )


        // 3、将相同的品类ID的数据进行分区聚合
        // (品类ID, (点击数量, 下单数量, 支付数量))
        val analysisRdd: RDD[(String, (Int, Int, Int))] =flatRDD.reduceByKey(
            (t1, t2) => { 
        
                (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
            }
        )

        // 4、降品类进行排序,并且取前十名
        val resultRDD: Array[(String, (Int, Int, Int))] = analysisRdd.sortBy(_._2, false).take(10)

        // 5、采集
        resultRDD.foreach(println)

        sc.stop()
    }
}

第四种实现方法

object TestHostCategoryTop10T4 { 
        

    def main(args: Array[String]): Unit = { 
        
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HostCategoryTop10")
        val sc = new SparkContext(sparkConf)

        // TOP10热门品类
        // 1、读取原始日志数据
        val rdd: RDD[String] = sc.textFile("datas/spark-core/user_visit_action.txt")

        // Q1:存在Shuffle操作(reduceByKey)

        // 声明累加器
        val acc = new HotCategoryAccumulator;
        sc.register(acc, "hotCategory")


        // 2、将数据转换结构
        rdd.foreach(
            action => { 
        
                val datas = action.split("_")
                if (datas(6) != "-1") { 
        
                    // 点击场合
                    acc.add(datas(6), "click")
                } else if (datas(8) != "null") { 
        
                    // 下单场合
                    datas(8).split(",").foreach(acc.add(_, "order"))
                } else if (datas(10) != "null") { 
        
                    // 支付场合
                    datas(10).split(",").foreach(acc.add(_, "pay"))
                }
            }
        )

        val categories: mutable.Iterable[HotCategory] = acc.value.map(_._2)
        val sortList: List[HotCategory] = categories.toList.sortWith(
            (left, right) => { 
        
                if (left.clickCnt > right.clickCnt) { 
        
                    true
                } else if (left.clickCnt == right.clickCnt) { 
        
                    if (left.orderCnt > right.orderCnt) { 
        
                        true
                    } else if (left.orderCnt == right.orderCnt) { 
        
                        left.payCnt > right.payCnt
                    } else { 
        
                        false
                    }
                } else { 
        
                    false
                }
            }
        )

        // 4、降品类进行排序,并且取前十名
        val result = sortList.take(10)

        // 5、采集
        result.foreach(println)

        sc.stop()
    }

    /** * 自定义累加器 * 1、继承AccumulatorV2,定义泛型 * IN: (品类ID, 行为类型) * OUT: mutable.Map[String, HotCategory] * 2、重写方法(6) */
    class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] { 
        

        private val hcMap = mutable.Map[String, HotCategory]()

        override def isZero: Boolean = hcMap.isEmpty

        override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] =
            new HotCategoryAccumulator

        override def reset(): Unit = hcMap.clear()

        override def add(v: (String, String)): Unit = { 
        
            val cid: String = v._1
            val actionType: String = v._2
            val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0, 0, 0))
            if (actionType == "click") { 
        
                category.clickCnt += 1
            } else if (actionType == "order") 
        标签: 6074连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台