资讯详情

KeyedProcessFunction实例、TimerService 和 定时器(Timers)及监控10秒内持续上升温度应用

ProcessFunction API(底层API)

一般来说,转换算子是无法访问事件的时间戳信息和水位线信息。这在某些应用场景中非常重要。MapFunction这样的map转换算子不能访问时间戳或当前事件的事件时间。 基于此,DataStream API提供一系列Low-Level转换算子。可访问时间戳,watermark注册定时事件。您还可以输出特定的事件,如加班事件。Process Function用于构建事件驱动的应用,实现自定义的业务逻辑(使用前)window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。 Flink提供了8个Process Function:

?    ProcessFunction ?    KeyedProcessFunction ?    CoProcessFunction ?    ProcessJoinFunction ?    BroadcastProcessFunction ?    KeyedBroadcastProcessFunction ?    ProcessWindowFunction ?    ProcessAllWindowFunction

KeyedProcessFunction

KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素。所有的Process Function都继承自RichFunction所以都有接口open()、close()和getRuntimeContext()等方法。而KeyedProcessFunction[KEY, IN, OUT]还提供了两种额外的方法:

processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每个元素都会调用这种方法,调用结果会放在那里Collector输出数据类型。Context可以访问元素的时间戳,元素key,以及TimerService时间服务。Context结果也可以输出到其他流量(side outputs)。 onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是回调函数。在触发之前注册的定时器时调用。参数timestamp定时器设置的触发时间戳。Collector收集输出结果。OnTimerContext和processElement的Context与参数相同,上下文提供了一些信息,如定时器触发的时间信息(事件时间或处理时间)。

TimerService 和 定时器(Timers)

Context和OnTimerContext所持有的TimerService对象有以下方法:

currentProcessingTime(): Long 返回当前处理时间 currentWatermark(): Long 返回当前watermark的时间戳 registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time定时器processing time到达时间时,触发timer。 registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册时间时,触发定时器执行回调函数。 deleteProcessingTimeTimer(timestamp: Long): Unit 删除前注册时间定时器。如果没有此时间戳的定时器,则不执行。 deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

当定时器timer当触发时,将执行回调函数onTimer()。注意定时器timer只能在keyed streams上面使用。 插入图片描述 用processFunction写一个小需求

如果温度值在10秒内,监测温度传感器的温度值(processing time)如果连续上升,报警(未测试)

import com.atguigu.bean.SensorReading import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector  object TempIncreWarningTest {   def main(args: Array[String]): Unit = {     def main(args: Array[String]): Unit = {       val env = StreamExecutionEnvironment.getExecutionEnvironment       env.setParallelism(1)        val inputDStream: DataStream[String] = env.socketTextStream("hadoop102", 7777)        val dataDstream: DataStream[SensorReading] = inputDStream.map(         data => {           val dataArray: Array[String] = data.split(",")           SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)         })        val resultDStrem: DataStream[String] = dataDstream         .keyBy( _.id )         .process( TempIncreWarning(10000L) )        dataDstream.print("data")       resultDStrem.print("result")        env.execute("stateBackendsApp test job")     }   } }  /**  * 自定义Process Function,温度在10秒内连续上升  */ case class TempIncreWarning(interval: Long) extends KeyedProcessFunction[String, SensorReading, String]{    // 定义一个ValueState,用于保存上次的温度值   lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("last-temp", classOf[Double]) )   // 定义一个状态,用于保存设定的定时器时间戳   lazy val curTimerState: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("cur-timer", classOf[Long]) )    override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {     // 取出最后一个温度值     val lastTemp: Double = lastTempState.value()     val curTimer: Long = curTimerState.value()      // 更新温度值状态     lastTempState.update(value.temperature)      // 将当前的温度值,与上次相比     if( value.temperature > lastTemp && curTimer == 0){       // 若温度升高,且未注册定时器,然后按当前时间加100s注册定时器       val ts = ctx.timerService().currentProcessingTime()   interval       // 注册当前key 指定当前时间触发定时器       ctx.timerService().registerProcessingTimeTimer(ts)       curTimerState.update(ts)      } else if( value.temperature < lastTemp ){       // 若温度下降,然后直接删除定时器,重新开始       ctx.timerService().deleteProcessingTimeTimer(curTimer)       curTimerState.clear()     }    }    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {     out.collect( "传感器 "   ctx.getCurrentKey   " 连续的温度值"   interval / 1000   "上升" )
    // 清空timer状态
    curTimerState.clear()
  }
}

标签: ts4温度传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台