资讯详情

zk+kafka集群

docker服务编排 yml文件

安装:yum install docker-compose

version: '3.1' services:   zoo1:     image: wurstmeister/zookeeper     restart: always     hostname: zoo1     container_name: zoo1     ports:       - 2184:2181      将zk1.主机映射到容器上     volumes:关联卷路径       - /data/wangzunbin/volume/zkcluster/zoo1/data:/data:Z       - /data/wangzunbin/volume/zkcluster/zoo1/datalog:/datalog:Z     environment:       ZOO_MY_ID: 1       ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888    zoo2:     image: wurstmeister/zookeeper     restart: always     hostname: zoo2     container_name: zoo2     ports:       - 2185:2181                将zk2.主机映射到容器上     volumes:       - /data/wangzunbin/volume/zkcluster/zoo2/data:/data:Z       - /data/wangzunbin/volume/zkcluster/zoo2/datalog:/datalog:Z     environment:       ZOO_MY_ID: 2       ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888    zoo3:     image: wurstmeister/zookeeper     restart: always     hostname: zoo3     container_name: zoo3     ports:       - 2186:211                将zk3,主机映射到容器上
    volumes:
      - /data/wangzunbin/volume/zkcluster/zoo3/data:/data:Z
      - /data/wangzunbin/volume/zkcluster/zoo3/datalog:/datalog:Z
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
**`zoo1,zoo2,zoo3,,,,,zk集群`**
  kafka1:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka1
    container_name: kafka1
    ports:
      - 9092:9092    开放映射端口
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka1     设置主机名,跟本地host文件设置有关联
      KAFKA_ADVERTISED_PORT: 9092  
      KAFKA_BROKER_ID: 1     设置broker
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_LISTENERS: PLAINTEXT://kafka1:9092
    volumes:
      - /data/wangzunbin/volume/kfkluster/kafka1/logs:/kafka:Z
    external_links:
      - zoo1
      - zoo2
      - zoo3

  kafka2:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka2
    container_name: kafka2
    ports:
      - 9093:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_LISTENERS: PLAINTEXT://kafka2:9092
    volumes:
      - /data/wangzunbin/volume/kfkluster/kafka2/logs:/kafka:Z
    external_links:
      - zoo1
      - zoo2
      - zoo3

  kafka3:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka3
    container_name: kafka3
    ports:
      - 9094:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
      KAFKA_LISTENERS: PLAINTEXT://kafka3:9092
    volumes:
      - /data/wangzunbin/volume/kfkluster/kafka3/logs:/kafka:Z
    external_links:
      - zoo1
      - zoo2
      - zoo3
kafka1,2,3,作为kafka集群,由port进行映射,通过9092-3,4-》9092

docker-compose up -d 进行将文件实现好

docker命令:拉取imgage (wurstmeister/kafka wurstmeister/zookeeper)

创建容器
	container_name: kafka3,kafka2,kafka1,zoo1,zoo2,zoo3
重启容器
	docker container restart  容器名或id名
进入容器
	docker exec -it 容器名 /bin/bash   进入容器终端
修改配置文件
	重启容器,修改配置失效,

创建topic 副本,分区

kafka-topics.sh --create --zookeeper xxxxxxxxxxxx:2184 –replication-factor 3 --partitions 5 --topic TestTopic

查看分区信息

kafka-topics.sh --describe --zookeeper xxxxxxxxxxxx:2184 --topic TestTopic

本地修改host===》如本地出异常hostname等问题填写

C:\Windows\System32\drivers\etc

    { 
        ip  服务器} kafka1
	{ 
        ip  服务器} kafka2
	{ 
        ip  服务器} kafka3

本地创建生产者,

public class Demo { 
        
    public static void main(String[] args) { 
        
        //创建一个Properties对象,用于存储连接kafka所需要的配置信息
        Properties kafkaProps = new Properties();
//配置kafka集群地址
        kafkaProps.put("bootstrap.servers", "xxxx:9092,xxxx:9093,xxxx:9094");
//向kafka集群发送消息,除了消息值本身,还包括key信息,key信息用于消息在partition之间均匀分布。
//发送消息的key,类型为String,使用String类型的序列化器
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//发送消息的value,类型为String,使用String类型的序列化器
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//创建一个KafkaProducer对象,传入上面创建的Properties对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
/** * 使用ProducerRecord<String, String>(String topic, String key, String value)构造函数创建消息对象 * 构造函数接受三个参数: * topic--告诉kafkaProducer消息发送到哪个topic; * key--告诉kafkaProducer,所发送消息的key值,注意:key值类型需与前面设置的key.serializer值匹配 * value--告诉kafkaProducer,所发送消息的value值,即消息内容。注意:value值类型需与前面设置的value.serializer值匹配 */
        ProducerRecord<String, String> record =new ProducerRecord<>("TestTopic", "messageKey", "hello kafka");
        try { 
        
            //发送前面创建的消息对象ProducerRecord到kafka集群
            //发送消息过程中可能发送错误,如无法连接kafka集群,所以在这里使用捕获异常代码
            producer.send(record);
            //关闭kafkaProducer对象
            producer.close();
        } catch (Exception e) { 
        
            e.printStackTrace();
        }
    }
}

最终实现效果,消费者接收消息

bash-5.1# ./kafka-console-consumer.sh --bootstrap-server xxxxxxxxx:9093 --topic TestTopic

> `hello kafka`

bash-5.1# ./kafka-console-consumer.sh --bootstrap-server xxxxxxxxx:9092 --topic TestTopic

> `hello kafka`

bash-5.1# ./kafka-console-consumer.sh --bootstrap-server xxxxxxxxx:9094 --topic TestTopic

> `hello kafka`

标签: 164zk10连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台