资讯详情

CC00084.kafka——|Hadoop&kafka.V69|——|kafka.v69|稳定性|重试队列.v01|

一、重试队列
### --- 重试队列  ~~~     kafka没有重试机制不支持新闻重试,也没有死信队列,所以使用kafka做消息队列时, ~~~     需要实现新闻重试的功能。
### --- 实现:创新kafka主题作为重试队列:  ~~~     创建一个topic作为重试topic,接收等待重试的消息。 ~~~     普通topic消费者设置下一次重试新闻topic。 ~~~     从重试topic获取待重试消息存储redis的zset以下消费时间排序 ~~~     定时任务从redis获取到达消费事件的消息,并把消息发送到对应的topic ~~~     同一消息重试次数过多,不再重试
二、创造一个springboot项目
### --- 创建springboot项目:  ~~~     ——>New Module——>Spring Initializr:配置信息如下——>NExt——> ~~~     ——>Spring Boot:2.2.8——> ~~~     ——>添加依赖:web、kafka、redis——>Next——>END
### --- 更给springboot版本2.5.5为2.2.8      <modelVersion>4.0.0</modelVersion>     <parent>         <groupId>org.springframework.boot</groupId>         <artifactId>spring-boot-starter-parent</artifactId>         <version>2.2.8.RELEASE</version>         <relativePath/> <!-- lookup parent from repository -->     </parent>
### --- 标准pom.xml依赖文件  <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">     <modelVersion>4.0.0</modelVersion>     <parent>         <groupId>org.springframework.boot</groupId>         <artifactId>spring-boot-starter-parent</artifactId>         <version>2.2.8.RELEASE</version>         <relativePath/> <!-- lookup parent from repository -->     </parent>     <groupId>com.yanqi.kafka.demo</groupId>     <artifactId>demo-retryqueue</artifactId>     <version>0.0.1-SNAPSHOT</version>     <name>demo-retryqueue</name>     <description>Demo project for Spring Boot</description>     <properties>         <java.version>1.8</java.version>     </properties>     <dependencies>         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter</artifactId>         </dependency>         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-web</artifactId>         </dependency>         <dependency>             <groupId>org.springframework.kafka</groupId>             <artifactId>spring-kafka</artifactId>         </dependency>         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-test</artifactId>             <scope>test</scope>             <exclusions>                 <exclusion>                     <groupId>org.junit.vintage</groupId>                     <artifactId>junit-vintage-engine</artifactId>                 </exclusion>             </excusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
### --- 添加application.properties:src——>main——>resources——>application.properties

# bootstrap.servers
spring.kafka.bootstrap-servers=hadoop01:9092
# key序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# value序列化器
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# 消费组id:group.id
spring.kafka.consumer.group-id=retryGroup
# key反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# value反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# redis数据库编号
spring.redis.database=0
# redis主机地址
spring.redis.host=192.168.1.111
# redis端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=20
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.timeout=1000

# Kafka主题名称,业务主题
spring.kafka.topics.test=tp_demo_retry_01
# 重试队列,重试主题
spring.kafka.topics.retry=tp_demo_retry_02
### --- RetryqueueApplication.java

package com.yanqi.kafka.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RetryqueueApplication {

    public static void main(String[] args) {
        SpringApplication.run(RetryqueueApplication.class, args);
    }

}

标签: 1v69连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台