1. Implementation: Redis sink: MySinkToRedis
### --- Implementation: Redis sink: MySinkToRedis

package ads

import java.util

import modes.CityOrder
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import redis.clients.jedis.Jedis

/**
 * Custom sink: writes the aggregated results to Redis.
 */
class MySinkToRedis extends RichSinkFunction[Tuple2[CityOrder, Long]] {
  var jedis: Jedis = null

  override def open(parameters: Configuration): Unit = {
    jedis = new Jedis("hadoop02", 6379, 6000)
    jedis.auth("123456")
    jedis.select(0)
  }

  // Input looks like: (CityOrder(青岛市,山东省,8178.2,1), 1608282220000)
  override def invoke(value: (CityOrder, Long), context: SinkFunction.Context[_]): Unit = {
    if (!jedis.isConnected) {
      jedis.connect()
    }
    val map = new util.HashMap[String, String]()
    map.put("totalMoney", value._1.totalMoney.toString)
    map.put("totalCount", value._1.totalCount.toString)
    map.put("time", value._2.toString)
    if (!map.isEmpty) {
      println(value._1.province + value._1.city + " " + map.size() + " " +
        map.get("totalMoney") + " " + map.get("totalCount") + " " + map.get("time"))
      try {
        // One Redis hash per region, keyed by province + city
        jedis.hset(value._1.province + value._1.city, map)
        map.clear()
      } catch {
        case e: Exception => print(e)
      }
    }
  }

  override def close(): Unit = {
    jedis.close()
  }
}
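To sanity-check what the sink writes, it helps to read one hash back with Jedis. A minimal sketch, reusing the connection settings above; the key 山东省青岛市 is hypothetical sample data following the province + city key layout:

package ads

import redis.clients.jedis.Jedis

// Read-back check for MySinkToRedis output (sketch; the key is sample data).
object CheckRedisSink {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("hadoop02", 6379, 6000)
    jedis.auth("123456")
    jedis.select(0)
    // The sink stores three fields per region: totalMoney, totalCount, time
    val fields: java.util.Map[String, String] = jedis.hgetAll("山东省青岛市")
    println(fields) // e.g. {totalMoney=8178.2, totalCount=1, time=1608282220000}
    jedis.close()
  }
}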
2. Implementation: incremental statistics: the main method
### --- Implementation: incremental statistics: the main method

package dw.dws

import java.util

import ads.MySinkToRedis
import com.alibaba.fastjson.{JSON, JSONObject}
import modes.{CityOrder, TableObject, TradeOrder}
import myutils.{ConnHBase, SourceKafka}
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.java.tuple
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.client.{Connection, HTable, Result, Scan}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{Cell, TableName}

/**
 * Per province/city: total order count, total order amount, window end time.
 */
object OrderStatistics {
  def main(args: Array[String]): Unit = {
    val envSet: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val envStream: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Source 1 of 2: the dim_yanqi_area dimension table, read from HBase as a batch
    val hbaseData: DataSet[tuple.Tuple2[String, String]] = envSet.createInput(new TableInputFormat[tuple.Tuple2[String, String]] {
      override def configure(parameters: Configuration): Unit = {
        val conn: Connection = new ConnHBase().connToHbase
        table = classOf[HTable].cast(conn.getTable(TableName.valueOf("dim_yanqi_area")))
        scan = new Scan()
        scan.addFamily(Bytes.toBytes("f1"))
      }

      override def getScanner: Scan = scan

      override def getTableName: String = "dim_yanqi_area"

      // Row key is the area id; the value joins all cells into one comma-separated string
      override def mapResultToTuple(r: Result): tuple.Tuple2[String, String] = {
        val rowKey: String = Bytes.toString(r.getRow)
        val sb = new StringBuffer()
        for (cell: Cell <- r.rawCells()) {
          val value: String = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
          sb.append(value).append(",")
        }
        // Drop the trailing comma
        val valueString: String = sb.replace(sb.length() - 1, sb.length(), "").toString
        val tuple2 = new tuple.Tuple2[String, String]()
        tuple2.setField(rowKey, 0)
        tuple2.setField(valueString, 1)
        tuple2
      }
    })
    // hbaseData.print()
    val areaList: List[tuple.Tuple2[String, String]] = hbaseData.collect().toList

    // Source 2 of 2: incremental yanqi_trade_orders data from Kafka
    val kafkaConsumer: FlinkKafkaConsumer[String] = new SourceKafka().getKafkaSource("")
    val dataSourceStream: DataStream[String] = envStream.addSource(kafkaConsumer)

    // Each Kafka message is a change-capture JSON record; wrap every row of its
    // "data" array as a TableObject and keep the first one
    val dataSourceMapped: DataStream[TableObject] = dataSourceStream.map(x => {
      val jsonObject: JSONObject = JSON.parseObject(x)
      val database: AnyRef = jsonObject.get("database")
      val table: AnyRef = jsonObject.get("table")
      val typeInfo: AnyRef = jsonObject.get("type")
      val objects = new util.ArrayList[TableObject]()
      jsonObject.getJSONArray("data").forEach(row => {
        objects.add(TableObject(database.toString, table.toString, typeInfo.toString, row.toString))
      })
      objects.get(0)
    })
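    // For reference, a hypothetical message on this topic, assuming a
    // Canal/Maxwell-style change-capture format (only the field names used
    // above come from the code; the values are made up):
    //   {"database":"...","table":"yanqi_trade_orders","type":"insert",
    //    "data":[{"areaId":370202,"totalMoney":8178.2, ...}]}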
    // Keep only changes to the order table
    val filteredData: DataStream[TableObject] = dataSourceMapped.filter(x => {
      x.tableName.toString.equals("yanqi_trade_orders")
    })
    // Deserialize each order row, then key the stream by area id
    val orderInfo: DataStream[TradeOrder] = filteredData.map(x => {
      val order: TradeOrder = JSON.parseObject(x.dataInfo, classOf[TradeOrder])
      order
    })
    val keyed: KeyedStream[TradeOrder, Int] = orderInfo.keyBy(_.areaId)
    // Join each order against the areaList dimension data to resolve city/province
    val value: DataStream[(String, (Double, Int))] = keyed.map(x => {
      var str: String = null
      var money: Double = 0.0
      var count: Int = 0
      for (area <- areaList) {
        if (area.f0.equals(x.areaId.toString)) {
          money += x.totalMoney
          count += 1
          str = area.f1
        }
      }
      // Dimension value layout: district,districtCode,city,cityCode,province
      // e.g. 市南区,370200,青岛市,370000,山东省
      val strs: Array[String] = str.split(",")
      // Emit ((city-province), (money, count))
      (strs(2) + "-" + strs(4), (money, count))
    })
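    // The pre-aggregation below keeps only a running (money, count) pair per key
    // in window state instead of buffering every order; MyWindowFunc then
    // attaches the key and the window end timestamp to each result.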
    // Sliding window: 10-minute window, evaluated every 5 seconds
    val result: DataStream[(CityOrder, Long)] = value.keyBy(_._1)
      .timeWindow(Time.seconds(60 * 10), Time.seconds(5))
      .aggregate(new MyAggFunc(), new MyWindowFunc())
    result.print()
    result.addSink(new MySinkToRedis())
    envStream.execute()
  }
  // Incremental pre-aggregation: the accumulator is (total money, total count)
  class MyAggFunc extends AggregateFunction[(String, (Double, Int)), (Double, Long), (Double, Long)] {
    override def createAccumulator(): (Double, Long) = {
      (0.0, 0L)
    }

    // Fold one (money, count) pair into the running totals
    override def add(value: (String, (Double, Int)), accumulator: (Double, Long)): (Double, Long) = {
      (accumulator._1 + value._2._1, accumulator._2 + value._2._2)
    }

    override def getResult(accumulator: (Double, Long)): (Double, Long) = {
      accumulator
    }

    // Merge partial accumulators (needed for merging windows)
    override def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) = {
      (a._1 + b._1, a._2 + b._2)
    }
  }
  // Attaches the key (city-province) and the window end time to the aggregate
  class MyWindowFunc extends WindowFunction[(Double, Long), (CityOrder, Long), String, TimeWindow] {
    override def apply(
      key: String,
      window: TimeWindow,
      input: Iterable[(Double, Long)],
      out: Collector[(CityOrder, Long)]): Unit = {
      // The aggregate step emits exactly one element per key and window
      val info: (Double, Long) = input.iterator.next()
      val totalMoney: Double = info._1
      val totalCount: Long = info._2
      val strs: Array[String] = key.split("-")
      val city: String = strs(0)
      val province: String = strs(1)
      out.collect((CityOrder(city, province, totalMoney, totalCount.toInt), window.getEnd))
    }
  }
}
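As a quick desk check of the pre-aggregation logic, MyAggFunc can be driven by hand outside Flink. A minimal sketch, assuming two orders for the same city land in one window; the figures are sample data:

package dw.dws

// Desk check for MyAggFunc (sketch; figures are sample data).
object AggFuncDeskCheck {
  def main(args: Array[String]): Unit = {
    val agg = new OrderStatistics.MyAggFunc()
    val acc0 = agg.createAccumulator()                       // (0.0, 0)
    val acc1 = agg.add(("青岛市-山东省", (8178.2, 1)), acc0) // (8178.2, 1)
    val acc2 = agg.add(("青岛市-山东省", (100.0, 1)), acc1)  // (8278.2, 2)
    println(agg.getResult(acc2))                             // prints (8278.2,2)
  }
}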