日志收集分析平台
文章目录
- 日志收集分析平台
-
- 一、环境准备
- 二、nginx搭建
- 三、项目结构及kafka的介绍
-
- 负载均衡:nginx反向代理
- 高可用:keepalived
- 3.1 kafka项目优势
- 3.2 kafka使用场景
-
- 3.2.1 日志收集
- 3.2.2 业务解耦
- 3.2.3 流量削峰
- 3.3 kafka通信方式:发布订阅
- 3.4 kafka术语解释
- 3.5 问题总结
-
- 3.5.1 kafka如何保证高可用性?
- 3.5.2 一个partition有n个副本,数据写哪个副本?
- 3.5.3 数据一致性问题
-
- 3.5.3.1 生产者数据一致性问题
- 3.5.3.2 消费者数据的一致性
- 3.5.4 为什么生产者可以随意连接哪台机器?leader处理吗?(消费者消费leader同样的数据)
- 3.5.5 消费者如何知道自己消费了哪些数据?下次继续消费?
- 四、kafka(2.12)和zookeeper(3.6.3)的搭建
- 五、filebeat部署
- 六、zookeeper在kafka集群的作用
一、环境准备
1.准备好搭建三台虚拟机nginx和kafka集群
2.修改三台机器的主机名称
# 永久修改主机名称 或者也可以选择修改/修改/或者修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/或修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改/修改etc/hostname文件 [root@wh ~]# hostnamestl set-hostname nginx-kafka01 # 重新登录,加载主机名称 [root@nginx-kafka01 ~]# su [root@nginx-kafka02 ~]# hostnamestl set-hostname nginx-kafka02 [root@nginx-kafka03 ~]# hostnamestl set-hostname nginx-kafka03
3.配置ip地址和dns(步骤可参考手动配置ip地址https://blog.csdn.net/weixin_50426379/article/details/125790311?spm=1001.2014.3001.5501)
dhcp动态分配ip地址,重启之后ip地址可能会发生变化,导致以下配置文件中的地址发生变化ip也要修改,会比较麻烦
[root@nginx-kafka01 ~]# cat /etc/sysconfig/network-scripts/ifcfg-ens33 BOOTPROTO="none" NAME="ens33" DEVICE="ens33" ONBOOT="yes" IPADDR=192.168.72.130 PREFIX=24 GATEWAY=192.168.72.2 DNS1=114.114.114.114 [root@nginx-kafka02 ~]# cat /etc/sysconfig/network-scripts/ifcfg-ens33 BOOTPROTO="none" NAME="ens33" DEVICE="ens33" ONBOOT="yes" IPADDR=192.168.72129
PREFIX=24
GATEWAY=192.168.72.2
DNS1=114.114.114.114
[root@nginx-kafka03 ~]# cat /etc/sysconfig/network-scripts/ifcfg-ens33
BOOTPROTO="none"
NAME="ens33"
DEVICE="ens33"
ONBOOT="yes"
IPADDR=192.168.72.140
PREFIX=24
GATEWAY=192.168.72.2
DNS1=114.114.114.114
[root@nginx-kafka01 ~]# cat /etc/resolv.conf
; generated by /usr/sbin/dhclient-script
search localdomain 168.72.137
nameserver 114.114.114.114
# /etc/resolv.conf 指定本地域名服务器114.114.114.114
4.每台机器上都写好域名解析
[root@nginx-kafka01 ~]# cat /etc/hosts #本地ip和域名的映射关系
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.72.130 nginx-kafka01 # ip 主机名 的格式
192.168.72.129 nginx-kafka02
192.168.72.140 nginx-kafka03
# DNS解析过程:
1. 浏览器的缓存
2. 本地的hosts文件 --linux /etc/hosts
3. 请求本地域名服务器 --linux /etc/resolv.conf
5.每台机器上都安装基本软件
[root@nginx-kafka01 ~]# yum install wget lsof vim -y
6.每台机器上都安装时间同步服务
yum -y install chrony # 安装chrony
systemctl enable chronyd # 设置chrony为开机启动,disable关闭开机自启
systemctl start chronyd # 开启chronyd服务
# 设置时区
cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
# 如果是同一文件就不用复制了
7.关闭防火墙和selinux
[root@nginx-kafka01 ~]# systemctl stop firewalld
[root@nginx-kafka01 ~]# systemctl disable firewalld
# 关闭selinux:
[root@nginx-kafka01 ~]# vim /etc/selinux/config
# 修改下面这行配置
SELINUX=disabled
# selinux关闭 需要重启机器
# selinux 是linux系统内核里一个跟安全相关的子系统
# 规则非常繁琐,一般日常工作里都是关闭的
selinux有三种模式
enforcing 强制模式,必须按照规则
permissive 宽容模式,有的允许,有的不允许
disabled 关闭模式,关闭所有selinu规则
# 查看是否生效
[root@nginx-kafka01 ~]# getenforce
Disabled
二、nginx搭建
1.安装好epel源和nginx:
[root@nginx-kafka01 ~]# yum install epel-release -y
[root@nginx-kafka01 ~]# yum install nginx -y
2.启动nginx
# 启动
[root@nginx-kafka01 ~]# systemctl start nginx
设置开机自启
[root@nginx-kafka01 ~]# systemctl enable nginx
# 查看开机自启的服务
[root@nginx-kafka01 nginx]# cd /etc/systemd/system/multi-user.target.wants/
[root@nginx-kafka01 multi-user.target.wants]# ls
nginx.service
3.编辑配置文件
[root@nginx-kafka01 ~]# cd /etc/nginx/ [root@nginx-kafka01 nginx]# ls nginx.conf [root@nginx-kafka01 nginx]# cat nginx.conf ... #全局块 events { #events块 ... } http #http块 { ... #http全局块 server #server块 { ... #server全局块 location [PATTERN] #location块 { ... } location [PATTERN] { ... } } server { ... } ... #http全局块 } 1、全局块:配置影响nginx全局的指令。一般有运行nginx服务器的用户组,nginx进程pid存放路径,日志存放路径,配置文件引入,允许生成worker process数等。 2、events块:配置影响nginx服务器或与用户的网络连接。有每个进程的最大连接数,选取哪种事件驱动模型处理连接请求,是否允许同时接受多个网路连接,开启多个网络连接序列化等。 3、http块:可以嵌套多个server,配置代理,缓存,日志定义等绝大多数功能和第三方模块的配置。如文件引入,mime-type定义,日志
自定义,是否使用sendfile传输文件,连接超时时间,单连接请求数等。 4、server块:配置虚拟主机的相关参数,一个http中可以有多个server。 5、location块:配置请求的路由,以及各种页面的处理情况 [root@nginx-kafka01 conf.d]# cat /etc/nginx/nginx.conf # 可以看到在http全局配置有下面这条配置 include /etc/nginx/conf.d/*.conf; # 会加载/etc/nginx/conf.d目录下以.conf结尾的文件 # 所以我们在/etc/nginx/conf.d目录下新建sc.conf配置文件 [root@nginx-kafka01 conf.d]# cat sc.conf # 一个server块对应一个虚拟主机的配置 server { # 监听80端口 # default_server 默认的虚拟主机 # nginx 的 default_server 指令可以定义默认的 server 去处理一些没有匹配到 server_name 的请求,如果没有显式定义,则会选取第一个定义的 server 作为 default_server。 listen 80 default_server; # 域名 server_name www.sc.com; # 根 --> 对应/usr/share/nginx/html这个目录 root /usr/share/nginx/html; # 访问日志 /var/log/nginx/sc/access.log # main 日志格式 access_log /var/log/nginx/sc/access.log main; # location 路由 / --> root -->/usr/share/nginx/html # 会在/usr/share/nginx/html里面寻找index.html页面 location / { } }
4.语法检测
# 语法检测
[root@nginx-kafka01 html]# nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: [emerg] open() "/var/log/nginx/sc/access.log" failed (2: No such file or directory)
nginx: configuration file /etc/nginx/nginx.conf test failed
# 报错,提示没有/var/log/nginx/sc这个目录
# 新建目录
[root@nginx-kafka01 html]# mkdir /var/log/nginx/sc
# 语法检测成功
[root@nginx-kafka01 html]# nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful
5.重新加载nginx配置文件
[root@nginx-kafka01 html]# nginx -s reload
三、项目架构以及kafka的介绍
负载均衡:nginx反向代理
DNS也可以做负载均衡,一条域名可以解析成多个ip地址,一般来说会轮询解析成各个ip,但是如果其中一台服务器挂了,dns不会立马把这个ip地址去掉,可能会造成访问失败,虽然客户端有重试,但是还是会影响客户体验,所以我们在应用web前面添加反向代理,能做到负载均衡,安全性高。
正向代理:代理客户机
如果把局域网外的Internet
想象成一个巨大的资源库,则局域网中的客户端要访问Internet
,则需要通过代理服务器来访问,这种代理服务就称为正向代理
反向代理:代理服务器
其实客户端对代理是无感知的,因为客户端不需要任何配置就可以访问,我们只需要将请求发送到反向代理服务器,由反向代理服务器去选择目标服务器获取数据后,再返回给客户端,此时反向代理服务器和目标服务器对外就是一个服务器,暴露的是代理服务器地址,隐藏了真实服务器 IP
地址。
正向代理和反向代理的区别:一句话就是:如果我们客户端自己用,就是正向代理。如果是在服务器用,用户无感知,就是反向代理。
高可用:keepalived
反向代理机使用keepalived双vip互为主备做高可用,提高资源利用率
keepalived实现的原理是使用vrrp协议
vip就可以实现高可用了,双vip是为了解决backup资源闲置
3.1 kafka在项目中的优势
问题:我们的三台Nginx可以用脚本将数据一起传到mysql中,那为什么还要弄kafka-zookeeper集群来统一处理数据再传入数据库?
- 故障发生时,方便定位问题
- 日志集中管理,后续需要日志的程序直接从kafka获取日志即可,尽可能的减少日志处理对nginx的影响
3.2 kafka使用场景
消息中间件作用:日志收集,业务解耦,流量削峰
消息中间件有哪些:RabbitMQ、ActiveMQ、RocketMQ、pulsar
3.2.1 日志收集
日志集中管理,后续需要日志的程序直接从kafka获取日志即可,尽可能的减少日志处理对nginx的影响
3.2.2 业务解耦
优点:提高扩展性,因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
业务解耦
- 当一个程序含有多种业务时,业务之间会互相影响。现在让各个业务独立发展,每个地方都只做一件事情。只要一个地方负责了多项事情,就存在解耦的可能。
业务解耦实现
- 当一个程序含有多种业务,各个业务互不影响且不需向前端传递任何值。可以将各个业务独立出来,每个业务独立完成。技术方案采用:消息队列
3.2.3 流量削峰
要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去。
3.3 kafka的通信方式:发布订阅
消息中间件的两种通信模式:点对点、发布订阅(kafka)
点对点:一对一模式,而且不能重复消费,数据被消费完了就会被删除
发布订阅以及它的优势:在发布-订阅消息系统中,消息被持久化到一个topic中。
与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。并发性要好,不过并发性也会取决于partition的数量,broker的数量
3.4 kafka中的术语解释
-
:kafka服务器节点
-
:主题–>消息的分类,比如nginx还是mysql日志给不同的主题就是不同的类型
-
:分区,提高吞吐量,提高并发性能,提高效率
-
一个topic可以对应几个partition,一般partition的数量对应broker的数量
-
但是多个partition会造成消息顺序混乱,如果对消息顺序有要求就只设置一个partition就可以了
-
-
:副本,kafka里的高可用–>就是完整的分区备份
-
自己本身就是一个副本,第二个副本放在其他的broker上,实现高可用
-
(producer):写入数据的–>filebeat
-
(consumer):拿取数据的
-
:(唯一)每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
-
:(n-1个)Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
-
(in-sync-replica):集合列表 需要同步的follower集合
比如5个副本,1个leader 4个follower --> 同步列表
有一条消息来了,leader怎么知道要同步到哪些副本呢?根据ISR来,如果一个follower挂了或者卡住或者同步过慢,那就从这个列表里踢出了
如果leader挂了,就会从ISR里面再选出一个leader,一般来说是ISR中的第一个
3.5 问题总结
3.5.1 kafka如何保证高可用?
多个broker+多个partition+多个replica
broker数量如果和replica数量一致,可以坏掉n-1台
如果一个follower挂了或者卡住或者同步过慢,那就从这个列表里踢出了
如果leader挂了,就会从ISR里面再选出一个leader,一般来说是ISR中的第一个
如果有一个机器宕机,后续启动之后想要重新加入ISR,必须得同步到ISR中的HW值(最高水位线)才可以加入进来
3.5.2 一个partition有n个副本,那数据写入哪个副本呢?
生产者和消费者只和leader打交道,所以数据写入leader,leader再根据ISR同步给其他follower
3.5.3 数据一致性问题
3.5.3.1 生产者数据一致性问题
如果生产者给leader发送7条数据,但是follower才同步3条,leader就挂了,导致数据的不一致性?怎么解决数据的一致性问题?
producer生产者可以通过request.required.acks
参数设置
-
acks=0:生产者不会等待任何来自服务器的响应。
如果当中出现问题,导致服务器没有收到消息,那么生产者无从得知,会造成消息丢失
由于生产者不需要等待服务器的响应所以可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量
-
acks=1(默认值):只要集群的Leader节点收到消息,生产者就会收到一个来自服务器的成功响应
如果消息无法到达Leader节点(例如Leader节点崩溃,新的Leader节点还没有被选举出来)生产者就会收到一个错误响应,为了避免数据丢失,生产者会重发消息
如果一个没有收到消息的节点成为新Leader,消息还是会丢失
此时的吞吐量主要取决于使用的是同步发送还是异步发送,吞吐量还受到发送中消息数量的限制,例如生产者在收到服务器响应之前可以发送多少个消息
-
acks=-1:只有当所有参与复制的节点全部都收到消息时,生产者才会收到一个来自服务器的成功响应
这种模式是最安全的,可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群依然可以运行
延时比acks=1更高,因为要等待不止一个服务器节点接收消息
根据实际的应用场景,我们设置不同的 acks
,以此保证数据的可靠性。
3.5.3.2 消费者数据一致性
消费者消费数据时,引入了High Water Mark 机制,木桶效应,只能消费到ISR列表里偏移量最少的副本的消息数量
同一个消费组里面的消费者在同一时刻只能消费一个partition,且消费者1在消费这个partition时,消费者2就不能消费这个partition了,不然会导致数据混乱
但是不同的消费组是独立的,不同消费组里面的消费者是可以消费同一个partition了,不会导致自己的消费组消费的数据混乱
3.5.4 为什么生产者可以随便连接哪一台机器,不是只能和leader打交道吗?(消费者消费leader的数据同理)
生产者跟任何一台broker连接都可以,broker之间会有协商,broker会返回当前请求副本leader的信息,最后生产者再跟leader交互。
3.5.5 消费者如何知道自己已经消费到了哪些数据呢?下次继续消费?
消费者消费的时候,会记录自己的消费偏移量,消费偏移量可以自己保存在本地,也可以提交到kafka的_consumer_offsets主题里保存。会保存topic id、partition和偏移量
数据的存储目录:
[root@nginx-kafka03 data]# pwd
/data
[root@nginx-kafka03 data]# ls
cleaner-offset-checkpoint __consumer_offsets-40
__consumer_offsets-1 __consumer_offsets-43
__consumer_offsets-10 __consumer_offsets-46
__consumer_offsets-13 __consumer_offsets-49
__consumer_offsets-16 __consumer_offsets-7
__consumer_offsets-19 log-start-offset-checkpoint
__consumer_offsets-22 meta.properties
__consumer_offsets-25 nginxlog-0
__consumer_offsets-28 recovery-point-offset-checkpoint
__consumer_offsets-31 replication-offset-checkpoint
__consumer_offsets-34 sc-0
__consumer_offsets-37 sc-1
__consumer_offsets-4 sc-2
[root@nginx-kafka03 nginxlog-0]# cd nginxlog-0
[root@nginx-kafka03 nginxlog-0]# ls
00000000000000000000.index 00000000000000000017.snapshot
00000000000000000000.log leader-epoch-checkpoint
00000000000000000000.timeindex partition.metadata
文件夹 : <topic_name>-<分区号> 例如sc-0 、sc-1、sc-2
每一个partition的数据都是由很多个segment存储,每一个segment由一个index和log文件组成。
为什么partition分出多个segment?
kafka的日志可以按照两个维度来设置清除
1.按时间
2.按大小
任意一个按时间或者大小的条件满足,都可以触发日志清理
四、kafka(2.12)和zookeeper(3.6.3)的搭建
zookeeper:分布式应用协调管理服务:统一配置管理、域名管理、分布式数据存储、集群管理
三台机器上都要操作
1.安装
# 安装java:
yum install java wget -y
# 安装kafka:
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz
# 解压:
tar xf kafka_2.12-2.8.1.tgz
# 安装zookeeper:
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
# 将安装的包转移到/opt目录下
[root@nginx-kafka03 bin]# ls /opt
apache-zookeeper-3.6.3-bin kafka_2.12-2.8.1
apache-zookeeper-3.6.3-bin.tar.gz kafka_2.12-2.8.1.tgz
2.配置zookeeper
# 进入zk配置目录
[root@nginx-kafka03 conf]# pwd
/opt/apache-zookeeper-3.6.3-bin/conf
# 复制配置文件到当前改名为zoo.cfg
[root@nginx-kafka03 conf]# cp zoo_sample.cfg zoo.cfg
[root@nginx-kafka03 conf]# ls
configuration.xsl log4j.properties zoo.cfg zoo_sample.cfg
# 修改配置文件zoo.cfg
[root@nginx-kafka03 conf]# vim zoo.cfg
# 在最后加上下面的配置
server.1=192.168.72.130:3888:4888 #server.1对应的是nginx-kafka01对应的ip
server.2=192.168.72.129:3888:4888 #server.1对应的是nginx-kafka02对应的ip
server.3=192.168.72.140:3888:4888 #server.1对应的是nginx-kafka03对应的ip
#3888和4888都是端口 一个用于数据传输,一个用于检验存活性和选举
# 创建/tmp/zookeeper目录,在目录中添加myid文件,文件内容就是本机指定的zookeeper id内容,标记自己这台机器是第几台,对应上面的server.1,2,3配置
在nignx-kafka01这台机器上:echo 1 > /tmp/zookeeper/myid
在nignx-kafka02这台机器上:echo 2 > /tmp/zookeeper/myid
在nignx-kafka03这台机器上:echo 3 > /tmp/zookeeper/myid
3.启动zookeeper
[root@nginx-kafka03 bin]# pwd
/opt/apache-zookeeper-3.6.3-bin/bin
[root@nginx-kafka03 bin]# ls
README.txt zkEnv.cmd zkServer.sh zkTxnLogToolkit.sh
zkCleanup.sh zkEnv.sh zkSnapShotToolkit.cmd
zkCli.cmd zkServer.cmd zkSnapShotToolkit.sh
zkCli.sh zkServer-initialize.sh zkTxnLogToolkit.cmd
[root@nginx-kafka03 bin]# ./zkServer.sh start
[root@nginx-kafka03 bin]# ./zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
4.启动kafka
[root@nginx-kafka03 kafka_2.12-2.8.1]# pwd
/opt/kafka_2.12-2.8.1
[root@nginx-kafka03 kafka_2.12-2.8.1]# ls
bin config libs LICENSE licenses logs NOTICE site-docs
[root@nginx-kafka03 kafka_2.12-2.8.1]# bin/kafka-server-start.sh -daemon config/server.properties
# 查看kafka进程有没有启动
[root@nginx-kafka03 kafka_2.12-2.8.1]# ps -ef |grep kafka
# 进入zookeeper内部查看是否连接到了kafka brokers
[root@nginx-kafka03 bin]# pwd
/opt/apache-zookeeper-3.6.3-bin/bin
[root@nginx-kafka03 bin]# ls
README.txt zkEnv.cmd zkServer.sh zkTxnLogToolkit.sh
zkCleanup.sh zkEnv.sh zkSnapShotToolkit.cmd
zkCli.cmd zkServer.cmd zkSnapShotToolkit.sh
zkCli.sh zkServer-initialize.sh zkTxnLogToolkit.cmd
[root@nginx-kafka03 bin]# ./zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, sc, zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[1, 2, 3]
# 显示连接到了3台,配置成功
5.测试
# 创建topic
[root@nginx-kafka03 kafka_2.12-2.8.1]# pwd
/opt/kafka_2.12-2.8.1
[root@nginx-kafka03 kafka_2.12-2.8.1]# ls
bin config libs LICENSE licenses logs NOTICE site-docs
[root@nginx-kafka03 kafka_2.12-2.8.1]# bin/kafka-topics.sh --create --zookeeper 192.168.72.140:2181 --replication-factor 3 --partitions 3 --topic sc
# 创建生产者和消费者
# 生产者
[root@nginx-kafka01 kafka_2.12-2.8.1]# bin/kafka-console-producer.sh --t 192.168.72.140:9092 --topic sc
# 消费者
[root@nginx-kafka02 kafka_2.12-2.8.1]# bin/kafka-console-consumer.sh --server 192.168.72.140:9092 --topic sc --from-beginning
五、filebeat部署
filebeat:轻量级日志采集器
filebeat组件:inputs(输入)、harvesters(收集器)
1.安装
[root@nginx-kafka01 kafka_2.12-2.8.1]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
2.编辑/etc/yum.repos.d/fb.repo文件
[root@nginx-kafka01 kafka_2.12-2.8.1]# vim /etc/yum.repos.d/fb.repo
[elastic-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
3.yum安装
[root@nginx-kafka01 kafka_2.12-2.8.1]# yum install filebeat -y
# 查看filebeat有没有安装 rpm -qa 是查看机器上安装的所有软件包
[root@nginx-kafka01 kafka_2.12-2.8.1]# rpm -qa |grep fileberat
4.设置开机自启
[root@nginx-kafka01 kafka_2.12-2.8.1]# systemctl enable filebeat
5.修改配置文件/etc/filebeat/filebeat.yml
[root@nginx-kafka01 kafka_2.12-2.8.1]# cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
# Change to true to enable this input configuration.
enabled: true
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /var/log/nginx/sc/access.log
#==========------------------------------kafka-----------------------------------
output.kafka:
hosts: ["192.168.72.130:9092","192.168.72.129:9092","192.168.72.140:9092"]
topic: nginxlog
keep_alive: 10s
# 命令行模式下使用:set paste 再进行粘贴,就不会出现格式混乱
6.创建主题
[root@nginx-kafka01 kafka_2.12-2.8.1]# cd /opt/kafka_2.12-2.8.1
[root@nginx-kafka01 kafka_2.12-2.8.1]# bin/kafka-topics.sh --create --zookeeper 192.168.72.130:2181 --replication-factor 1 --partitions 1 --topic nginxlog
7.启动服务
# 启动服务
[root@nginx-kafka01 kafka_2.12-2.8.1]# systemctl start filebeat
# 检查服务是否启动
[root@nginx-kafka01 kafka_2.12-2.8.1]# ps aux|grep filebeat
root 5676 0.1 4.5 940164 84072 ? Ssl 17:04 0:02 /usr/share/filebeat/bin/filebeat --environment systemd -c /etc/filebeat/filebeat.yml --path.home /usr/share/filebeat --path.config /etcfilebeat --path.data /var/lib/filebeat --path.logs /var/log/filebeat
root 6722 0.0 0.0 112824 980 pts/1 S+ 17:26 0:00 grep --color=auto filebeat
8.新建消费者消费数据
# 前提是/var/log/nginx/sc/access.log 里面有数据给filebeat拿到,才能吐到kafka给消费者消费
[root@nginx-kafka01 kafka_2.12-2.8.1]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.72.130:9092 --topic nginxlog --from-beginning
遇到的问题1:
filebeat的配置文件中配置了多个output(output.elasticsearch),导致filebeat服务一直启动不起来
解决办法:
通过查看filebeat日志文件less /var/log/messages,和查看filebeat官网发现了问题,配置一个output,filebeat启动成功
之前测试是通过ip地址访问的nginx的html静态界面,如果想要通过域名访问,需要修改客户端本地的hosts文件,添加nginx机器的ip和域名对应的映射关系
遇到的问题2:
如果访问不过去 1.检查防火墙是否关闭 2.检查nginx服务是否启动–>进程(ps aux/ps -ef)和端口(lsof -i) *:http 代表的是可以访问本机的任意ip:80 如果绑定的ip是127.0.0.1,那么客户端访问这个ip会访问到自己,因为每台机器都有这个回环地址,代表的是自己
3.如果在windows机器上ping域名解析出来的ip是在hosts文件里面配置的ip,但是通过浏览器却访问不了,可以通过清除浏览器缓存之后再次访问
# 查看端口的占用情况
[root@nginx-kafka01 kafka_2.12-2.8.1]# lsof -i:80
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
nginx 1054 root 7u IPv4 19955 0t0 TCP *:http (LISTEN)
nginx 1054 root 9u IPv6 19957 0t0 TCP *:http (LISTEN)
nginx 1055 nginx 7u IPv4 19955 0t0 TCP *:http (LISTEN)
nginx 1055 nginx 9u IPv6 19957 0t0 TCP *:http (LISTEN)
nginx 1056 nginx 7u IPv4 19955 0t0 TCP *:http (LISTEN)
nginx 1056 nginx 9u IPv6 19957 0t0 TCP *:http (LISTEN)
nginx 1057 nginx 7u IPv4 19955 0t0 TCP *:http (LISTEN)
nginx 1057 nginx 9u IPv6 19957 0t0 TCP *:http (LISTEN)
nginx 1058 nginx 7u IPv4 19955 0t0 TCP *:http (LISTEN)
nginx 1058 nginx 9u IPv6 19957 0t0 TCP *:http (LISTEN)
六、zookeeper在kafka集群中的作用
-
zookeeper中leader的选举:一致性算法 zab
少数服从多数原则,票数过半的当选为leader 票数>= n//2 + 1
-
leader和follower的作用
客户端连接任意一台zk都可以操作,但是数据新增修改等事务操作必须在leader上运行,客户端如果连接到follower上进行事务操作,follower会返回leader的ip,最终客户端还是会在leader上操作。
如果进行查询操作,可以直接连接follower进行查询操作
follower的作用:查询,选举
-
follower同步leader数据的方式
leader和follower数据的同步,只要过半节点同步完成,就表示数据已经commit
zookeeper不是强一致性,它属于最终一致性
查询体验不会很差,因为每个节点数据量默认不超过1M,同步很快
-
zookeeper集群节点存活数必须过半,集群才能正常使用,所以一般zk集群的节点数都设置为奇数,方便选举
-
zookeeper在kafka中的作用:
1.保存kafka的元数据,topic,partition,副本信息
2.选举controller,通过抢占的方式选举,先到先得controller,选举出的kafka controller管理kafka副本的leader和follower同步、选举
kafka3.0版本已经脱离zookeeper,kafka自己实现zookeeper功能
七、数据入库 1、需求分析 需要nginx日志的ip,时间,带宽字段 将ip字段解析成相应的省份、运营商 存入数据库的字段: id, 时间, 省份, 运营商, 带宽
步骤 1、创建数据表
MariaDB [nginx]> desc nginxlog;
+-------+------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------+------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| dt | datetime | NO | | NULL | |
| prov | varchar(5) | YES | | NULL | |
| isp | varchar(5) | YES | | NULL | |
| bd | float | YES | | NULL | |
+-------+------------+------+-----+---------+----------------+
5 rows in set (0.04 sec)
2、编写python脚本, 从kafka获取nginx日志
3、获取好的nginx日志,提取出ip,时间,带宽字段 4、提取出的ip字段通过淘宝的一个接口解析出省份和运营商 url = “https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip=114.114.114.114” 5、格式化时间字段 “2021-10-12 12:00:00” 6、存入数据库
[root@nginx-kafka03 lianxi]# cat python_consumer.py import json import requests import time import pymysql # 打开数据库连接 db = pymysql.connect( host = "192.168.72.130", #mysql主机ip user = "wh", #用户名 passwd = "123456", #密码 database = "nginx" #数据库 ) # 使用 cursor() 方法创建一个游标对象 cursor cursor = db.cursor() taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip=" #查询ip地址的信息(省份和运营商isp),通过taobao网的接口 def resolv_ip(ip): response = requests.get(taobao_url+ip) if response.status_code == 200: tmp_dict = json.loads(response.text) prov = tmp_dict["data"]["region"] isp = tmp_dict["data"]["isp"] return prov,isp return None,None #将日志里读取的格式转换为我们指定的格式 def trans_time(dt): #把字符串转成时间格式 timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S") #timeStamp = int(time.mktime(timeArray)) #把时间格式转成字符串 new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray) 标签:
sc连接器挂掉的原因