日志监控系统
Nginx(日志文件) -> log_process (实时读取解析写入) -> influxdb(存储) ->grafana(前端日志显示器) influxdb 属于GO语言编写的开源时序数据侧重于高性能 查询和存储时序数据,influxdb 监控数据广泛应用于存储系统,IOT实时行业数据。
- 目前市面上流行 TSDB(时序处理数据库):influxDB, TimescaleDB, QuestDB
- influxDB 类似于NOSQL数据集自动适用于标记集模型的技术;
- TimescaleDB 与 postgreSQL 兼容, 更适合物联网数据PostgreSQL更好的兼容
- QuestDB: 支持InfluxDB内联协议和PostgreSQL, 但是生态问题比较大
项目简答介绍
本日志系统 DEMO,但可直接用于生产环境,使用LOG_Process 读取Nginx ./Access.log, 使用influxDB 进行存取
log_process -path ./access.log influxdsn http://127.0.0.1:8086@imooc@imoocpass@immoc@s
常见并发模型
- 解决C10k 的问题 异步非阻塞模型(Nginx, libevent, NodeJS)-- 问题 复杂度高 大量回调函数
- 协程(Go,Erlang, lua): 编写代码与协线性函数相同;理解根加轻量级线程
3. 并行执行程序 go foo() // 执行函数 4. mgs:= <- c 多个gorountine 需要通信 5. select 从多个channel 中读取数据 ,多个 channel 随机选择消费 6. 并发: 通过调度器使任务看起来运行 属于单核CPU(逻辑操作)对IO密集型更友好 7. 并行:任务的真实运行
在go 语言中 并发执行 ,使用三种不同 gorountine, 负责装填、运输、处理 ,让程序并发运行,让任务更单一 这种思想 也可以将 日志分析读取,单独写入模块进行小模块,使用每个模块gorountine ,通过channel 数据交互,至于这么多gorountine 是在一个CPU调度执行还是分配到多个CPU上进行执行 ,取决于系统.
go 语言有自己的调度器, go fun() 属于独立工作单位,go根据每个可用的物理处理器分配一个逻辑处理器,通过这个逻辑处理器 处理独立单元, 通过设置: runtime.GOMAXPROCS(1)///给调度器分配多少个具体的逻辑处理器 服务器 物理处理器越多 ,go 得到的逻辑处理器越多,导致器允许的速度就越快。 参考:https://blog.csdn.net/ice_fire_x/article/details/105141409
系统架构
日志分析的基本流程伪函数,以下函数有两个缺陷,分析介入和分析后输出只能写死,因此需要扩展,扩展接口模式
package main import ( "fmt" "strings" "time" ) /** * 日志分析系统分为: 分析、读取、写入 */ type LogProcess struct {
path string // 读取文件路径 influxDBDsn string // influx data source rc chan string // read module to process wc chan string // process to influx }
// 返回函数使用 指针, 结构体很大 不需要进行拷贝 性能优化
func
(l
*LogProcess
)
ReadFromFile
(
)
{
// 文件读取模块 line
:=
"message" l
.rc
<- line
}
func
(l
*LogProcess
)
Process
(
)
{
// 文件解析模块 data
:=
<-l
.rc l
.wc
<- strings
.
ToUpper
(data
)
}
func
(l
*LogProcess
)
writeToInfluxDB
(
)
{
fmt
.
Println
(
<-l
.wc
)
}
func
main
(
)
{
// lp 引用类型 lp
:=
&LogProcess
{
path
:
"./tmp/access.log"
, influxDBDsn
:
"username&password..."
, rc
:
make
(
chan
string
)
, wc
:
make
(
chan
string
)
,
}
// tree goroutine run
go lp
.
ReadFromFile
(
)
go lp
.
Process
(
)
// 需要定义 chan 将 Process 数据 传递给 influxDB
go lp
.
writeToInfluxDB
(
) time
.
Sleep
(
2
* time
.Second
)
}
接口方式约束 输入和输出 进行优化
package main
import (
"fmt"
"strings"
"time"
)
/** * 日志解析系统分为: 解析,读取,写入 */
type LogProcess struct {
rc chan string // read module to process
wc chan string // process to influx
read Read
write Writer
}
func (l *LogProcess) Process() {
// 文件解析模块
data := <-l.rc
l.wc <- strings.ToUpper(data)
}
type Writer interface {
writer(wc chan string)
}
type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan string) {
fmt.Println(<-wc)
}
type Read interface {
read(rc chan string)
}
type ReadFromFile struct {
path string // 读取文件
}
func (r *ReadFromFile) read(rc chan string) {
// 读取模块
line := "message"
rc <- line
}
func main() {
// lp 引用类型
r := &ReadFromFile{
path: "./tmp/access.log",
}
w := &WriteToInfluxDB{
influxDBDsn: "username&password"}
lp := &LogProcess{
rc: make(chan string),
wc: make(chan string),
read: r,
write: w,
}
// 通过接口方式 约束其功能
go lp.read.read(lp.rc)
go lp.Process()
go lp.write.writer(lp.wc)
// 通过参数注入方式
time.Sleep(2 * time.Second)
}
读取模块具体实现
- 从上次读取光标后开始逐行进行读取,无需每次都全部文件读取
package main
import (
"bufio"
"fmt"
"io"
"os"
"strings"
"time"
)
/** * 日志解析系统分为: 解析,读取,写入 */
type LogProcess struct {
rc chan []byte // read module to process
wc chan string // process to influx
read Read
write Writer
}
func (l *LogProcess) Process() {
// 文件解析模块
for v := range l.rc {
l.wc <- strings.ToUpper(string(v))
}
}
type Writer interface {
writer(wc chan string)
}
type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan string) {
// wc 通道另外一种读取方式
for x := range wc {
fmt.Println(x)
}
}
type Read interface {
read(rc chan []byte)
}
type ReadFromFile struct {
path string // 读取文件
}
func (r *ReadFromFile) read(rc chan []byte) {
// 实时系统: 从文件末尾逐行进行读取
f, err := os.Open(r.path)
if err != nil {
panic(fmt.Sprintln("open file error:%s", err.Error()))
}
// 文件末尾最开始进行读取
f.Seek(0, 2)
rd := bufio.NewReader(f)
for {
line, err := rd.ReadBytes('\n')
if err == io.EOF {
// d读取到文件末尾, 日志还没有写入
time.Sleep(500 * time.Millisecond)
continue
} else if err != nil {
panic(fmt.Sprintln("ReadBytes error:%s", err.Error()))
}
rc <- line[:len(line)-1]
}
}
func main() {
// lp 引用类型
r := &ReadFromFile{
path: "H:\\code\\goprogarm\\src\\access.log",
}
w := &WriteToInfluxDB{
influxDBDsn: "username&password"}
lp := &LogProcess{
rc: make(chan []byte),
wc: make(chan string),
read: r,
write: w,
}
// 通过接口方式 约束其功能
go lp.read.read(lp.rc)
go lp.Process()
go lp.write.writer(lp.wc)
// 通过参数注入方式
time.Sleep(100 * time.Second)
}
日志解析模块
- 冲Read Chan 中读取每一行数据
- 正则方式提取所需要的监控数据
- 将数据写入到influxDB
package main
import (
"bufio"
"fmt"
"io"
"log"
"os"
"regexp"
"strconv"
"time"
)
/** * 日志解析系统分为: 解析,读取,写入 */
type LogProcess struct {
rc chan []byte // read module to process
wc chan *Message // process to influx
read Read
write Writer
}
//日志写入结构体
type Message struct {
TimeLocal time.Time
BytesSent int
Path, Method, Scheme, Status string
UpstreamTime, RequestTime float64
}
func (l *LogProcess) Process() {
// 通过正则表达式进行解析数据
r := regexp.MustCompile(`(\s*)`)
loc, _ := time.LoadLocation("Asia/shanghai")
// 文件解析模块
for v := range l.rc {
ret := r.FindStringSubmatch(string(v))
if len(ret) != 13 {
log.Println("FindStringSub match fail:", string(v))
continue
}
message := &Message{
}
location, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
if err != nil {
log.Println("ParseInLocation fail:", err.Error(), ret[4])
}
message.TimeLocal = location
// 字符串类型转换成int
atoi, err := strconv.Atoi(ret[8])
if err != nil {
log.Println("strconv.Atoi fail:", err.Error(), ret[4])
}
message.BytesSent = atoi
l.wc <- message
}
}
type Writer interface {
writer(wc chan *Message)
}
type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan *Message) {
// wc 通道另外一种读取方式
for x := range wc {
fmt.Println(x)
}
}
type Read interface {
read(rc chan []byte)
}
type ReadFromFile struct {
path string // 读取文件
}
func (r *ReadFromFile) read(rc chan []byte) {
// 实时系统: 从文件末尾逐行进行读取
f, err := os.Open(r.path)
if err != nil {
panic(fmt.Sprintf("open file error:%s\n", err.Error()))
}
// 文件末尾最开始进行读取
f.Seek(0, 2)
rd := bufio.NewReader(f)
for {
line, err := rd.ReadBytes('\n')
if err == io.EOF {
// d读取到文件末尾, 日志还没有写入
time.Sleep(500 * time.Millisecond)
continue
} else if err != nil {
panic(fmt.Sprintf("ReadBytes error:%s\n", err.Error()))
}
rc <- line[:len(line)-1]
}
}
func main() {
// lp 引用类型
r := &ReadFromFile{
path: "H:\\code\\goprogarm\\src\\access.log",
}
w := &WriteToInfluxDB{
influxDBDsn: "username&password"}
lp := &LogProcess{
rc: make(chan []byte),
wc: make(chan *Message),
read: r,
write: w,
}
// 通过接口方式 约束其功能
go lp.read.read(lp.rc)
go lp.Process()
go lp.write.writer(lp.wc)
// 通过参数注入方式
time.Sleep(100 * time.Second)
}