下列代码涵盖 Kafka 数据接入、Spark 算子数据处理、Kafka 偏移量记录、反压控制、数据批量插入 MySQL 等全部操作步骤。
package com.e_data;

import com.util.DateUtil;
import com.util.JDBCUtils3;
import com.util.RedisUtil;
import org.apache.commons.collections.IteratorUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.*;

/**
 * Spark Streaming ETL job for ETC data ("most stable version", author liuwunan;
 * designed so data is not lost).
 *
 * <p>Pipeline: Kafka direct stream -&gt; filter malformed records -&gt; parse CSV
 * payload into {@code ETC_Data} beans -&gt; group the whole batch under one key
 * -&gt; batch-insert into MySQL inside a manual JDBC transaction. Kafka
 * auto-commit is disabled; consumer offsets are checkpointed manually into a
 * Redis hash: on success the batch's END (until) offsets are stored, on insert
 * failure the batch's START (from) offsets are stored so the batch replays.
 *
 * <p>NOTE(review): the job runs with {@code local[5]}, so the static
 * {@link #ExceptionFlag} written by the insert stage and read by the
 * offset-commit stage lives in one JVM. On a real cluster the flag would be
 * written on executors and this signalling scheme would break — confirm before
 * switching to a cluster master.
 */
public class HuaLong_RealTime_ETC implements Serializable {

    /**
     * Batch error marker. {@code true}: insert succeeded, checkpoint the end
     * offsets; {@code false}: insert failed, checkpoint the start offsets so the
     * batch is re-consumed. Reset to {@code true} at the start of every batch
     * (the original never reset it, so a single failure made every subsequent
     * successful batch record "error" offsets forever). Volatile because it is
     * written and read by different Spark task threads in local mode.
     */
    public static volatile Boolean ExceptionFlag = true;

    // Kafka parameters (brokers, consumer group, topic) and the Redis key
    // under which per-partition offset checkpoints are stored.
    private static String topics = "topics";                    // topic list, comma separated
    private static String groupId = "groupId";                  // consumer group id
    private static String offset = "offset";                    // Redis hash key for offsets
    private static String brokers = "IP:9092,IP:9092,IP:9092";  // Kafka broker list

    public static void main(String[] args) {
        // (Optional) lower HDFS replication from the default 3 to 1 to reduce sync latency.
        // Configuration hdfs = new Configuration();
        // hdfs.set("dfs.replication", "1");
        SparkConf conf = new SparkConf().setAppName("RealTimeE")
                .set("spark.dynamicAllocation.enabled", "false"); // dynamic allocation off
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.setMaster("local[5]"); // local thread count; must be > 1, never use local[1]
        conf.set("spark.streaming.backpressure.enabled", "true");      // enable backpressure
        conf.set("spark.streaming.backpressure.pid.minRate", "1");     // minimum rate (records/s)
        conf.set("spark.streaming.kafka.maxRatePerPartition", "2000"); // max records/s per partition
        conf.set("spark.speculation", "true");                         // speculative task execution

        JavaSparkContext sc = new JavaSparkContext(conf);
        // One micro-batch every 60 seconds.
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(60));

        // Kafka consumer configuration (brokers, consumer group, topics).
        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Starting point when no Redis checkpoint exists ("earliest" to replay from the beginning).
        kafkaParams.put("auto.offset.reset", "latest");
        // Offsets are committed manually to Redis, never auto-committed to Kafka.
        kafkaParams.put("enable.auto.commit", "false");

        HashMap<TopicPartition, Long> mapTopic = new HashMap<>();
        JavaInputDStream<ConsumerRecord<String, String>> messages;
        // Resume from the offsets checkpointed in Redis, if any exist.
        // (Original referenced an undeclared "etc_data_offset"; the declared key field is "offset".)
        Boolean flag = RedisUtil.FlagExits(offset, 1);
        if (flag) {
            Map<String, String> offsets = RedisUtil.getAll(offset, 1);
            for (Map.Entry<String, String> entry : offsets.entrySet()) {
                String partition = entry.getKey();
                // Stored value has the form "<offset>_<time>_<status>":
                // strip the time/status suffix and keep only the numeric offset.
                String[] s = entry.getValue().split("_", -1);
                String offsetLast = s[0];
                TopicPartition topicPartition =
                        new TopicPartition(topics, Integer.valueOf(partition));
                mapTopic.put(topicPartition, Long.valueOf(offsetLast));
            }
            messages = KafkaUtils.createDirectStream(ssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams, mapTopic));
        } else {
            System.out.println("重头消费 最新消费");
            messages = KafkaUtils.createDirectStream(ssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));
        }

        // Keep only well-formed records: key prefixed "E_data" and exactly 10 CSV fields.
        JavaDStream<ConsumerRecord<String, String>> v1 = messages.filter(
                new Function<ConsumerRecord<String, String>, Boolean>() {
                    @Override
                    public Boolean call(ConsumerRecord<String, String> record) throws Exception {
                        return record.key().startsWith("E_data")
                                && 10 == record.value().split(",", -1).length;
                    }
                });

        // Parse the CSV payload into an ETC_Data bean, keyed by a constant so the
        // whole batch can later be grouped into a single JDBC insert batch.
        JavaPairDStream<String, ETC_Data> v2 = v1.mapToPair(
                new PairFunction<ConsumerRecord<String, String>, String, ETC_Data>() {
                    @Override
                    public Tuple2<String, ETC_Data> call(ConsumerRecord<String, String> v)
                            throws Exception {
                        // Field order:
                        // area,device_id,x,y,car_color,serial_num,idcard_num,car_num,user_name,times
                        String[] split = v.value().split(",", -1);
                        // Original declared "E_Data e_data" but populated an undeclared
                        // "etc_data"; the stream's element type is ETC_Data.
                        ETC_Data etcData = new ETC_Data();
                        etcData.setArea(split[0]);
                        etcData.setDevice_id(split[1]);
                        etcData.setX(split[2]);
                        etcData.setY(split[3]);
                        etcData.setCar_color(split[4]);
                        etcData.setSerial_num(split[5]);
                        etcData.setIdcard_num(split[6]);
                        etcData.setCar_num(split[7]);
                        etcData.setUser_name(split[8]);
                        etcData.setTimes(split[9]);
                        return new Tuple2<>("E_DATA", etcData);
                    }
                });

        JavaPairDStream<String, Iterable<ETC_Data>> V3 = v2.groupByKey();
        V3.foreachRDD(new VoidFunction<JavaPairRDD<String, Iterable<ETC_Data>>>() {
            @Override
            public void call(JavaPairRDD<String, Iterable<ETC_Data>> v4) throws Exception {
                // New batch: assume success until the insert below says otherwise.
                ExceptionFlag = true;
                // Funnel everything into one partition so a single JDBC connection
                // handles the whole batch.
                v4.repartition(1).foreachPartition(
                        new VoidFunction<Iterator<Tuple2<String, Iterable<ETC_Data>>>>() {
                            @Override
                            public void call(Iterator<Tuple2<String, Iterable<ETC_Data>>> v5)
                                    throws Exception {
                                Connection conn = JDBCUtils3.getConnection();
                                PreparedStatement pstm = null;
                                String sql = "INSERT INTO punlic_xxxxxxxx.xxxxxxxx(xxxxxxxx,xxxxxxxx,xxxxxxxx,x,y,xxxxxxxx,xxxxxxxx,xxxxxxxx,xxxxxxxx,xxxxxxxx,xxxxxxxx) values(?,?,?,?,?,?,?,?,?,?,?)";
                                try {
                                    // Manual transaction: one commit per batch.
                                    conn.setAutoCommit(false);
                                    // try/finally moved OUTSIDE the loop: the original closed
                                    // the connection inside the loop, which would break a
                                    // second iteration.
                                    while (v5.hasNext()) {
                                        Tuple2<String, Iterable<ETC_Data>> next = v5.next();
                                        List<ETC_Data> list = IteratorUtils.toList(next._2.iterator());
                                        pstm = conn.prepareStatement(sql);
                                        for (int i = 0; i < list.size(); i++) {
                                            ETC_Data ed = list.get(i);
                                            pstm.setString(1, UUID.randomUUID().toString());
                                            pstm.setString(2, ed.getArea());
                                            pstm.setString(3, ed.getDevice_id());
                                            pstm.setString(4, ed.getX());
                                            pstm.setString(5, ed.getY());
                                            pstm.setString(6, ed.getCar_color());
                                            pstm.setString(7, ed.getSerial_num());
                                            pstm.setString(8, ed.getIdcard_num());
                                            pstm.setString(9, ed.getUser_name());
                                            pstm.setString(10, ed.getCar_num());
                                            pstm.setString(11, ed.getTimes());
                                            pstm.addBatch();
                                        }
                                        pstm.executeBatch();
                                        conn.commit();
                                    }
                                } catch (Exception e) {
                                    // Flip the marker so the offset stage records the batch's
                                    // START offsets and the batch replays (no data loss, no
                                    // duplicate insert of the committed part).
                                    ExceptionFlag = false;
                                    e.printStackTrace();
                                    throw new RuntimeException(e);
                                } finally {
                                    JDBCUtils3.close(pstm, conn);
                                }
                            }
                        });
            }
        });

        // Checkpoint this batch's offsets to Redis after the insert stage has run
        // (Spark executes output operations in registration order).
        messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
            @Override
            public void call(JavaRDD<ConsumerRecord<String, String>> v3) throws Exception {
                v3.repartition(1).foreachPartition(
                        new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
                            @Override
                            public void call(Iterator<ConsumerRecord<String, String>> st)
                                    throws Exception {
                                // Timestamp appended to each stored offset so the checkpoint
                                // records when it was consumed.
                                String time = com.util.DateUtil.formatDateString(
                                        new Date(), DateUtil.DATE_FORMAT_12W);
                                HashMap<String, String> redisMapOk = new HashMap<>();
                                HashMap<String, String> redisMapErro = new HashMap<>();
                                // NOTE(review): casting v3.rdd() inside the partition closure
                                // only works in local mode; on a cluster the cast must happen
                                // on the driver — confirm before deploying.
                                OffsetRange[] offsetRanges =
                                        ((HasOffsetRanges) v3.rdd()).offsetRanges();
                                for (OffsetRange offsetRange : offsetRanges) {
                                    // Success: record the END offset (data already inserted),
                                    // so the next run resumes after this batch.
                                    redisMapOk.put(String.valueOf(offsetRange.partition()),
                                            offsetRange.untilOffset() + "_" + time + "_OK");
                                    // Failure: record the START offset so this batch replays.
                                    redisMapErro.put(String.valueOf(offsetRange.partition()),
                                            offsetRange.fromOffset() + "_" + time + "_ERROR");
                                }
                                // Skip empty batches to spare Redis.
                                if (st.hasNext()) {
                                    if (ExceptionFlag) {
                                        RedisUtil.PutAll(offset, redisMapOk, 1);
                                    } else {
                                        RedisUtil.PutAll(offset, redisMapErro, 1);
                                    }
                                }
                            }
                        });
            }
        });

        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}