资讯详情

Flink1.12使用KeyedProcessFunction实现温度监控(案例一)

最近在哔站学习武老师Flink教程,受益匪浅。其中关于Flink有些案例,很有意思,所以记录下来。与感兴趣的朋友分享。

本案的场景如下:

实现方式:

具体代码如下:

import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector;  import java.util.Properties;  public class TempConRiseWarningDemo {      public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1); //        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 9999);         Properties properties = new Properties();         properties.setProperty("bootstrap.servers", "vm01:9092");         properties.setProperty("group.id", "consumer-group");         properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");         properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");         properties.setProperty("auto.offset.reset", "latest");         DataStreamSource<String> inputStream = env.addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));         KeyedStream<Tuple2<String, Double>, String> keyedStream = inputStream.map(new MapFunction<String, Tuple2<String, Double>>() {             @Override             public Tuple2<String, Double> map(String value) throws Exception {                 String[] fields = value.split(",");                 return new Tuple2<>(fields[0], new Double(fields[1]));             }         }).keyBy(tuple2 -> tuple2.f0);         keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));         keyedStream.process(new TempConRiseWarning(10)).print();         env.execute("Run TempConRiseWarningDemo");     }      static class TempConRiseWarning extends KeyedProcessFunction<String, Tuple2<String, Double>, String> {         private Integer interval;          public TempConRiseWarning(Integer interval) {             this.interval = interval;         }          // 定义状态,保存上一次的温度值以及定时器时间戳         private ValueState<Double> lastTempState;         private ValueState<Long> timerTsState;          @Override         public void open(Configuration parameters) throws Exception {             lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp-state", Double.class));             timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-timestamp-state", Long.class));         }          @Override         public void processElement(Tuple2<String, Double> value, Context ctx, Collector<String> out) throws Exception {             // 取出值             Double lastTemp = lastTempState.value();             Long timerTs = timerTsState.value();             // 若上次温度值为null,赋值为Double的最小值             if (lastTemp == null) {                 lastTemp = Double.MIN_VALUE;             }             // 若温度升高且无定时器,则注册定时器,开始等待             if (lastTemp < value.f1 && timerTs == null) {                 // 获取定时器时间戳                 long ts = ctx.timerService().currentProcessingTime()   interval * 1000L;                 // 注册定时器                 ctx.timerService().registerProcessingTimeTimer(ts);                 // 设置时间戳                 timerTsState.update(ts);             } else if (lastTemp > value.f1 && timerTs != null) { // 若温度下降,删除定时器                 // 删除定时器                 ctx.timerService().deleteProcessingTimeTimer(timerTs);                 // 清除时间戳                 timerTsState.clear();             }             // 更新温度值             lastTempState.update(value.f1);         }          @Override         public void onTimer(long timestamp, OnTimerContext ctx, Collector&t;String> out) throws Exception {
            // 输出警告
            out.collect("Warning! the temperature of " + ctx.getCurrentKey() + " has risen continuously in last " + interval + " seconds.");
            // 清除定时器时间戳
            timerTsState.clear();
        }

        @Override
        public void close() throws Exception {
            lastTempState.clear();
            timerTsState.clear();
        }
    }
}

标签: ts4温度传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台