资讯详情

Flink状态编程实战案例

package com.idea.fault.code  import java.text.SimpleDateFormat import java.time.Duration import java.{lang, util}  import com.alibaba.fastjson.{JSON, JSONObject} import com.idea.fault.entity.{Fault, SignalInputBean} import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy} import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction} import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction import org.apache.flink.streaming.api.scala._ import scala.collection.JavaConversions._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.util.Collector  object BatteryTempOver04 {   def main(args: Array[String]): Unit = {     // TODO 1.创造执行环境     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1)     // TODO 2.读取数据构建数据源:订阅两个topic:     // 故障topic     val faultSourceDs: DataStream[String] = env.socketTextStream("hadoop102", 9999)     // 信号数据     val signalSourceDs: DataStream[String] = env.socketTextStream("hadoop102", 8888)      // kafka生产环境:故障Topic     /* val properties = new Properties()      properties.setProperty("bootstrap.servers", "bigdata-kafka-01.chj.cloud:6667,bigdata-kafka-02.chj.cloud:6667,bigdata-kafka-03.chj.cloud:6667,bigdata-kafka-04.chj.cloud:6667,bigdata-kafka-05.chj.cloud:6667,bigdata-kafka-06.chj.cloud:6667")      properties.setProperty("group.id", "fault")      val topic:String = "SSP-VA-NOTICE-BATT_AUTO-KA"     val consumer: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)     consumer.setStartFromEarliest()     val inputDs: DataStream[String] = env.addSource(consumer)*/      /*val signalProp = new Properties()     signalProp.setProperty("bootstrap.servers", "bigdata-kafka-vehicle-01.chj.cloud:6667,bigdata-kafka-vehicle-02.chj.cloud:6667,bigdata-kafka-vehicle-03.chj.cloud:6667,bigdata-kafka-vehicle-04.chj.cloud:6667")     signalProp.setProperty("group.id", "flink3")     val signalTopic = "iot-signal-engine"     val consumer = new FlinkKafkaConsumer[String](signalTopic, new SimpleStringSchema(), properties)     val consumer1 = new FlinkKafkaConsumer[String](signalTopic, new SimpleStringSchema(), properties1)     val consumer2 = new FlinkKafkaConsumer[String](signalTopic, new SimpleStringSchema(), properties2)*/      //    val format2: TextInputFormat = new TextInputFormat(new Path("E:\\fault"))     //    val value2: DataStream[String] = env.readFile(format2, "E:\\fault", FileProcessingMode.PROCESS_CONTINUOUSLY, 10).setParallelism(1)     //    consumer.setStartFromTimestamp(1652427000000L)     //    consumer1.setStartFromTimestamp(1652427000000L)     //    consumer2.setStartFromTimestamp(1652427000000L)     /*consumer.setStartFromLatest()     consumer1.setStartFromLatest()     consumer2.setStartFromLatest()     val value2 = env.addSource(consumer)*/      // TODO 3.数据转换:将 JSONString 转换为 样例类     // 3.1 添加字段id,因为数据库有一个自增字段id,默认为0     // 3.2 添加字段actionQuality,默认为空字符串"",写表操作方便     // 3.3 数据转换:将JSONString数据转换为样例类     // 3.4 时间转换,将alarmTime转换为 日期     // 3.5打印测试      // 3.将故障数据转换为例子     val faultEntityDs: DataStream[Fault] = faultSourceDs.map(line => {       val mapObj: JSONObject = JSON.parseObject(line)       val id: Long = 0       val vin: String = mapObj.getString("vin")       val time_sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")       val alarmTime: Long = mapObj.getLong("alarmTime")       val strategyKey: String = mapObj.getString("strategyKey")       val strategyName: String = mapObj.getString("alarmStrategyName")       val strategyDesc: String = mapObj.getString("alarmStrategyDesc")       val strategyLevel: String = mapObj.getString("alarmLevel")       val actionQuality: String = " "       val remarks: String = " "       // 时间格式化为日期       val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")       val dt: String = sdf.format(alarmTime)       Fault(id, vin, alarmTime, strategyKey, strategyName, strategyDesc, strategyLevel, actionQuality, remarks, dt)      })     // 打印测试     // faultEntityDs.print("mapDs:")      val faultWm: WatermarkStrategy[Fault] = WatermarkStrategy.forBoundedOutOfOrderness[Fault](Duration.ofSeconds(2))       .withTimestampAssigner(new SerializableTimestampAssigner[Fault] {         override def extractTimestamp(t: Fault, l: Long): Long = t.alarmTime       })     val faultWmDs: DataStream[Fault] = faultEntityDs.assignTimestampsAndWatermarks(faultWm)     // TODO 过滤:根据 故障码 过滤出温差过大的数据     val batteryFilterDs: DataStream[Fault] = faultWmDs.filter(new FilterFunction[Fault] {       override def filter(value: Fault): Boolean = value.strtegyDesc.equals("电池温差过大")
    })

    // TODO 分组:按照vin分组
    val batteryKeybyDs: KeyedStream[Fault, String] = batteryFilterDs.keyBy(_.vin)
    //batteryKeybyDs.print("batteryKeybyDs:")
    //  3.1将 信号数据 转换为 样例类
    val signalInputDs: DataStream[SignalInputBean] = signalSourceDs.flatMap(new FlatMapFunction[String, SignalInputBean] {
      override def flatMap(value: String, out: Collector[SignalInputBean]): Unit = {
        val valueObj: JSONObject = JSON.parseObject(value)
        val vin: String = valueObj.getString("vin")
        var sigName: String = ""
        var sigVal: Double = 0D
        var collectTime: Long = 0L
        var validStatus: String = ""
        val signalsArr = valueObj.getJSONArray("signals")
        if (signalsArr != null && signalsArr.size() > 0) {
          for (i <- 0 until signalsArr.size()) {
            val signalObj: JSONObject = signalsArr.getJSONObject(i)
            sigName = signalObj.getString("sigName")
            sigVal = signalObj.getDoubleValue("sigVal")
            collectTime = signalObj.getLong("collectTime")
            validStatus = signalObj.getString("validStatus")
            out.collect(SignalInputBean(vin, sigName, sigVal, collectTime, validStatus))
          }
        }
      }
    })

    // signalInputDs.print()
    // TODO 提取事件时间生成watermark
    val signalWm: WatermarkStrategy[SignalInputBean] = WatermarkStrategy.forBoundedOutOfOrderness[SignalInputBean](Duration.ofSeconds(2))
      .withTimestampAssigner(new SerializableTimestampAssigner[SignalInputBean] {
        override def extractTimestamp(element: SignalInputBean, recordTimestamp: Long): Long = {
          element.collectTime
        }
      })

    val signalWmDs: DataStream[SignalInputBean] = signalInputDs.assignTimestampsAndWatermarks(signalWm)
    // TODO 过滤:过滤出 电池温差过大数据:根据名称过滤
    val signalFilterDs: DataStream[SignalInputBean] = signalWmDs.filter(new FilterFunction[SignalInputBean] {
      override def filter(value: SignalInputBean): Boolean = {
        value.sigName.equals("BMS_SensorTemp1") || value.sigName.equals("BMS_SensorTemp2") || value.sigName.equals("BMS_SensorTemp3") || value.sigName.equals("BMS_SensorTemp4") || value.sigName.equals("BMS_SensorTemp5") || value.sigName.equals("BMS_SensorTemp6") || value.sigName.equals("BMS_SensorTemp7") || value.sigName.equals("BMS_SensorTemp8") || value.sigName.equals("BMS_SensorTemp9") || value.sigName.equals("BMS_SensorTemp10") || value.sigName.equals("BMS_SensorTemp11") || value.sigName.equals("BMS_SensorTemp12") || value.sigName.equals("BMS_SensorTemp13") || value.sigName.equals("BMS_SensorTemp14") || value.sigName.equals("BMS_SensorTemp15") || value.sigName.equals("BMS_SensorTemp16") || value.sigName.equals("BMS_SensorTemp17") || value.sigName.equals("BMS_SensorTemp18") || value.sigName.equals("BMS_SensorTemp19") || value.sigName.equals("BMS_SensorTemp20") || value.sigName.equals("BMS_SensorTemp21") || value.sigName.equals("BMS_SensorTemp22") || value.sigName.equals("BMS_SensorTemp23") || value.sigName.equals("BMS_SensorTemp24") || value.sigName.equals("BMS_PosRESSMaxTemp") || value.sigName.equals("BMS_PosRESSMinTemp") || value.sigName.equals("BMS_RESSMaxTemp") || value.sigName.equals("BMS_RESSMinTemp")

      }
    })

    // TODO 分组:按照vin分组
    val signalKeybyDs: KeyedStream[SignalInputBean, String] = signalFilterDs.keyBy(_.vin)
    // signalKeybyDs.print("signalKeybyDs:")

    // TODO 双流合并:状态变成核心逻辑
    /** *
     * 思路一: 将单个值置为状态存储
     * 思路二:将每条数据作为一条状态保存起来
     * 思路三:列表状态
     */

    signalKeybyDs.intervalJoin(batteryKeybyDs)
      .between(Time.milliseconds(-300000), Time.milliseconds(0))
      .process(new ProcessJoinFunction[SignalInputBean, Fault, Fault] {
        // 定义状态:用于保存温值
        var signalListState: ListState[Double] = _
        // 定义状态:用于保存 温度传感器编号
        var sensorNumState: ListState[Int] = _
        // 定义MapState:用于存放 sigName sigVal
        var mapState: MapState[String, Double] = _

        // 状态初始化
        override def open(parameters: Configuration): Unit = {
          // 状态初始化:初始化温度值
          val signalDescriptor: ListStateDescriptor[Double] = new ListStateDescriptor[Double]("signalListState", classOf[Double])
          signalListState = getRuntimeContext.getListState(signalDescriptor)

          // 初始化传感器编号状态
          val sensorNumStateDescriptor: ListStateDescriptor[Int] = new ListStateDescriptor[Int]("sensorNumState", createTypeInformation[Int])
          sensorNumState = getRuntimeContext.getListState(sensorNumStateDescriptor)

          // 初始化 MapState 状态值
          val mapStateDescriptor: MapStateDescriptor[String, Double] = new MapStateDescriptor[String, Double]("map-state", classOf[String], classOf[Double])
          mapState = getRuntimeContext.getMapState(mapStateDescriptor)

        }

        // 状态编程
        override def processElement(left: SignalInputBean, right: Fault, ctx: ProcessJoinFunction[SignalInputBean, Fault, Fault]#Context, out: Collector[Fault]): Unit = {

          // 声明电池最高温度和电池最低温度
          var tempMax: Double = 0.0D
          var tempMin: Double = 0.0D

          val sigNameKey: String = left.sigName
          val sigValValue: Double = left.sigVal

          // 获取状态值
          signalListState.add(left.sigVal)
          // 将 sigNameKey sigValValue 存入 MapState
          mapState.put(sigNameKey, sigValValue)
          // 遍历状态,如果有就对比
          val sinnalIterator: util.Iterator[Double] = signalListState.get().iterator()
          // 定义一个数组:用于存放状态历史值
          // val signalArr: util.ArrayList[Double] = new util.ArrayList[Double]()

          while (sinnalIterator.hasNext) {
            println(1)
            // println("sinnalIterator:"+sinnalIterator.next())
            val signalValue: Double = sinnalIterator.next()
            if (signalValue > tempMax) {
              tempMax = signalValue
              println("tempMax:"+tempMax)
            } else {
              tempMin = signalValue
              //println("BMS_RESSMinTemp:"+BMS_RESSMinTemp)
            }

          }
          val diff: Double = tempMax - tempMin

          // 传感器名称
          val sigName: String = left.sigName

          // 定义模组编号
          var moduleNum: Double = 0.0D
          // 创建一个Map集合,用于存放
          // 2.1FPC故障算法逻辑
          /*if (tempMax > 200) {
            if (sigName.contains("BMS_SensorTemp")) {
              val sigNum: Int = sigName.substring(sigName.indexOf("BMS_SensorTemp") + 14, sigName.length).toInt
              // 获取 BMS_SensorTemp 值:模组编号
              sigNum match {
                case 1 => println("M1")
                case 2 => println("M1")
                case 3 => println("M1")
                case 4 => println("M2")
                case 5 => println("M2")
                case 6 => println("M2")
                case 7 => println("M3")
                case 8 => println("M3")
                case 9 => println("M3")
                case 10 => println("M4")
                case 11 => println("M4")
                case 12 => println("M4")
                case 13 => println("M5")
                case 14 => println("M5")
                case 15 => println("M5")
                case 16 => println("M6")
                case 17 => println("M6")
                case 18 => println("M6")
                case 19 => println("M7")
                case 20 => println("M7")
                case 21 => println("M7")
                case 22 => println("M8")
                case 23 => println("M8")
                case 24 => println("M8")
              }
              // 遍历MapState并取出Value
            } else {
              if (mapState.contains("BMS_PosRESSMaxTemp")) {
                moduleNum = mapState.get("BMS_PosRESSMaxTemp")

              } else if (mapState.contains("BMS_PosRESSMinTemp")) {
                moduleNum = mapState.get("BMS_PosRESSMinTemp")

              } else if (mapState.contains("BMS_RESSMaxTemp")) {
                moduleNum = mapState.get("BMS_RESSMaxTemp")

              } else {
                moduleNum = mapState.get("BMS_RESSMinTemp")

              }

            }

          }
*/
          // 2.2PACK保温性能较差算法逻辑

         /* if (diff >= 20 && tempMin != null) {
            // 包含 BMS_SensorTemp的24个传感器
            if (sigName.contains("BMS_SensorTemp")) {
              val sigNum: Int = sigName.substring(sigName.indexOf("BMS_SensorTemp") + 14, sigName.length).toInt
              if (sigNum == 1 || sigNum == 2 || sigNum == 3 || sigNum == 4 || sigNum == 22 || sigNum == 24) {
                println("PACK保温性能较差,无需维修")

              }
              // 不包含 BMS_SensorTemp的4个传感器即:BMS_PosRESSMaxTemp、BMS_PosRESSMinTemp、BMS_RESSMaxTemp、BMS_RESSMinTemp
            } else {
              println("PACK保温性能较差,无需维修")

            }

          }*/


          // 2.3NTC阻值异常算法逻辑
          /*if (diff >= 20 && tempMin != null) {
            // 包含 BMS_SensorTemp的24个传感器
            if (sigName.contains("BMS_SensorTemp")) {
              val sigNum: Int = sigName.substring(sigName.indexOf("BMS_SensorTemp") + 14, sigName.length).toInt
              if (sigNum != 1 || sigNum != 2 || sigNum != 3 || sigNum != 4 || sigNum != 22 || sigNum != 24) {
                println("更换第N个模组")
              }
              // 不包含 BMS_SensorTemp的4个传感器即:BMS_PosRESSMaxTemp、BMS_PosRESSMinTemp、BMS_RESSMaxTemp、BMS_RESSMinTemp
            } else {
              println("更换第N个模组")
            }

          }

          // 2.4 误报
          if (tempMax < 200 && diff < 20) {
            println("误报")
          }
*/
          // 清空状态
          signalListState.clear()
          mapState.clear()

        }

      })

    // TODO 启动任务
    env.execute("BatteryTempOver02")

  }

}

标签: ssp1传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台