Docker service orchestration: the docker-compose YAML file
Install: yum install docker-compose
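You can confirm the installation with:
docker-compose --version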
version: '3.1'
services:
  zoo1:
    image: wurstmeister/zookeeper
    restart: always
    hostname: zoo1
    container_name: zoo1
    ports:
      - 2184:2181          # map host port 2184 to the container's 2181
    volumes:               # mounted volume paths
      - /data/wangzunbin/volume/zkcluster/zoo1/data:/data:Z
      - /data/wangzunbin/volume/zkcluster/zoo1/datalog:/datalog:Z
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
  zoo2:
    image: wurstmeister/zookeeper
    restart: always
    hostname: zoo2
    container_name: zoo2
    ports:
      - 2185:2181          # map host port 2185 to the container's 2181
    volumes:
      - /data/wangzunbin/volume/zkcluster/zoo2/data:/data:Z
      - /data/wangzunbin/volume/zkcluster/zoo2/datalog:/datalog:Z
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
  zoo3:
    image: wurstmeister/zookeeper
    restart: always
    hostname: zoo3
    container_name: zoo3
    ports:
      - 2186:2181          # map host port 2186 to the container's 2181
    volumes:
      - /data/wangzunbin/volume/zkcluster/zoo3/data:/data:Z
      - /data/wangzunbin/volume/zkcluster/zoo3/datalog:/datalog:Z
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
**zoo1, zoo2, and zoo3 form the ZooKeeper cluster.** The same compose file continues with the Kafka brokers:
  kafka1:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka1
    container_name: kafka1
    ports:
      - 9092:9092        # expose the mapped port
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka1   # advertised hostname; must match the entry in the client's hosts file
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_BROKER_ID: 1                   # unique broker id
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_LISTENERS: PLAINTEXT://kafka1:9092
    volumes:
      - /data/wangzunbin/volume/kfkluster/kafka1/logs:/kafka:Z
    external_links:
      - zoo1
      - zoo2
      - zoo3
  kafka2:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka2
    container_name: kafka2
    ports:
      - 9093:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_LISTENERS: PLAINTEXT://kafka2:9092
    volumes:
      - /data/wangzunbin/volume/kfkluster/kafka2/logs:/kafka:Z
    external_links:
      - zoo1
      - zoo2
      - zoo3
  kafka3:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka3
    container_name: kafka3
    ports:
      - 9094:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
      KAFKA_LISTENERS: PLAINTEXT://kafka3:9092
    volumes:
      - /data/wangzunbin/volume/kfkluster/kafka3/logs:/kafka:Z
    external_links:
      - zoo1
      - zoo2
      - zoo3
kafka1, kafka2, and kafka3 form the Kafka cluster. Each broker's container port 9092 is mapped to its own host port: 9092, 9093, and 9094 respectively.
Run docker-compose up -d to bring up everything defined in the file in the background.
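As a quick sanity check (not part of the original steps), you can list the containers and ask each ZooKeeper node for its role. Here localhost assumes you run this on the Docker host, and the srvr four-letter command needs netcat installed on the client:
docker-compose ps                  # all six containers should show State "Up"
echo srvr | nc localhost 2184      # zoo1: the "Mode:" line reads leader or follower
echo srvr | nc localhost 2185      # zoo2
echo srvr | nc localhost 2186      # zoo3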
Docker steps involved: pull the images (wurstmeister/kafka, wurstmeister/zookeeper) and create the containers
container_name: kafka1, kafka2, kafka3, zoo1, zoo2, zoo3
Restart a container:
docker container restart <container name or id>
Enter a container's terminal:
docker exec -it <container name> /bin/bash
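For example, to open a shell in kafka1 and locate the Kafka CLI scripts (the /opt/kafka path is where the wurstmeister/kafka image keeps them; adjust if your image differs):
docker exec -it kafka1 /bin/bash
cd /opt/kafka/bin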
Modifying configuration files inside a container: beware that such changes do not survive once the container is removed and re-created (e.g., by docker-compose), so persistent settings belong in the compose file or in mounted volumes.
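With this image, a more durable route is the environment section of the compose file: wurstmeister/kafka translates KAFKA_-prefixed environment variables into server.properties entries at startup. KAFKA_LOG_RETENTION_HOURS below is only an illustrative setting, not part of the original file:
    environment:
      KAFKA_LOG_RETENTION_HOURS: 48   # becomes log.retention.hours=48 in server.properties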
Create a topic with replicas and partitions:
kafka-topics.sh --create --zookeeper xxxxxxxxxxxx:2184 --replication-factor 3 --partitions 5 --topic TestTopic
View the partition information:
kafka-topics.sh --describe --zookeeper xxxxxxxxxxxx:2184 --topic TestTopic
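Note that the --zookeeper flag was removed in Kafka 3.0; from Kafka 2.2 onward the same tool can talk to a broker directly instead, for example:
kafka-topics.sh --describe --bootstrap-server xxxxxxxxxxxx:9092 --topic TestTopic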
Edit the local hosts file (do this if the client reports hostname-resolution or similar errors):
C:\Windows\System32\drivers\etc\hosts
<server ip>   kafka1
<server ip>   kafka2
<server ip>   kafka3
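After saving the hosts file, you can confirm the names resolve:
ping kafka1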
Create a producer locally:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Demo {
    public static void main(String[] args) {
        // Properties object holding the configuration needed to connect to Kafka
        Properties kafkaProps = new Properties();
        // Kafka cluster addresses
        kafkaProps.put("bootstrap.servers", "xxxx:9092,xxxx:9093,xxxx:9094");
        // Besides the value itself, every message sent to the cluster carries a key;
        // the key is used to spread messages evenly across partitions.
        // The key is a String, so use the String serializer
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The value is a String, so use the String serializer
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Create a KafkaProducer from the Properties built above
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
        /*
         * Build the message with the ProducerRecord<String, String>(String topic, String key, String value)
         * constructor. Its three arguments are:
         *   topic - which topic the producer sends to;
         *   key   - the message key; its type must match the key.serializer set above;
         *   value - the message content; its type must match the value.serializer set above.
         */
        ProducerRecord<String, String> record = new ProducerRecord<>("TestTopic", "messageKey", "hello kafka");
        try {
            // Send the record to the Kafka cluster.
            // Sending can fail (e.g., the cluster is unreachable), hence the try/catch.
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the producer, flushing any buffered records
            producer.close();
        }
    }
}
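For completeness, here is a minimal matching consumer sketch (not part of the original post). The class name ConsumerDemo and the group id test-group are made up for illustration; the xxxx placeholders stand for the same server addresses as in the producer above:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same cluster addresses as the producer above
        props.put("bootstrap.servers", "xxxx:9092,xxxx:9093,xxxx:9094");
        // Consumers belong to a group; the topic's partitions are balanced across the group's members
        props.put("group.id", "test-group");
        // Deserializers mirror the producer's serializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the earliest offset when the group has no committed position yet
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("TestTopic"));
            // Loop until the process is stopped
            while (true) {
                // poll blocks up to the given timeout waiting for records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d key=%s value=%s%n",
                            record.partition(), record.key(), record.value());
                }
            }
        }
    }
}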
Final result: the consumers receive the message.
bash-5.1# ./kafka-console-consumer.sh --bootstrap-server xxxxxxxxx:9093 --topic TestTopic
> `hello kafka`
bash-5.1# ./kafka-console-consumer.sh --bootstrap-server xxxxxxxxx:9092 --topic TestTopic
> `hello kafka`
bash-5.1# ./kafka-console-consumer.sh --bootstrap-server xxxxxxxxx:9094 --topic TestTopic
> `hello kafka`