资讯详情

Flink_uv统计——使用布隆过滤器

一、UV(Unique Visitor,独立访客)指一定时间内(如 1 小时)访问网站的独立访客数量:在统计周期内,同一访客的多次访问只记为一名访客。判断 UV 一般有两种方式:通过用户 IP,或通过 cookie。埋点日志中一般包含 userId,下面的代码即按 userId 去重。
package networkflow

import networkflow.PageView.UserBehavior import networkflow.UniqueVistorWithBoolean.UvCount import org.apache.flink.streaming.api._ import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessWindowFunction} import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult} import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import redis.clients.jedis.Jedis

object UniqueVistorWithBoolean {

  /** Unique-visitor count for the window that ends at `windowEnd` (epoch millis). */
  case class UvCount(windowEnd: Long, count: Long)

  /**
   * Reads UserBehavior records from the bundled `/UserBehavior.csv` resource, keeps only
   * "pv" events, and counts distinct userIds per 1-hour event-time window. De-duplication
   * is delegated to [[UvCountWithBloom]], which keeps a bitmap in Redis instead of holding
   * every userId in Flink window state.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read the raw input from the classpath; fail with a clear message instead of an NPE
    // when the resource is missing.
    val resource = Option(getClass.getResource("/UserBehavior.csv"))
      .getOrElse(throw new IllegalStateException("resource /UserBehavior.csv not found on classpath"))
    val inputStream: DataStream[String] = env.readTextFile(resource.getPath)

    // Parse each CSV line into a UserBehavior and assign ascending event-time timestamps.
    // NOTE(review): the timestamp column is multiplied by 1000, i.e. it is assumed to be in
    // seconds — confirm against the data.
    val dataStream: DataStream[UserBehavior] = inputStream
      .map { line =>
        val fields = line.split(",")
        UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toInt, fields(3), fields(4).toLong)
      }
      .assignAscendingTimestamps(_.timestamp * 1000L)

    val uvStream = dataStream
      .filter(_.behavior == "pv")
      // Single constant key so all events land in the same keyed window.
      .map(data => ("uv", data.userId))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      // Fire and purge on every element: the window never accumulates the user set.
      .trigger(new MyTrigger())
      .process(new UvCountWithBloom())

    uvStream.print()

    env.execute("uv with bloom job")
  }
}

/**
 * Custom trigger (input type `(String, Long)`, window type [[TimeWindow]]) that fires the
 * window function for every incoming element and immediately purges the window contents.
 * Event-time and processing-time callbacks do nothing, and no timers are registered, so
 * there is nothing to clean up in `clear`.
 */
class MyTrigger() extends Trigger[(String, Long), TimeWindow] {

  override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE_AND_PURGE

  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = ()
}

/**
 * Minimal Bloom-filter helper. A Bloom filter is a bitmap plus hash functions; this class
 * only provides the hash, which maps a string to a bit offset in a bitmap of `size` bits.
 * The bitmap itself is stored elsewhere (the caller uses a Redis bitmap).
 *
 * @param size number of bits in the bitmap. Must be a positive power of two, because
 *             offsets are reduced into range with the bit mask `size - 1`.
 */
class Bloom(size: Long) extends Serializable {
  require(size > 0 && (size & (size - 1)) == 0, s"size must be a positive power of two, got $size")

  private val cap = size

  /**
   * Polynomial rolling hash `h = h * seed + c` over the characters of `value`, masked into
   * the bitmap range.
   *
   * @param value the string to hash (e.g. a userId)
   * @param seed  multiplier of the rolling hash; a different seed gives a different hash function
   * @return a bit offset in `[0, size)`
   */
  def hash(value: String, seed: Int): Long = {
    // Accumulate in Long so the value does not overflow after only a few characters;
    // any wrap-around is harmless because only the low bits are kept by the mask.
    val result = value.foldLeft(0L)((acc, ch) => acc * seed + ch)
    (cap - 1) & result
  }
}

/**
 * Window function that counts distinct userIds per window using Redis as external state:
 *  - a bitmap whose key is the window end timestamp, holding the Bloom-filter bits;
 *  - a hash named "uvcount" mapping window end -> current distinct-user count.
 *
 * Normally `process` runs once when the window closes; with [[MyTrigger]] it runs once per
 * element. All elements passed in are handled, so the result does not depend on that.
 * After each invocation the window's current count is emitted.
 */
class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {
  // Redis connection, created lazily inside the task. Host and port are hard-coded.
  lazy val jedis = new Jedis("hadoop203", 6379)
  // 2^29 bits = 2^6 (64) * 2^20 (1M) * 2^3 (8 bits) = 64 MB of bitmap,
  // roughly 5 bit positions per id for ~100 million ids.
  lazy val bloomFilter = new Bloom(1 << 29)

  override def process(key: String, context: Context, elements: Iterable[(String, Long)],
                       out: Collector[UvCount]): Unit = {
    val windowEnd = context.window.getEnd
    // Same string serves as the bitmap key and as this window's field in the count hash.
    val windowKey = windowEnd.toString
    val uvCountMap = "uvcount"

    // Count stored so far for this window (0 if the field does not exist yet).
    var count = Option(jedis.hget(uvCountMap, windowKey)).map(_.toLong).getOrElse(0L)

    elements.foreach { case (_, userId) =>
      // Bit offset for this userId; 61 is the hash seed (tunable to spread collisions).
      val offset = bloomFilter.hash(userId.toString, 61)
      // SETBIT returns the previous bit value, so test-and-set happens in one atomic call.
      val alreadySeen = jedis.setbit(windowKey, offset, true)
      if (!alreadySeen) {
        // Atomic increment; returns the new count.
        count = jedis.hincrBy(uvCountMap, windowKey, 1L)
      }
    }

    out.collect(UvCount(windowEnd, count))
  }

  /** Releases the Redis connection when the task shuts down. */
  override def close(): Unit = {
    jedis.close()
    super.close()
  }
}

标签: uv连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台