
Hadoop & Real-Time Data Warehouse.V25|——|Project.v25|Requirement 2: Data Processing & Incremental Statistics.V3|——|Implementation / Redis Sink|

一、Implementation: Redis sink: MySinkToRedis
### --- Implementation: Redis sink: MySinkToRedis

```scala
package ads

import java.util

import modes.CityOrder
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import redis.clients.jedis.Jedis

/**
 * Custom sink: writes the aggregated results into Redis.
 */
class MySinkToRedis extends RichSinkFunction[Tuple2[CityOrder, Long]] {

  var jedis: Jedis = null

  override def open(parameters: Configuration): Unit = {
    jedis = new Jedis("hadoop02", 6379, 6000)
    jedis.auth("123456")
    jedis.select(0)
  }

  // Incoming elements look like: (CityOrder(青岛市,山东省,8178.2,1),1608282220000)
  override def invoke(value: (CityOrder, Long), context: SinkFunction.Context[_]): Unit = {
    if (!jedis.isConnected) {
      jedis.connect()
    }

    val map = new util.HashMap[String, String]()
    map.put("totalMoney", value._1.totalMoney.toString)
    map.put("totalCount", value._1.totalCount.toString)
    map.put("time", value._2.toString)

    if (!map.isEmpty) {
      println(value._1.province + value._1.city + map.size() + map.get("totalMoney") + map.get("totalCount") + map.get("time"))
      try {
        // One hash per region, keyed by province + city
        jedis.hset(value._1.province + value._1.city, map)
        map.clear()
      } catch {
        case e: Exception => print(e)
      }
    }
  }

  override def close(): Unit = {
    jedis.close()
  }
}
```
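
After the job runs, each region's latest window result can be read back with the same Jedis client. A minimal read-back sketch, assuming the connection settings above and the province + city hash key produced by MySinkToRedis (the key shown is only an example):

```scala
import redis.clients.jedis.Jedis

object CheckRedisSink {
  def main(args: Array[String]): Unit = {
    // Same connection settings as MySinkToRedis above
    val jedis = new Jedis("hadoop02", 6379, 6000)
    jedis.auth("123456")
    jedis.select(0)
    // Keys follow the province + city pattern, e.g. "山东省青岛市"
    val fields: java.util.Map[String, String] = jedis.hgetAll("山东省青岛市")
    println(fields) // e.g. {totalMoney=8178.2, totalCount=1, time=1608282220000}
    jedis.close()
  }
}
```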
二、Implementation: incremental statistics: the main method
### --- Implementation: incremental statistics: the main method

```scala
package dw.dws

import java.util

import ads.MySinkToRedis
import com.alibaba.fastjson.{JSON, JSONObject}
import modes.{CityOrder, TableObject, TradeOrder}
import myutils.{ConnHBase, SourceKafka}
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.java.tuple
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.client.{Connection, HTable, Result, Scan}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{Cell, TableName}

/**
 * Per province/city: total order count, total order amount, and time.
 */
object OrderStatistics {
  def main(args: Array[String]): Unit = {
    val envSet: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val envStream: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Two data sources: the HBase dimension table and the Kafka order stream.
    // dim_yanqi_area
    val hbaseData: DataSet[tuple.Tuple2[String, String]] = envSet.createInput(new TableInputFormat[tuple.Tuple2[String, String]] {
      override def configure(parameters: Configuration): Unit = {
        val conn: Connection = new ConnHBase().connToHbase
        table = classOf[HTable].cast(conn.getTable(TableName.valueOf("dim_yanqi_area")))
        scan = new Scan()
        scan.addFamily(Bytes.toBytes("f1"))
      }

      override def getScanner: Scan = {
        scan
      }

      override def getTableName: String = {
        "dim_yanqi_area"
      }

      override def mapResultToTuple(r: Result): tuple.Tuple2[String, String] = {
        val rowKey: String = Bytes.toString(r.getRow)
        val sb = new StringBuffer()
        for (cell: Cell <- r.rawCells()) {
          val value: String = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
          sb.append(value).append(",")
        }
        val valueString: String = sb.replace(sb.length() - 1, sb.length(), "").toString
        val tuple2 = new tuple.Tuple2[String, String]()
        tuple2.setField(rowKey, 0)
        tuple2.setField(valueString, 1)
        tuple2
      }
    })

    // Materialize the dimension table on the client for the lookups below.
    val areaList: List[tuple.Tuple2[String, String]] = hbaseData.collect().toList

    // yanqi_trade_orders: incremental data from Kafka
    val kafkaConsumer: FlinkKafkaConsumer[String] = new SourceKafka().getKafkaSource("")
    val dataSourceStream: DataStream[String] = envStream.addSource(kafkaConsumer)
    val dataSourceMapped: DataStream[TableObject] = dataSourceStream.map(x => {
      val jsonObject: JSONObject = JSON.parseObject(x)
      val database: AnyRef = jsonObject.get("database")
      val table: AnyRef = jsonObject.get("table")
      val typeInfo: AnyRef = jsonObject.get("type")
      val objects = new util.ArrayList[TableObject]()
      jsonObject.getJSONArray("data").forEach(x => {
        objects.add(TableObject(database.toString, table.toString, typeInfo.toString, x.toString))
      })
      objects.get(0)
    })
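
    // The incoming message is a binlog-style change record. An illustrative
    // (not verbatim) example of the shape this parser expects:
    //   {"database":"dwshow","table":"yanqi_trade_orders","type":"insert",
    //    "data":[{"orderId":29,"areaId":370202,"totalMoney":8178.2}]}
    // Note that only the first element of the "data" array is kept (objects.get(0)).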

    val filteredData: DataStream[TableObject] = dataSourceMapped.filter(x => {
      x.tableName.equals("yanqi_trade_orders")
    })

    val orderInfo: DataStream[TradeOrder] = filteredData.map(x => {
      val order: TradeOrder = JSON.parseObject(x.dataInfo, classOf[TradeOrder])
      order
    })

    val keyed: KeyedStream[TradeOrder, Int] = orderInfo.keyBy(_.areaId)

    val value: DataStream[(String, (Double, Int))] = keyed.map(x => {
      var str: String = null
      var money: Double = 0.0
      var count: Int = 0

      // match the order's areaId against the areaList dimension data
      for (area <- areaList) {
        if (area.f0.equals(x.areaId.toString)) {
          money += x.totalMoney
          count += 1
          str = area.f1
        }
      }
      // str is still null here if the areaId missed the dimension table
      val strs: Array[String] = str.split(",") // e.g. 市南区,370200,青岛市,370000,山东省
      // ((city-province), (money, count))
      (strs(2) + "-" + strs(4), (money, count))
    })
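
    // Each element above carries a single order's amount and a count of 1
    // (at most one areaList entry matches); the running totals per
    // (city-province) key are produced by the windowed aggregation below.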

    val result: DataStream[(CityOrder, Long)] = value.keyBy(_._1)
      .timeWindow(Time.seconds(60 * 10), Time.seconds(5))
      .aggregate(new MyAggFunc(), new MyWindowFunc())
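
    // Sliding window: 10 minutes wide, emitting every 5 seconds. MyAggFunc folds
    // (money, count) pairs incrementally inside the window; MyWindowFunc wraps the
    // final accumulator into a CityOrder tagged with the window end timestamp.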

    result.print()

    result.addSink(new MySinkToRedis())


    envStream.execute()

  }

  class MyAggFunc extends AggregateFunction[(String, (Double, Int)),(Double,Long),(Double,Long)]{
    override def createAccumulator(): (Double, Long) = {
      (0.0, 0L)
    }

    override def add(value: (String, (Double, Int)), accumulator: (Double, Long)): (Double, Long) = {
      (accumulator._1 + value._2._1,accumulator._2+ value._2._2)
    }

    override def getResult(accumulator: (Double, Long)): (Double, Long) = {
      accumulator
    }

    override def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) = {
      (a._1 + b._1,a._2 + b._2)
    }
  }

  class MyWindowFunc extends WindowFunction[(Double,Long),(CityOrder,Long),String,TimeWindow] {
    override def apply(
                        key: String,
                        window: TimeWindow,
                        input: Iterable[(Double, Long)],
                        out: Collector[(CityOrder, Long)]): Unit = {
      val info: (Double, Long) = input.iterator.next()
      val totalMoney: Double = info._1
      val totalCount: Long = info._2
      val strs: Array[String] = key.split("-")
      val city: String = strs(0)
      val province: String = strs(1)

      out.collect((CityOrder(city, province, totalMoney, totalCount.toInt), window.getEnd))
    }
  }

}
```
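
The job references a few model case classes from the `modes` package that this section does not show. A hedged reconstruction, with fields inferred only from how they are used above (the actual course code may name them differently or carry extra fields):

```scala
package modes

// Raw change record: which database/table the row came from, the operation
// type, and the row payload as a JSON string.
case class TableObject(database: String, tableName: String, typeInfo: String, dataInfo: String)

// Only the fields OrderStatistics reads; the real order model likely has more.
case class TradeOrder(areaId: Int, totalMoney: Double)

// One aggregated window result per city.
case class CityOrder(city: String, province: String, totalMoney: Double, totalCount: Int)
```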
