博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
日志分析大致流程
阅读量:5757 次
发布时间:2019-06-18

本文共 4545 字,大约阅读时间需要 15 分钟。

简单概述:

生产过程中会生成大量的系统日志,应用程序日志,安全日志等等,通过对日志的分析可以了解服务器的负载,健康状况,可以分析客户的分布情况,客户的行为,甚至基于这些分析可以做出预测。

一般采集流程:

日志产出 ——>采集——>储存——>分析——>储存——>可视化

数据提取:

由于日志文件基本都以文本形式产出,所以对日志的分析基本就是对文本的字符串进行分析。所以我们需将文本中有用的信息通过一些设定条件将其提取出来以方便后面操作。

所以我们的思路,通过遍历路径将所有的相关日志文件全部按行输出,再通过正则表达式写出每行信息相对的提取规则,再加个字典文件对提取出来的有用信息进一步提升。     # line = '''114.249.235.230 - - [11/Apr/2017:10:45:51 +0800] "GET / HTTP/1.1" 200 7488 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 10_2_1 like Mac OS X) AppleWebKit/602.4.6 (KHTML, like Gecko) Version/10.0 Mobile/14D27 Safari/602.1"''    以上是这篇日志的主要格式。

import re

patten = '''(?P<name1>[\d.]{7,15}) - - [(?P<datetime>[/\w +:]+)] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<name5>.+)"'''

opms = {'datetime':lambda strt:datetime.datetime.strptime(strt,'%d/%b/%Y:%H:%M:%S %z'),'status':int, 'length':int}

gexdx = re.compile(patten)

def exent(line:str):

matcher = gexdx.match(line)
return {k:opms.get(k,lambda x:x)(v) for k,v in matcher.groupdict().items()}

以上就是最终代码的提取每行有用信息的代码。

from pathlib import Path

def fir_load(filename ,encoding='utf-8'):
with open(filename,encoding='utf-8') as f:
for line in f:
fields = exent(line)
if fields:
yield fields
else:
pass

def load(paths,encoding='utf-8',ext='.log',r=False):

for p in paths:
path = Path(p)
if path.is_dir():
if isinstance(ext,str):
ext = [ext]
for e in ext:
logs = path.rglob(e) if r else path.glob(e) # 遍历当前目录
for log in logs: # path对象
yield from fir_load(str(log.absolute()), encoding=encoding)
elif path.is_file():
yield from fir_load(str(path),encoding='utf-8')

以上是遍历路径提取所有相关日志文件并按行输出,并调用函数exent,从而对日志文件完成提取操作。

日志文件的有用文件提取出来后,后面也就明了了主要就是对自己需要的信息进行分析操作了,这里我们需先进行时间管理分析,通过这个代码可以将日志文件按照文件本身的产生时间,通过分组输出处理,优化了分析。

import datetime
def window(handler,width:int,interval:int):
buf = []
start = datetime.datetime.strptime('1970/01/01 00:00:01 +0800','%Y/%m/%d %H:%M:%S %z')
current = datetime.datetime.strptime('1970/01/01 00:00:02 +0800', '%Y/%m/%d %H:%M:%S %z')
delta =datetime.timedelta(seconds=width-interval)
while True:
for date in s:
if date:
buf.append(date)
current = date['datetime']

if (current-start).total_seconds() > interval:            ret = handler(buf)            print(ret)            start = current            buf = [x for x in buf if x['datetime'] > (current - delta)]那么这里完了接下来就是将相应的分析代码写出来传个形参handler就可以得到分析结果了,比如,先写个状态码分析:#状态码分析

def status_handler(iterable): #列表包字典

state = {}
for item in iterable:
ss = item['status']
state[ss] =state.get(ss,0)+1
length = len(iterable)
return {k:v/length for k,v in state.items()}

这样一个简单的日志分析就完了,但是如果想同时进行多个分析怎办呢?在平常工作过程中难免会要进行多次分析的,这时就需用到分发。    #分发器

def dispatcher(src):

handlers = []
queues = []

def reg(handler,width:int,interval:int):    q =Queue()    queues.append(q)    h = threading.Thread(target = window,args=(q,handler,width,interval))    handlers.append(h)def run():    for t in handlers:        t.start()         #启动线程处理数据    for item in src:       #将数据源取到的数据分发到所有队列中        for q in queues:            q.put(item)return reg,run    这样的话,window函数也要进行小小的修改,以能get到队列。    def window(src:Queue,handler,width:int,interval:int):buf = []start = datetime.datetime.strptime('1970/01/01 00:00:01 +0800','%Y/%m/%d %H:%M:%S %z')current = datetime.datetime.strptime('1970/01/01 00:00:02 +0800', '%Y/%m/%d %H:%M:%S %z')delta =datetime.timedelta(seconds=width-interval)while True:    date = src.get()    if date:        buf.append(date)        current = date['datetime']        if (current-start).total_seconds() > interval:            ret = handler(buf)            print(ret)            start = current            buf = [x for x in buf if x['datetime'] > (current - delta)]

这样话再调用的话就可以了。这里再加个浏览器分析

#浏览器分析
allbrowsers ={}#所有浏览器的统计
def browser_handler(iterable):
browsers = {}
for item in iterable:
ua = item['useragent']

key = (ua.browser.family,ua.browser.version_string)    browsers[key] = browsers.get(key,0)+1    allbrowsers[key] = allbrowsers.get(key,0)+1print(sort(allbrowsers.items(),key=lambda x:x[1],reverse=True)[:10])return browsers

再对数据提取的条件进行适量改变然后调用运行即可。

数据提取:
from user_agents import parse

patten = '''(?P<name1>[\d.]{7,15}) - - [(?P<datetime>[/\w +:]+)] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>[^"]+)"'''

opms = {'datetime':lambda strt:datetime.datetime.strptime(strt,'%d/%b/%Y:%H:%M:%S %z'),'status':int, 'length':int,'useragent':lambda ua:parse(ua)}

最终调用:
if name == 'main':
path = 'G:/'

reg,run = dispatcher(load(path))reg(status_handler,10,5)reg(browser_handler,5,5)run()

转载于:https://blog.51cto.com/13929187/2175706

你可能感兴趣的文章
hiveserver2修改线程数
查看>>
XML教程
查看>>
oracle体系结构
查看>>
Microsoft Exchange Server 2010与Office 365混合部署升级到Exchange Server 2016混合部署汇总...
查看>>
Proxy服务器配置_Squid
查看>>
开启“无线网络”,提示:请启动windows零配置wzc服务
查看>>
【SDN】Openflow协议中对LLDP算法的理解--如何判断非OF区域的存在
查看>>
纯DIV+CSS简单实现Tab选项卡左右切换效果
查看>>
栈(一)
查看>>
ios 自定义delegate(一)
查看>>
创建美国地区的appleId
查看>>
例题10-2 UVa12169 Disgruntled Judge(拓展欧几里德)
查看>>
JS 原生ajax写法
查看>>
Composer管理PHP依赖关系
查看>>
React.js学习笔记之JSX解读
查看>>
我所了解的Libevent和SEDA架构
查看>>
Socket编程问题小记
查看>>
基于Flask-Angular的项目组网架构与部署
查看>>
一张图道尽程序员的出路
查看>>
redis 常用命令
查看>>