资讯详情

java连接kafka实现生产者消费者功能

一、功能描述

利用Java连接Kafka,通过API对于生产者和消费者来说,对于Kafka生产或消费数据。输出日志信息。

二、依赖导入

首先,创建一个简单的maven并将依赖导入

<dependency>  <groupId>org.apache.kafka</groupId>  <artifactId>kafka-clients</artifactId>  <version>${kafka.version}</version> </dependency> <dependency>  <groupId>log4j</groupId>  <artifactId>log4j</artifactId>  <version>1.2.17</version> </dependency> <dependency>  <groupId>org.slf4j</groupId>  <artifactId>slf4j-log4j12</artifactId>  <version>1.7.33</version> </dependency> 

三、日志配置

#指定log4j的输出信息 log4j.rootLogger=INFO, stdout, logfile #指定log4j的标准输出 log4j.appender.stdout=org.apache.log4j.ConsoleAppender #指定log4j标准输出样式 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout #转换格式的指定标准输出 log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n #输出指定日志文件 log4j.appender.logfile=org.apache.log4j.FileAppender #指定log4j输出路径文件名 log4j.appender.logfile.File=log/hd.log #指定日志日志输出样式 log4j.appender.logfile.layout=org.apache.log4j.PatternLayout #定日志文件的转换格式
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

四、基于Zookeeper的消费者

//进行导包
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class ZkConsumer { 
        
    public static void main(String[] args) { 
        
        //初始化配置信息
        Properties config = new Properties();
        //定义连接的主机信息,相当于kafka脚本命令的--bootstrap-server
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092");
        //定义分组信息,相当于kafka脚本命令的-group
        config.put(ConsumerConfig.GROUP_ID_CONFIG,"kb16");
        //定义数据偏移量配置,配置信息有:earliest、latest、none和anything else四种配置
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        //定义自动提交时间,时间单位为ms
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,500);
        //定义是否开启自动提交
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        //定义消费者的键的反序列化的配置
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        //定义消费者的值的反序列化配置
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        //初始化存放消费者的队列
        KafkaConsumer<Integer,String> consumer=new KafkaConsumer<>(config);
        //订阅主题
        consumer.subscribe(Arrays.asList("kb16-test02"));
        //循环遍历进行数据获取
        while(true){ 
        
            //迭代器遍历消费者数据
            Iterator<ConsumerRecord<Integer, String>> it = consumer.poll(Duration.ofMillis(500)).iterator();
            //如果还有数据
            if(it.hasNext()) { 
        
                //遍历消费者数据,并数据拼接起来
                do { 
        
                    ConsumerRecord<Integer, String> record = it.next();
                    StringBuilder builder = new StringBuilder();
                    builder.append(record.topic());
                    builder.append("\t");
                    builder.append(record.partition());
                    builder.append("\t");
                    builder.append(record.offset());
                    builder.append("\t");
                    builder.append(record.timestamp());
                    builder.append("\t");
                    builder.append(record.key());
                    builder.append("\t");
                    builder.append(record.value());
                    builder.append("\t");
                    System.out.println(builder.toString());
                } while (it.hasNext());
            }
        }
        //consumer.close();
    }
}

五、基于Zookeeper的生产者

//导包
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
public class ZkProducer { 
        
    public static void main(String[] args) { 
        
        //初始化配置
        Properties config = new Properties();
        //定义连接的主机信息,相当于kafka脚本命令的--bootstrap-server
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092");
        //定义批次大小信息
        config.put(ProducerConfig.BATCH_SIZE_CONFIG,5);
        //生产者将在请求传输之间到达的任何记录组合成一个批处理请求。
        config.put(ProducerConfig.LINGER_MS_CONFIG,1000);
        //定义确认策略,配置信息有:0、1和all,默认一般为all
        config.put(ProducerConfig.ACKS_CONFIG,"all");
        //定义失败重试的次数
        config.put(ProducerConfig.RETRIES_CONFIG,3);
        //producer -Event Stream->kafka server(java object)
        //定义生产者键的serialization序列化
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerSerializer");
        //定义生产者的值的序列化
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
		//初始化生产者队列
        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(config);
		//定义主题
        final String TOPIC="kb16-test02";
        //定义偏移量
        final int PART=0;
        for (int i = 0; i < 100; i++) { 
        
            //传入数据进行封装
            ProducerRecord<Integer,String> record =
                    new ProducerRecord<>(TOPIC,PART,System.currentTimeMillis(),i,"happy new year"+i);
            //向kafka发送数据
            producer.send(record);
        }
        //关闭生产者
        producer.close();
    }

}

标签: j95连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台