资讯详情

GoLang日志编程系统

日志监控系统

Nginx(日志文件) -> log_process (实时读取解析写入) -> influxdb(存储) ->grafana(前端日志显示器) influxdb 属于GO语言编写的开源时序数据侧重于高性能 查询和存储时序数据,influxdb 监控数据广泛应用于存储系统,IOT实时行业数据。

  • 目前市面上流行 TSDB(时序处理数据库):influxDB, TimescaleDB, QuestDB
  • influxDB 类似于NOSQL数据集自动适用于标记集模型的技术;
  • TimescaleDB 与 postgreSQL 兼容, 更适合物联网数据PostgreSQL更好的兼容
  • QuestDB: 支持InfluxDB内联协议和PostgreSQL, 但是生态问题比较大

项目简答介绍

本日志系统 DEMO,但可直接用于生产环境,使用LOG_Process 读取Nginx ./Access.log, 使用influxDB 进行存取

log_process -path ./access.log influxdsn http://127.0.0.1:8086@imooc@imoocpass@immoc@s 
常见并发模型
  1. 解决C10k 的问题 异步非阻塞模型(Nginx, libevent, NodeJS)-- 问题 复杂度高 大量回调函数
  2. 协程(Go,Erlang, lua): 编写代码与协线性函数相同;理解根加轻量级线程
3. 并行执行程序 go foo() // 执行函数 4. mgs:= <- c 多个gorountine 需要通信 5. select 从多个channel 中读取数据 ,多个 channel 随机选择消费  6. 并发: 通过调度器使任务看起来运行 属于单核CPU(逻辑操作)对IO密集型更友好 7. 并行:任务的真实运行 

在go 语言中 并发执行 ,使用三种不同 gorountine, 负责装填、运输、处理 ,让程序并发运行,让任务更单一 这种思想 也可以将 日志分析读取,单独写入模块进行小模块,使用每个模块gorountine ,通过channel 数据交互,至于这么多gorountine 是在一个CPU调度执行还是分配到多个CPU上进行执行 ,取决于系统.

go 语言有自己的调度器, go fun() 属于独立工作单位,go根据每个可用的物理处理器分配一个逻辑处理器,通过这个逻辑处理器 处理独立单元, 通过设置: runtime.GOMAXPROCS(1)///给调度器分配多少个具体的逻辑处理器 服务器 物理处理器越多 ,go 得到的逻辑处理器越多,导致器允许的速度就越快。 参考:https://blog.csdn.net/ice_fire_x/article/details/105141409 

系统架构

日志分析的基本流程伪函数,以下函数有两个缺陷,分析介入和分析后输出只能写死,因此需要扩展,扩展接口模式

package main  import (  "fmt"  "strings"  "time" )  /** * 日志分析系统分为: 分析、读取、写入 */  type LogProcess struct { 
          path        string      // 读取文件路径  influxDBDsn string      // influx data source  rc          chan string // read module to process  wc          chan string // process to influx  }  
       
        // 返回函数使用 指针, 结构体很大 不需要进行拷贝 性能优化 
        func 
        (l 
        *LogProcess
        ) 
        ReadFromFile
        (
        ) 
        { 
          
        // 文件读取模块 line 
        := 
        "message" l
        .rc 
        <- line 
        } 
        func 
        (l 
        *LogProcess
        ) 
        Process
        (
        ) 
        { 
          
        // 文件解析模块 data 
        := 
        <-l
        .rc l
        .wc 
        <- strings
        .
        ToUpper
        (data
        ) 
        } 
        func 
        (l 
        *LogProcess
        ) 
        writeToInfluxDB
        (
        ) 
        { 
          fmt
        .
        Println
        (
        <-l
        .wc
        ) 
        } 
        func 
        main
        (
        ) 
        { 
          
        // lp 引用类型 lp 
        := 
        &LogProcess
        { 
          path
        : 
        "./tmp/access.log"
        , influxDBDsn
        : 
        "username&password..."
        , rc
        : 
        make
        (
        chan 
        string
        )
        , wc
        : 
        make
        (
        chan 
        string
        )
        , 
        } 
        // tree goroutine run 
        go lp
        .
        ReadFromFile
        (
        ) 
        go lp
        .
        Process
        (
        ) 
        // 需要定义 chan 将 Process 数据 传递给 influxDB 
        go lp
        .
        writeToInfluxDB
        (
        ) time
        .
        Sleep
        (
        2 
        * time
        .Second
        ) 
        } 
       
  • 接口方式约束 输入和输出 进行优化
package main

import (
	"fmt"
	"strings"
	"time"
)

/** * 日志解析系统分为: 解析,读取,写入 */
type LogProcess struct { 
        
	rc    chan string // read module to process
	wc    chan string // process to influx
	read  Read
	write Writer
}

func (l *LogProcess) Process() { 
        
	// 文件解析模块
	data := <-l.rc
	l.wc <- strings.ToUpper(data)
}

type Writer interface { 
        
	writer(wc chan string)
}
type WriteToInfluxDB struct { 
        
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan string) { 
        
	fmt.Println(<-wc)
}

type Read interface { 
        
	read(rc chan string)
}

type ReadFromFile struct { 
        
	path string // 读取文件
}

func (r *ReadFromFile) read(rc chan string) { 
        
	// 读取模块
	line := "message"
	rc <- line
}

func main() { 
        
	// lp 引用类型
	r := &ReadFromFile{ 
        
		path: "./tmp/access.log",
	}
	w := &WriteToInfluxDB{ 
        
		influxDBDsn: "username&password"}
	lp := &LogProcess{ 
        
		rc:    make(chan string),
		wc:    make(chan string),
		read:  r,
		write: w,
	}
	// 通过接口方式 约束其功能
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	// 通过参数注入方式
	time.Sleep(2 * time.Second)
}

读取模块具体实现

  • 从上次读取光标后开始逐行进行读取,无需每次都全部文件读取
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
	"time"
)

/** * 日志解析系统分为: 解析,读取,写入 */
type LogProcess struct { 
        
	rc    chan []byte // read module to process
	wc    chan string // process to influx
	read  Read
	write Writer
}

func (l *LogProcess) Process() { 
        
	// 文件解析模块
	for v := range l.rc { 
        
		l.wc <- strings.ToUpper(string(v))
	}

}

type Writer interface { 
        
	writer(wc chan string)
}
type WriteToInfluxDB struct { 
        
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan string) { 
        
	// wc 通道另外一种读取方式
	for x := range wc { 
        
		fmt.Println(x)
	}

}

type Read interface { 
        
	read(rc chan []byte)
}

type ReadFromFile struct { 
        
	path string // 读取文件
}

func (r *ReadFromFile) read(rc chan []byte) { 
        
	// 实时系统: 从文件末尾逐行进行读取
	f, err := os.Open(r.path)
	if err != nil { 
        
		panic(fmt.Sprintln("open file error:%s", err.Error()))
	}

	// 文件末尾最开始进行读取
	f.Seek(0, 2)
	rd := bufio.NewReader(f)
	for { 
        
		line, err := rd.ReadBytes('\n')
		if err == io.EOF { 
        
			// d读取到文件末尾, 日志还没有写入
			time.Sleep(500 * time.Millisecond)
			continue
		} else if err != nil { 
        
			panic(fmt.Sprintln("ReadBytes error:%s", err.Error()))
		}
		rc <- line[:len(line)-1]
	}

}

func main() { 
        
	// lp 引用类型
	r := &ReadFromFile{ 
        
		path: "H:\\code\\goprogarm\\src\\access.log",
	}
	w := &WriteToInfluxDB{ 
        
		influxDBDsn: "username&password"}
	lp := &LogProcess{ 
        
		rc:    make(chan []byte),
		wc:    make(chan string),
		read:  r,
		write: w,
	}
	// 通过接口方式 约束其功能
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	// 通过参数注入方式
	time.Sleep(100 * time.Second)
}

日志解析模块

  1. 冲Read Chan 中读取每一行数据
  2. 正则方式提取所需要的监控数据
  3. 将数据写入到influxDB
package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"os"
	"regexp"
	"strconv"
	"time"
)

/** * 日志解析系统分为: 解析,读取,写入 */
type LogProcess struct { 
        
	rc    chan []byte // read module to process
	wc    chan *Message // process to influx
	read  Read
	write Writer
}

//日志写入结构体
type Message struct { 
        
	TimeLocal time.Time
	BytesSent int
	Path, Method, Scheme, Status string
	UpstreamTime, RequestTime float64
}

func (l *LogProcess) Process() { 
        
	// 通过正则表达式进行解析数据
	r := regexp.MustCompile(`(\s*)`)
	loc, _ := time.LoadLocation("Asia/shanghai")
	// 文件解析模块
	for v := range l.rc { 
        
		ret := r.FindStringSubmatch(string(v))
		if len(ret) != 13 { 
        
			log.Println("FindStringSub match fail:", string(v))
			continue
		}
		message := &Message{ 
        
			
		}
		location, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
		if err != nil { 
        
			log.Println("ParseInLocation fail:", err.Error(), ret[4])
		}
		message.TimeLocal = location
		
		// 字符串类型转换成int
		atoi, err := strconv.Atoi(ret[8])
		if err != nil { 
        
			log.Println("strconv.Atoi fail:", err.Error(), ret[4])
		}
		message.BytesSent = atoi
		l.wc <- message
	}

}

type Writer interface { 
        
	writer(wc chan *Message)
}
type WriteToInfluxDB struct { 
        
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan *Message) { 
        
	// wc 通道另外一种读取方式
	for x := range wc { 
        
		fmt.Println(x)
	}

}

type Read interface { 
        
	read(rc chan []byte)
}

type ReadFromFile struct { 
        
	path string // 读取文件
}

func (r *ReadFromFile) read(rc chan []byte) { 
        
	// 实时系统: 从文件末尾逐行进行读取
	f, err := os.Open(r.path)
	if err != nil { 
        
		panic(fmt.Sprintf("open file error:%s\n", err.Error()))
	}

	// 文件末尾最开始进行读取
	f.Seek(0, 2)
	rd := bufio.NewReader(f)
	for { 
        
		line, err := rd.ReadBytes('\n')
		if err == io.EOF { 
        
			// d读取到文件末尾, 日志还没有写入
			time.Sleep(500 * time.Millisecond)
			continue
		} else if err != nil { 
        
			panic(fmt.Sprintf("ReadBytes error:%s\n", err.Error()))
		}
		rc <- line[:len(line)-1]
	}

}

func main() { 
        
	// lp 引用类型
	r := &ReadFromFile{ 
        
		path: "H:\\code\\goprogarm\\src\\access.log",
	}
	w := &WriteToInfluxDB{ 
        
		influxDBDsn: "username&password"}
	lp := &LogProcess{ 
        
		rc:    make(chan []byte),
		wc:    make(chan *Message),
		read:  r,
		write: w,
	}
	// 通过接口方式 约束其功能
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	// 通过参数注入方式
	time.Sleep(100 * time.Second)
}

标签: mgs203磁性接近传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台