package com.idea.fault.code

import java.text.SimpleDateFormat
import java.time.Duration
import java.{lang, util}

import com.alibaba.fastjson.{JSON, JSONObject}
import com.idea.fault.entity.{Fault, SignalInputBean}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import scala.collection.JavaConversions._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
 * Flink streaming job that reads fault records and vehicle signal records from two socket
 * text streams, keeps the faults whose strategy description is "电池温差过大" and the battery
 * temperature signals, keys both streams by VIN and interval-joins them.
 */
object BatteryTempOver04 {

  /**
   * Builds the streaming topology and submits it for execution.
   *
   * @param args command-line arguments (not read by this job)
   */
  def main(args: Array[String]): Unit = {
    // TODO 1. Create the execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // TODO 2. Build the data sources: two input streams are consumed.
    // Fault stream
    val faultSourceDs: DataStream[String] = env.socketTextStream("hadoop102", 9999)
    // Signal stream
    val signalSourceDs: DataStream[String] = env.socketTextStream("hadoop102", 8888)

    // Kafka (production environment): fault topic
    /* val properties = new Properties()
    properties.setProperty("bootstrap.servers", "bigdata-kafka-01.chj.cloud:6667,bigdata-kafka-02.chj.cloud:6667,bigdata-kafka-03.chj.cloud:6667,bigdata-kafka-04.chj.cloud:6667,bigdata-kafka-05.chj.cloud:6667,bigdata-kafka-06.chj.cloud:6667")
    properties.setProperty("group.id", "fault")
    val topic:String = "SSP-VA-NOTICE-BATT_AUTO-KA"
    val consumer: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
    consumer.setStartFromEarliest()
    val inputDs: DataStream[String] = env.addSource(consumer)*/

    /*val signalProp = new Properties()
    signalProp.setProperty("bootstrap.servers", "bigdata-kafka-vehicle-01.chj.cloud:6667,bigdata-kafka-vehicle-02.chj.cloud:6667,bigdata-kafka-vehicle-03.chj.cloud:6667,bigdata-kafka-vehicle-04.chj.cloud:6667")
    signalProp.setProperty("group.id", "flink3")
    val signalTopic = "iot-signal-engine"
    val consumer = new FlinkKafkaConsumer[String](signalTopic, new SimpleStringSchema(), properties)
    val consumer1 = new FlinkKafkaConsumer[String](signalTopic, new SimpleStringSchema(), properties1)
    val consumer2 = new FlinkKafkaConsumer[String](signalTopic, new SimpleStringSchema(), properties2)*/

    // val format2: TextInputFormat = new TextInputFormat(new Path("E:\\fault"))
    // val value2: DataStream[String] = env.readFile(format2, "E:\\fault", FileProcessingMode.PROCESS_CONTINUOUSLY, 10).setParallelism(1)
    // consumer.setStartFromTimestamp(1652427000000L)
    // consumer1.setStartFromTimestamp(1652427000000L)
    // consumer2.setStartFromTimestamp(1652427000000L)
    /*consumer.setStartFromLatest()
    consumer1.setStartFromLatest()
    consumer2.setStartFromLatest()
    val value2 = env.addSource(consumer)*/

    // TODO 3. Data conversion: turn each JSON string into a Fault case-class instance.
    // 3.1 Add the field id: the database has an auto-increment id column, so 0 is used here.
    // 3.2 Add the field actionQuality as a placeholder to simplify writing to the table
    //     (the code below uses a single space " ").
    // 3.3 Convert the JSON string into the case class.
    // 3.4 Time conversion: derive a date string from alarmTime.
    // 3.5 Print for testing.
    val faultEntityDs: DataStream[Fault] = faultSourceDs.map(line => {
      val mapObj: JSONObject = JSON.parseObject(line)
      // Guard against a null parse result so the failure names the offending record
      // instead of surfacing as a bare NullPointerException further down.
      require(mapObj != null, s"fault record is not a JSON object: '$line'")

      val id: Long = 0
      val vin: String = mapObj.getString("vin")

      // getLong returns a boxed java.lang.Long, which is null when the key is absent;
      // check it explicitly rather than letting the implicit unboxing throw.
      val alarmTimeBoxed: lang.Long = mapObj.getLong("alarmTime")
      require(alarmTimeBoxed != null, s"fault record has no alarmTime: '$line'")
      val alarmTime: Long = alarmTimeBoxed.longValue()

      val strategyKey: String = mapObj.getString("strategyKey")
      val strategyName: String = mapObj.getString("alarmStrategyName")
      val strategyDesc: String = mapObj.getString("alarmStrategyDesc")
      val strategyLevel: String = mapObj.getString("alarmLevel")
      val actionQuality: String = " "
      val remarks: String = " "

      // Format the alarm time as a date string. The formatter is created per record
      // because SimpleDateFormat instances must not be shared between threads.
      val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
      val dt: String = sdf.format(alarmTime)

      Fault(id, vin, alarmTime, strategyKey, strategyName, strategyDesc, strategyLevel, actionQuality, remarks, dt)
    })
    // Print for testing
    // faultEntityDs.print("mapDs:")

    // Event time of a fault is its alarmTime; watermarks tolerate 2 seconds of disorder.
    val faultWm: WatermarkStrategy[Fault] = WatermarkStrategy.forBoundedOutOfOrderness[Fault](Duration.ofSeconds(2))
      .withTimestampAssigner(new SerializableTimestampAssigner[Fault] {
        override def extractTimestamp(t: Fault, l: Long): Long = t.alarmTime
      })
    val faultWmDs: DataStream[Fault] = faultEntityDs.assignTimestampsAndWatermarks(faultWm)

    // TODO Filter: keep only the "battery temperature difference too large" faults,
    // matched on the strategy description.
    // NOTE(review): the accessor is spelled `strtegyDesc` here — confirm it matches the Fault case class.
    val batteryFilterDs: DataStream[Fault] = faultWmDs.filter(new FilterFunction[Fault] {
      // Literal-first equals: a fault without a description is filtered out instead of throwing.
      override def filter(value: Fault): Boolean = "电池温差过大".equals(value.strtegyDesc)
    })

    // TODO Grouping: key the fault stream by vin.
    val batteryKeybyDs: KeyedStream[Fault, String] = batteryFilterDs.keyBy(_.vin)
    //batteryKeybyDs.print("batteryKeybyDs:")
// 3.1 Convert the raw signal JSON into SignalInputBean records. One input line carries a
// "signals" array; every usable entry of that array becomes one bean sharing the line's "vin".
val signalInputDs: DataStream[SignalInputBean] = signalSourceDs.flatMap(new FlatMapFunction[String, SignalInputBean] {
  /**
   * Parses one signal record and emits a bean per entry of its "signals" array.
   *
   * Records that parse to null or have no "signals" array emit nothing. Array entries that are
   * null or lack "collectTime" are skipped, because collectTime is what the downstream
   * timestamp assigner reads as the event time.
   *
   * @param value raw JSON text of one signal record
   * @param out   collector receiving the extracted beans
   */
  override def flatMap(value: String, out: Collector[SignalInputBean]): Unit = {
    // Option(...) turns the nullable fastjson results into explicit "skip" branches.
    Option(JSON.parseObject(value)).foreach { record =>
      val vin: String = record.getString("vin")
      Option(record.getJSONArray("signals")).foreach { signals =>
        for (i <- 0 until signals.size()) {
          for {
            signal <- Option(signals.getJSONObject(i))
            // getLong returns a boxed Long that is null when the key is missing; unboxing it
            // directly would throw a NullPointerException.
            collectTime <- Option(signal.getLong("collectTime")).map(_.longValue())
          } {
            val sigName: String = signal.getString("sigName")
            val sigVal: Double = signal.getDoubleValue("sigVal")
            val validStatus: String = signal.getString("validStatus")
            out.collect(SignalInputBean(vin, sigName, sigVal, collectTime, validStatus))
          }
        }
      }
    }
  }
})
// signalInputDs.print()
// TODO Extract the event time and generate watermarks.
// The event time of a signal is read from its collectTime field.
val signalTimestampAssigner: SerializableTimestampAssigner[SignalInputBean] =
  new SerializableTimestampAssigner[SignalInputBean] {
    override def extractTimestamp(signal: SignalInputBean, previousTimestamp: Long): Long = signal.collectTime
  }
// Bounded out-of-orderness strategy with a bound of 2 seconds.
val signalWm: WatermarkStrategy[SignalInputBean] =
  WatermarkStrategy
    .forBoundedOutOfOrderness[SignalInputBean](Duration.ofSeconds(2))
    .withTimestampAssigner(signalTimestampAssigner)
val signalWmDs: DataStream[SignalInputBean] = signalInputDs.assignTimestampsAndWatermarks(signalWm)
// TODO Filter: keep only the battery temperature signals, selected by signal name.
val signalFilterDs: DataStream[SignalInputBean] = signalWmDs.filter(new FilterFunction[SignalInputBean] {
  // Accepted names: BMS_SensorTemp1 .. BMS_SensorTemp24 plus the four BMS_*RESS{Max,Min}Temp
  // signals. Generating the numbered names avoids a 28-way hand-written comparison chain.
  private val batteryTempSignals: Set[String] =
    (1 to 24).map(i => s"BMS_SensorTemp$i").toSet ++
      Set("BMS_PosRESSMaxTemp", "BMS_PosRESSMinTemp", "BMS_RESSMaxTemp", "BMS_RESSMinTemp")

  /**
   * @param value signal to test
   * @return true when the signal's name is one of the accepted battery temperature names;
   *         a signal without a name is rejected rather than causing an exception
   */
  override def filter(value: SignalInputBean): Boolean = batteryTempSignals.contains(value.sigName)
})
// TODO Grouping: key the signal stream by vin.
val signalKeybyDs: KeyedStream[SignalInputBean, String] = signalFilterDs.keyBy(_.vin)
// signalKeybyDs.print("signalKeybyDs:")
// TODO Join the two streams; the stateful processing below is the core logic.
/**
 * Approaches considered:
 * 1. keep a single value as state
 * 2. keep every record as its own state entry
 * 3. list state
 */
signalKeybyDs.intervalJoin(batteryKeybyDs)
  // Bounds are relative to the signal (left) timestamp: a fault (right) is paired with a signal
  // when fault time lies in [signal time - 300000 ms, signal time].
  // NOTE(review): confirm this direction is the intended one.
  .between(Time.milliseconds(-300000), Time.milliseconds(0))
  .process(new ProcessJoinFunction[SignalInputBean, Fault, Fault] {
    // State holding temperature values.
    var signalListState: ListState[Double] = _
    // State for temperature sensor numbers (registered in open(); processElement does not use it yet).
    var sensorNumState: ListState[Int] = _
    // Map state holding sigName -> sigVal.
    var mapState: MapState[String, Double] = _

    /** Registers the three state handles with the runtime context. */
    override def open(parameters: Configuration): Unit = {
      // Temperature values
      val signalDescriptor: ListStateDescriptor[Double] = new ListStateDescriptor[Double]("signalListState", classOf[Double])
      signalListState = getRuntimeContext.getListState(signalDescriptor)
      // Sensor numbers
      val sensorNumStateDescriptor: ListStateDescriptor[Int] = new ListStateDescriptor[Int]("sensorNumState", createTypeInformation[Int])
      sensorNumState = getRuntimeContext.getListState(sensorNumStateDescriptor)
      // sigName -> sigVal
      val mapStateDescriptor: MapStateDescriptor[String, Double] = new MapStateDescriptor[String, Double]("map-state", classOf[String], classOf[Double])
      mapState = getRuntimeContext.getMapState(mapStateDescriptor)
    }

    /**
     * Called once per joined (signal, fault) pair. Records the signal value in state, derives
     * the maximum, minimum and spread of the buffered temperature values, then clears the state.
     *
     * NOTE(review): nothing is emitted through `out`, and the stream returned by process(...)
     * is not connected to a sink, so the job currently produces console output only.
     * NOTE(review): the state is cleared at the end of every call, so the list never holds
     * more than the value added by this call — confirm whether accumulation was intended.
     *
     * @param left  the signal record of the pair
     * @param right the fault record of the pair (not read yet)
     * @param ctx   join context (not read yet)
     * @param out   output collector (nothing is collected yet)
     */
    override def processElement(left: SignalInputBean, right: Fault, ctx: ProcessJoinFunction[SignalInputBean, Fault, Fault]#Context, out: Collector[Fault]): Unit = {
      val sigNameKey: String = left.sigName
      val sigValValue: Double = left.sigVal

      // Record the incoming value in both states.
      signalListState.add(sigValValue)
      mapState.put(sigNameKey, sigValValue)

      // Scan the buffered values for the extrema. Starting from +/- infinity (instead of 0.0)
      // keeps the result correct for negative temperatures, and the minimum is only lowered by
      // a smaller value rather than being overwritten by every value that is not a new maximum.
      var tempMax: Double = Double.NegativeInfinity
      var tempMin: Double = Double.PositiveInfinity
      val signalIterator: util.Iterator[Double] = signalListState.get().iterator()
      while (signalIterator.hasNext) {
        val signalValue: Double = signalIterator.next()
        if (signalValue > tempMax) {
          tempMax = signalValue
          println("tempMax:" + tempMax)
        }
        if (signalValue < tempMin) {
          tempMin = signalValue
        }
      }

      // The locals below are only referenced by the draft rules that are still commented out.
      val diff: Double = tempMax - tempMin
      // Sensor name
      val sigName: String = left.sigName
      // Module number
      var moduleNum: Double = 0.0D

      // 2.1 FPC fault rule (draft)
      /*if (tempMax > 200) {
        if (sigName.contains("BMS_SensorTemp")) {
          val sigNum: Int = sigName.substring(sigName.indexOf("BMS_SensorTemp") + 14, sigName.length).toInt
          // Map the BMS_SensorTemp number to its module number
          sigNum match {
            case 1 => println("M1")
            case 2 => println("M1")
            case 3 => println("M1")
            case 4 => println("M2")
            case 5 => println("M2")
            case 6 => println("M2")
            case 7 => println("M3")
            case 8 => println("M3")
            case 9 => println("M3")
            case 10 => println("M4")
            case 11 => println("M4")
            case 12 => println("M4")
            case 13 => println("M5")
            case 14 => println("M5")
            case 15 => println("M5")
            case 16 => println("M6")
            case 17 => println("M6")
            case 18 => println("M6")
            case 19 => println("M7")
            case 20 => println("M7")
            case 21 => println("M7")
            case 22 => println("M8")
            case 23 => println("M8")
            case 24 => println("M8")
          }
          // Walk the MapState and take the value
        } else {
          if (mapState.contains("BMS_PosRESSMaxTemp")) {
            moduleNum = mapState.get("BMS_PosRESSMaxTemp")
          } else if (mapState.contains("BMS_PosRESSMinTemp")) {
            moduleNum = mapState.get("BMS_PosRESSMinTemp")
          } else if (mapState.contains("BMS_RESSMaxTemp")) {
            moduleNum = mapState.get("BMS_RESSMaxTemp")
          } else {
            moduleNum = mapState.get("BMS_RESSMinTemp")
          }
        }
      }
      */

      // 2.2 PACK poor thermal insulation rule (draft)
      /* if (diff >= 20 && tempMin != null) {
        // The 24 sensors whose name contains BMS_SensorTemp
        if (sigName.contains("BMS_SensorTemp")) {
          val sigNum: Int = sigName.substring(sigName.indexOf("BMS_SensorTemp") + 14, sigName.length).toInt
          if (sigNum == 1 || sigNum == 2 || sigNum == 3 || sigNum == 4 || sigNum == 22 || sigNum == 24) {
            println("PACK保温性能较差,无需维修")
          }
          // The 4 sensors without BMS_SensorTemp in the name: BMS_PosRESSMaxTemp, BMS_PosRESSMinTemp, BMS_RESSMaxTemp, BMS_RESSMinTemp
        } else {
          println("PACK保温性能较差,无需维修")
        }
      }*/

      // 2.3 NTC abnormal resistance rule (draft)
      // NOTE(review): the `!=` chain joined with `||` in this draft is true for every sigNum;
      // `&&` is presumably what was meant — confirm before enabling.
      /*if (diff >= 20 && tempMin != null) {
        // The 24 sensors whose name contains BMS_SensorTemp
        if (sigName.contains("BMS_SensorTemp")) {
          val sigNum: Int = sigName.substring(sigName.indexOf("BMS_SensorTemp") + 14, sigName.length).toInt
          if (sigNum != 1 || sigNum != 2 || sigNum != 3 || sigNum != 4 || sigNum != 22 || sigNum != 24) {
            println("更换第N个模组")
          }
          // The 4 sensors without BMS_SensorTemp in the name: BMS_PosRESSMaxTemp, BMS_PosRESSMinTemp, BMS_RESSMaxTemp, BMS_RESSMinTemp
        } else {
          println("更换第N个模组")
        }
      }
      // 2.4 False alarm
      if (tempMax < 200 && diff < 20) {
        println("误报")
      }
      */

      // Clear the state.
      signalListState.clear()
      mapState.clear()
    }
  })
// TODO Launch the job.
// NOTE(review): the job name says "BatteryTempOver02" while the enclosing object is
// BatteryTempOver04 — confirm which one is intended.
env.execute("BatteryTempOver02")
}
}