一、功能描述
利用Java连接Kafka,通过API对于生产者和消费者来说,对于Kafka生产或消费数据。输出日志信息。
二、依赖导入
首先,创建一个简单的maven并将依赖导入
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.33</version> </dependency>
三、日志配置
#指定log4j的输出信息 log4j.rootLogger=INFO, stdout, logfile #指定log4j的标准输出 log4j.appender.stdout=org.apache.log4j.ConsoleAppender #指定log4j标准输出样式 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout #转换格式的指定标准输出 log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n #输出指定日志文件 log4j.appender.logfile=org.apache.log4j.FileAppender #指定log4j输出路径文件名 log4j.appender.logfile.File=log/hd.log #指定日志日志输出样式 log4j.appender.logfile.layout=org.apache.log4j.PatternLayout #定日志文件的转换格式
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
四、基于Zookeeper的消费者
//进行导包
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
public class ZkConsumer {
public static void main(String[] args) {
//初始化配置信息
Properties config = new Properties();
//定义连接的主机信息,相当于kafka脚本命令的--bootstrap-server
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092");
//定义分组信息,相当于kafka脚本命令的-group
config.put(ConsumerConfig.GROUP_ID_CONFIG,"kb16");
//定义数据偏移量配置,配置信息有:earliest、latest、none和anything else四种配置
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//定义自动提交时间,时间单位为ms
config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,500);
//定义是否开启自动提交
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//定义消费者的键的反序列化的配置
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
//定义消费者的值的反序列化配置
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
//初始化存放消费者的队列
KafkaConsumer<Integer,String> consumer=new KafkaConsumer<>(config);
//订阅主题
consumer.subscribe(Arrays.asList("kb16-test02"));
//循环遍历进行数据获取
while(true){
//迭代器遍历消费者数据
Iterator<ConsumerRecord<Integer, String>> it = consumer.poll(Duration.ofMillis(500)).iterator();
//如果还有数据
if(it.hasNext()) {
//遍历消费者数据,并数据拼接起来
do {
ConsumerRecord<Integer, String> record = it.next();
StringBuilder builder = new StringBuilder();
builder.append(record.topic());
builder.append("\t");
builder.append(record.partition());
builder.append("\t");
builder.append(record.offset());
builder.append("\t");
builder.append(record.timestamp());
builder.append("\t");
builder.append(record.key());
builder.append("\t");
builder.append(record.value());
builder.append("\t");
System.out.println(builder.toString());
} while (it.hasNext());
}
}
//consumer.close();
}
}
五、基于Zookeeper的生产者
//导包
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ZkProducer {
public static void main(String[] args) {
//初始化配置
Properties config = new Properties();
//定义连接的主机信息,相当于kafka脚本命令的--bootstrap-server
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092");
//定义批次大小信息
config.put(ProducerConfig.BATCH_SIZE_CONFIG,5);
//生产者将在请求传输之间到达的任何记录组合成一个批处理请求。
config.put(ProducerConfig.LINGER_MS_CONFIG,1000);
//定义确认策略,配置信息有:0、1和all,默认一般为all
config.put(ProducerConfig.ACKS_CONFIG,"all");
//定义失败重试的次数
config.put(ProducerConfig.RETRIES_CONFIG,3);
//producer -Event Stream->kafka server(java object)
//定义生产者键的serialization序列化
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerSerializer");
//定义生产者的值的序列化
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
//初始化生产者队列
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(config);
//定义主题
final String TOPIC="kb16-test02";
//定义偏移量
final int PART=0;
for (int i = 0; i < 100; i++) {
//传入数据进行封装
ProducerRecord<Integer,String> record =
new ProducerRecord<>(TOPIC,PART,System.currentTimeMillis(),i,"happy new year"+i);
//向kafka发送数据
producer.send(record);
}
//关闭生产者
producer.close();
}
}