
CC00053.bdpositions——|Hadoop & Real-Time Data Warehouse.V33|——|Project.v33|Requirement 4: Data Processing & Blacklist Statistics.V2|——|Programming...

1. Implementation: utility class; output: blacklisted user ID, ad ID, click count
### --- Implementation: utility class 1: SourceKafka

```scala
package myutils

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Helper that builds a Kafka consumer for the given topic
class SourceKafka {
  def getKafkaSource(topicName: String): FlinkKafkaConsumer[String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
    props.setProperty("group.id", "consumer-group")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("auto.offset.reset", "latest")

    new FlinkKafkaConsumer[String](topicName, new SimpleStringSchema(), props)
  }
}
```
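As a quick sanity check, the consumer returned by SourceKafka can be wired into a throwaway Flink job that just prints the raw records. The sketch below is illustrative only: the SourceKafkaSmokeTest object name is made up here, and it reads the same `eventlog` topic that the blacklist job consumes later.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

import myutils.SourceKafka

object SourceKafkaSmokeTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Pull raw JSON strings from the "eventlog" topic
    val raw: DataStream[String] = env.addSource(new SourceKafka().getKafkaSource("eventlog"))
    raw.print()   // dump records to stdout to confirm connectivity and data shape
    env.execute("source-kafka-smoke-test")
  }
}
```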
2. Implementation: case classes
### --- Case class 1: BlackUser

```scala
package modes

// Output record: blacklisted user ID, ad ID, and click count within the window
case class BlackUser(userId: String, aid: String, count: Long)
```
### --- Case class 2: AdClick

```scala
package modes

// One ad-click event: region, user ID, product (ad) ID, and event time in seconds
case class AdClick(area: String, uid: String, productId: String, timestamp: Long)
```
3. Implementation: BlackUserStatistics; output: blacklisted user ID, ad ID, click count
```scala
package dw.dws

import java.util.concurrent.TimeUnit

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import modes.{AdClick, BlackUser}
import myutils.SourceKafka
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

object BlackUserStatistics {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Consume the raw event log from Kafka
    val kafkaConsumer: FlinkKafkaConsumer[String] = new SourceKafka().getKafkaSource("eventlog")
    val data: DataStream[String] = env.addSource(kafkaConsumer)

    // Parse each record into AdClick(area, uid, productId, timestamp)
    val adClickStream: DataStream[AdClick] = data.map(x => {
      val adJsonObject: JSONObject = JSON.parseObject(x)
      val attrObject: JSONObject = adJsonObject.getJSONObject("attr")
      val area: String = attrObject.get("area").toString
      val uid: String = attrObject.get("uid").toString
      var productId: String = null
      var timestamp: Long = 0L
      val array: JSONArray = adJsonObject.getJSONArray("yanqi_event")
      array.forEach(x => {
        val nObject: JSONObject = JSON.parseObject(x.toString)
        if (nObject.get("name").equals("ad")) {
          val adObject: JSONObject = nObject.getJSONObject("json")
          productId = adObject.get("product_id").toString
          timestamp = TimeUnit.MICROSECONDS.toSeconds(nObject.get("time").toString.toLong)
        }
      })
      AdClick(area, uid, productId, timestamp)
    })

    // Count clicks per (user, ad) pair in 10-second windows
    val value: DataStream[BlackUser] = adClickStream.keyBy(x => (x.uid, x.productId))
      .timeWindow(Time.seconds(10))
      .aggregate(new BlackAggFunc, new BlackWindowFunc)

    // More than 10 clicks on the same ad within one window puts the user on the blacklist
    val result: DataStream[BlackUser] = value.filter(_.count > 10)
    result.print()

    env.execute()
  }

  // Incremental counter: one Long accumulator per (uid, productId) key
  class BlackAggFunc extends AggregateFunction[AdClick, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(value: AdClick, accumulator: Long): Long = accumulator + 1
    override def getResult(accumulator: Long): Long = accumulator
    override def merge(a: Long, b: Long): Long = a + b
  }

  // Wrap each window's count into a BlackUser(userId, adId, count) record
  class BlackWindowFunc extends WindowFunction[Long, BlackUser, (String, String), TimeWindow] {
    override def apply(key: (String, String), window: TimeWindow,
                       input: Iterable[Long], out: Collector[BlackUser]): Unit = {
      out.collect(BlackUser(key._1, key._2, input.iterator.next()))
    }
  }
}
```
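To exercise the counting and filtering logic without a running Kafka cluster, the same keyBy/timeWindow/aggregate/filter chain can be fed a handful of hand-built AdClick records. The sketch below is a minimal local test, not part of the project code: the object name, the click values, and the switch to event time (so the window fires deterministically once the bounded input is exhausted) are all assumptions for illustration.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

import dw.dws.BlackUserStatistics.{BlackAggFunc, BlackWindowFunc}
import modes.AdClick

object BlackUserStatisticsLocalTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time so the 10-second window fires once the bounded input ends
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 12 identical clicks by one (uid, productId) pair; all values are made up for the test
    val clicks = Seq.fill(12)(AdClick("hebei", "u_001", "p_100", 1600000000L))

    env.fromCollection(clicks)
      .assignAscendingTimestamps(_.timestamp * 1000)   // AdClick.timestamp is in seconds
      .keyBy(x => (x.uid, x.productId))
      .timeWindow(Time.seconds(10))
      .aggregate(new BlackAggFunc, new BlackWindowFunc)
      .filter(_.count > 10)
      .print()                                         // expected output: BlackUser(u_001,p_100,12)

    env.execute("blacklist-local-test")
  }
}
```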
