资讯详情

SparkStreaming实时消费Kafka数据,批量写入Mysql数据库,Java版本

下列代码涉及数据Kafka接入,数据Spark算子数据处理,Kafka偏移记录、数据反压、数据批插入MySql等所有操作步骤。

package com.e_data;  import com.util.DateUtil; import com.util.JDBCUtils3; import com.util.RedisUtil; import org.apache.commons.collections.IteratorUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka010.*; import scala.Tuple2; import java.io.Serializable; import java.sql.Connection; import java.sql.PreparedStatement; import java.util.*;  /**  * 最稳定版本  * 作者 liuwunan   数据不会丢失。  */  public class HuaLong_RealTime_ETC implements Serializable {      设置异常标记,如有异常,记录起始偏移,如果正常 记录解释偏移量     public static Boolean ExceptionFlag = true;     //配置kafka参数(节点,消费者组,topic)     private static String topics = "topics"; //指定topic     private static String groupId = "groupId";//指定消费者组id     private static String offset = "offset";     private static String brokers = "IP:9092,IP:9092,IP:9092";//指定kafka地      public static void main(String[] args)  {             //设置hadoop 文件备份为1,Hadoop 系统默认3份  减少数据同步延迟             //Configuration hdfs = new Configuration();             //hdfs.set("dfs.replication","1");             SparkConf conf = new SparkConf().setAppName("RealTimeE")             .set("spark.dynamicAllocation.enabled", "false");////动态添加关闭资源             conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");             conf.setMaster("local[5]");    //设置线程数 禁止使用1 可以使用 *             conf.set("spark.streaming.backpressure.enabled", "true");//启用反压             conf.set("spark.streaming.backpressure.pid.minRate","1");///最小条数             conf.set("spark.streaming.kafka.maxRatePerPartition","2000");///最大条数             conf.set("spark.speculation", "true");////开启资源动态调用             JavaSparkContext sc = new JavaSparkContext(conf);//初始化sparkStreaming对象             JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(60)); ///每分钟调用一批数据             //配置kafka参数(节点,消费者组,topic)             Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));             Map<String, Object> kafkaParams = new HashMap<>();             kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);             kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);             kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);             kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);             ////每个程序启动以获得最新的消费者偏移 earliest             kafkaParams.put("auto.offset.reset", "latest");             ///自动提交消费偏移量             kafkaParams.put("enable.auto.commit", "false");             HashMap<TopicPartition, Long> mapTopic = new HashMap<>();             JavaInputDStream<ConsumerRecord<String, String>> messages =null;             Boolean flag = RedisUtil.FlagExits(etc_data_offset, 1);             if(flag){                 Map<String, String> offsets = RedisUtil.getAll(etc_data_offset, 1);                 for (Map.Entry<String, String> entry : offsets.entrySet()) {                     String partition = entry.getKey();                     String offset = entry.getValue();                     截取去掉时间 只输入偏移量 避免程序错误                     String[] s = offset.split("_", -1);                     String offset_last = s[0];                     TopicPartition topicPartition = new TopicPartition(topics, Integer.valueOf(partition));                     mapTopic.put(topicPartition, Long.valueOf(offset_last));                 }                 messages = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams, mapTopic));             }else{                 System.out.println("重头消费 最新消费");                 messages = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));             }              JavaDStream&ltConsumerRecord<String, String>> v1 = messages.filter(new Function<ConsumerRecord<String, String>, Boolean>() {
                @Override
                public Boolean call(ConsumerRecord<String, String> v1) throws Exception {
                    //判断数据
                    if((v1.key().startsWith("E_data")) &&  (10 == v1.value().split(",",-1).length)) {
                        return true;
                    }
                    return false;
                }
            });

            //数据规范处理,抽取各数据源需要的字段
            JavaPairDStream<String, ETC_Data> v2 = v1.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, ETC_Data>() {
                @Override
                public Tuple2<String, ETC_Data> call(ConsumerRecord<String, String> v) throws Exception {
                    E_Data e_data = new E_Data();
                    //iarea,device_id,x,y,car_color,serial_num,idcard_num,car_num,times
                    String[] split = v.value().split(",", -1);
                    etc_data.setArea(split[0]);
                    etc_data.setDevice_id(split[1]);
                    etc_data.setX(split[2]);
                    etc_data.setY(split[3]);
                    etc_data.setCar_color(split[4]);
                    etc_data.setSerial_num(split[5]);
                    etc_data.setIdcard_num(split[6]);
                    etc_data.setCar_num(split[7]);
                    etc_data.setUser_name(split[8]);
                    etc_data.setTimes(split[9]);
                    return new Tuple2("E_DATA",etc_data);
                }
            });

            JavaPairDStream<String, Iterable<ETC_Data>> V3 = v2.groupByKey();
            V3.foreachRDD(new VoidFunction<JavaPairRDD<String, Iterable<ETC_Data>>>() {
                @Override
                public void call(JavaPairRDD<String, Iterable<ETC_Data>> v4) throws Exception {
                    v4.repartition(1).foreachPartition(new VoidFunction<Iterator<Tuple2<String, Iterable<ETC_Data>>>>() {
                        @Override
                        public void call(Iterator<Tuple2<String, Iterable<ETC_Data>>> v5) throws Exception {
                            Connection conn = JDBCUtils3.getConnection();
                            PreparedStatement pstm = null;
                            while (v5.hasNext()) {
                                Tuple2<String, Iterable<ETC_Data>> next = v5.next();
                                List<ETC_Data> list = IteratorUtils.toList(next._2.iterator());
                                    String sql = "INSERT INTO punlic_xxxxxxxx.xxxxxxxx(xxxxxxxx,xxxxxxxx,xxxxxxxx,x,y,xxxxxxxx,xxxxxxxx,xxxxxxxx,xxxxxxxx,xxxxxxxx,xxxxxxxx) values(?,?,?,?,?,?,?,?,?,?,?)";
                                    ETC_Data ed = null;
                                    try {
                                        pstm = conn.prepareStatement(sql);
                                        // 设置事务为手动
                                        conn.setAutoCommit(false);
                                        // 批量添加
                                        for (int i = 0; i < list.size(); i++) {
                                            ed = list.get(i);
                                            pstm.setString(1, UUID.randomUUID().toString());
                                            pstm.setString(2, ed.getArea());
                                            pstm.setString(3, ed.getDevice_id());
                                            pstm.setString(4, ed.getX());
                                            pstm.setString(5, ed.getY());
                                            pstm.setString(6, ed.getCar_color());
                                            pstm.setString(7, ed.getSerial_num());
                                            pstm.setString(8, ed.getIdcard_num());
                                            pstm.setString(9, ed.getUser_name());
                                            pstm.setString(10, ed.getCar_num());
                                            pstm.setString(11, ed.getTimes());
                                            pstm.addBatch();
                                        }
                                        pstm.executeBatch();
                                        conn.commit();
                                    } catch (Exception e) {
                                        //如果发生异常将 标记切换 解决 数据重复问题
                                        ExceptionFlag = false;
                                        e.printStackTrace();
                                        throw new RuntimeException(e);
                                    } finally {
                                        JDBCUtils3.close(pstm, conn);
                                    }
                                }
                        }
                    });
                }
            });

        messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
            @Override
            public void call(JavaRDD<ConsumerRecord<String, String>> v3) throws Exception {
                v3.repartition(1).foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
                    @Override
                    public void call(Iterator<ConsumerRecord<String, String>> st) throws Exception {
                        String time = com.util.DateUtil.formatDateString(new Date(), DateUtil.DATE_FORMAT_12W);
                        HashMap<String, String> redisMapOk = new HashMap<>();
                        HashMap<String, String> redisMapErro = new HashMap<>();
                        OffsetRange[] offsetRanges = ((HasOffsetRanges) v3.rdd()).offsetRanges();
                        for (OffsetRange offsetRange : offsetRanges) {
                            //将时间拼接 添加到redis 数据库可以准备记录当前数据 偏移量消费的时间
                            //记录正确的偏移量 如果没有发生错误,则记录当前偏移量的结束位置,因为起始位置已经数据入库, 下次从上次的结束开始
                            redisMapOk.put(String.valueOf(offsetRange.partition()), offsetRange.untilOffset()+"_"+time+"_OK");
                            //记录错误的的偏移量 因为异常插入, 所以记录当前偏移量的起始位置。
                            redisMapErro.put(String.valueOf(offsetRange.partition()), offsetRange.fromOffset()+"_"+time+"_ERROR");
                        }

                        //当数据为空 不对数据添加到redis 然后减少redis 的压力
                        if(st.hasNext()){
                            if (ExceptionFlag) {
                               // System.out.println("正确偏移量~~~~~~~~~~~ 提交" + redisMapOk);
                                RedisUtil.PutAll(offset, redisMapOk, 1);
                            } else {
                               // System.out.println("错误偏移量~~~~~~~~~~~" + redisMapErro);
                                RedisUtil.PutAll(offset, redisMapErro, 1);
                            }
                        }
                    }
                });
            }
        });

            ssc.start();
        try {
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

标签: sc25v传感器传感器dfs

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台