25 Log Analysis Project - 创新互联

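Production environments generate large volumes of system logs, application logs, security logs and so on. Analyzing these logs shows the load and health of the servers and the distribution and behavior of clients, and can even support predictions;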


Typical processing pipeline:

Log generation --> collection --> storage --> analysis --> storage --> visualization;
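Within a single Python process, the collection and analysis stages can be chained as generators. The following is a minimal sketch. It assumes the "collected" data is a list of raw access-log lines and that the analysis is a count of status codes. The stage names `collect`, `to_fields` and `count_status` are hypothetical, the sample lines are made up, and the storage and visualization stages are left out.

```python
from collections import Counter

def collect(lines):
    # Collection stage: yield raw lines, skipping blank ones
    for line in lines:
        line = line.strip()
        if line:
            yield line

def to_fields(lines):
    # Parsing stage: split each line into whitespace-separated fields
    for line in lines:
        yield line.split()

def count_status(records):
    # Analysis stage: the status code is the 9th whitespace field of a combined-format line
    return Counter(fields[8] for fields in records if len(fields) > 8)

raw = [
    '1.2.3.4 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "curl/7.0"',
    '',
    '1.2.3.5 - - [06/Apr/2017:18:09:26 +0800] "GET /x HTTP/1.1" 404 0 "-" "curl/7.0"',
]
print(count_status(to_fields(collect(raw))))   # Counter({'200': 1, '404': 1})
```

Each stage pulls one item at a time from the previous stage, so even a very large file never has to fit in memory.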

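Collection tools: Logstash, Flume (Apache), Scribe (Facebook);

Open-source real-time log analysis with the ELK stack (Elasticsearch, Logstash, Kibana):

Logstash collects the logs and stores them in an Elasticsearch (ES) cluster. Kibana queries ES, builds charts from the data, and returns them to the browser;

Offline analysis;

Online analysis: one copy of the data is written to the log, and another copy is sent to a real-time big-data processing service;

Real-time processing technologies: Storm, Spark;

Prerequisites for analysis:

Semi-structured data: logs are semi-structured, meaning they are organized, formatted data. They can be split into rows and columns, treated as a table, and analyzed field by field;

Text analysis: logs are text files, so extracting the data you need relies on file I/O, string operations, regular expressions and similar techniques;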

Example:

123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"

Extracting the data:

1. Split on spaces;

Method 1: (shown as a figure in the original; the figure is not preserved)

Method 2: split on spaces first, then treat fields wrapped in "" or [] specially;
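Method 2 can also be written as a single tokenizing regular expression instead of a flag-driven loop. A token is a bracketed field, a quoted field, or any other run of non-space characters. The following sketch assumes that quoted fields never contain escaped quotes. `TOKEN` and `split_fields` are illustrative names, not part of the original code.

```python
import re

# One token: [ ... ]  or  " ... "  or  any run of non-space characters
TOKEN = re.compile(r'\[[^\]]*\]|"[^"]*"|\S+')

def split_fields(line):
    fields = []
    for tok in TOKEN.findall(line):
        if len(tok) >= 2 and (tok[0], tok[-1]) in (('[', ']'), ('"', '"')):
            tok = tok[1:-1]   # drop the surrounding brackets or quotes
        fields.append(tok)
    return fields

line = ('123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" '
        '"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"')
print(split_fields(line))
```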

2. Extract with a regular expression;

1.

import datetime

# Keep the sample on a single line
logs = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

# Field name for each column; '' marks a column that is ignored
names = ('remote', '', '', 'datetime', 'request', 'status', 'length', '', 'useragent')

# Converter for each column; None keeps the raw string
ops = (None, None, None,
       lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
       lambda request: dict(zip(['method', 'url', 'protocol'], request.split())),
       int, int, None, None)

def extract(line):
    fields = []
    flag = False   # True while collecting a "..." or [...] field that spans several tokens
    tmp = ''
    for field in line.split():
        if not flag and (field.startswith('[') or field.startswith('"')):
            if len(field) > 1 and (field.endswith(']') or field.endswith('"')):
                fields.append(field[1:-1])   # the whole field is one token, e.g. "-"
            else:
                tmp += field[1:]
                flag = True
            continue
        if flag:
            if field.endswith(']') or field.endswith('"'):
                tmp += ' ' + field[:-1]
                fields.append(tmp)
                flag = False
                tmp = ''
            else:
                tmp += ' ' + field
            continue
        fields.append(field)
    print(fields)

    info = {}
    for name, op, field in zip(names, ops, fields):
        if name:   # skip the ignored columns
            info[name] = op(field) if op else field
    return info

print(extract(logs))

Output:

['123.125.71.36', '-', '-', '06/Apr/2017:18:09:25 +0800', 'GET / HTTP/1.1', '200', '8642', '-', 'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)']
{'remote': '123.125.71.36', 'datetime': datetime.datetime(2017, 4, 6, 18, 9, 25, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'request': {'method': 'GET', 'url': '/', 'protocol': 'HTTP/1.1'}, 'status': 200, 'length': 8642, 'useragent': 'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)'}

2.

((?:\d{1,3}\.){3}\d{1,3}) - - \[([/:+ \w]+)\] "(\w+) (\S+) ([/\.\w\d]+)" (\d+) (\d+) .+ "(.+)"

import datetime
import re

# logs = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

# Converters for some named groups; any other group is kept as a string
ops = {
    'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'length': int
}

pattern = r'''(?P<remote>(?:\d{1,3}\.){3}\d{1,3}) - - \[(?P<datetime>[/:+ \w]+)\] "(?P<method>\w+) (?P<request>\S+) (?P<protocol>[/\.\w\d]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(pattern)

def extract(line) -> dict:
    matcher = regex.match(line)
    info = None
    if matcher:
        info = {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
    return info

# print(extract(logs))

def load(path: str):   # load a log file
    with open(path) as f:
        for line in f:
            d = extract(line)
            if d:
                yield d   # generator function
            else:
                continue   # TODO: the line did not match the pattern; skipped for now

g = load('access.log')
print(next(g))
print(next(g))
print(next(g))
# for i in g:
#     print(i)

Output:

{'remote': '123.125.71.36', 'datetime': datetime.datetime(2017, 4, 6, 18, 9, 25, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 8642, 'useragent': 'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)'}

{'remote': '112.64.118.97', 'datetime': datetime.datetime(2017, 4, 6, 19, 13, 59, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)'}

{'remote': '119.123.183.219', 'datetime': datetime.datetime(2017, 4, 6, 20, 59, 39, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'}

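Note:

If you run the code in Jupyter, keep the contents of logs on a single line. The regular expression matches one log line and will not match a line that has been broken in two;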

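Sliding window:

Also called a time window or time-window function. It is extremely important in data analysis;

Much data, logs included, is time-related and is produced in chronological order, so the analysis has to evaluate it by time;

interval: the time between two consecutive evaluations;

width: the width of the time window used for one evaluation. The amount of data in each window is not uniform;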

When width > interval, consecutive windows overlap;

When width = interval, the windows do not overlap;

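When width < interval, the data between windows is lost, so this scheme is generally not used. It is acceptable only when losing a few records per window does not affect the statistical trend, for example when there are 10 million business records;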

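Let c1 be the current evaluation time. The records kept in the buffer for the next window are those newer than c2:

c2 = c1 - delta

delta = width - interval

When delta = 0, width = interval and no record is carried over into the next window.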
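To make the three cases concrete, the following sketch works on integer timestamps instead of log records. It assumes each window covers the half-open range (end - width, end] and that the evaluation time advances by interval. `sliding_windows` is a hypothetical helper, not the window function used later in this article.

```python
def sliding_windows(timestamps, width, interval):
    """Return (end, items) pairs; each window holds the timestamps in (end - width, end]."""
    if interval <= 0:
        raise ValueError('interval must be positive')
    ts = sorted(timestamps)
    if not ts:
        return []
    windows = []
    end = ts[0]
    while end - interval < ts[-1]:   # stop once the previous window already reached the last point
        items = [t for t in ts if end - width < t <= end]
        windows.append((end, items))
        end += interval
    return windows

data = list(range(10))   # one event per second, t = 0..9
for width, interval in [(4, 2), (2, 2), (1, 2)]:
    print(width, interval, sliding_windows(data, width, interval))
```

With (4, 2), some points appear in two windows (overlap). With (2, 2), every point appears exactly once. With (1, 2), the odd timestamps never fall into any window, which is the data loss described above.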

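Time-series data: in an operations environment, logs, monitoring data and the like are produced and recorded in chronological order. Such time-related data is generally analyzed by time;

Basic program structure for data analysis:

Example:

A function endlessly generates random numbers, producing time-related data: each item is a timestamp plus a random number;

Take 3 items at a time and compute their average;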

import random
import datetime

def source():
    # Infinite generator: each item is a random value plus the time it was produced
    while True:
        yield {'value': random.randint(1, 100), 'datetime': datetime.datetime.now()}

src = source()
lst = [next(src) for _ in range(3)]   # take 3 items from the same generator object

def handler(iterable):
    # Integer average of the 'value' fields
    values = [x['value'] for x in iterable]
    return sum(values) // len(values)

print(lst)
print(handler(lst))
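The example above only averages the first 3 items. To keep averaging 3 items at a time from the infinite generator, you can slice it repeatedly with `itertools.islice`. In the following sketch, `batches` is a hypothetical helper; `source` and `handler` repeat the example above.

```python
import datetime
import random
from itertools import islice

def source():
    # Infinite generator of time-stamped random values
    while True:
        yield {'value': random.randint(1, 100), 'datetime': datetime.datetime.now()}

def handler(iterable):
    values = [x['value'] for x in iterable]
    return sum(values) // len(values)

def batches(src, size):
    # Yield consecutive lists of `size` items (a shorter last list if src is finite)
    it = iter(src)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

for batch in islice(batches(source(), 3), 4):   # only 4 batches: source() never ends
    print(handler(batch))
```

This is a count-based window. The sliding window discussed above is time-based, so its batches vary in size.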

Window function:

import datetime
import re

ops = {
    'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'length': int
}

pattern = r'''(?P<remote>(?:\d{1,3}\.){3}\d{1,3}) - - \[(?P<datetime>[/:+ \w]+)\] "(?P<method>\w+) (?P<request>\S+) (?P<protocol>[/\.\w\d]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(pattern)

def extract(line) -> dict:
    matcher = regex.match(line)
    info = None
    if matcher:
        info = {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
    return info

def load(path: str):
    with open(path) as f:
        for line in f:
            d = extract(line)
            if d:
                yield d

def window(src, handler, width: int, interval: int):
    # A start far in the past makes the first record trigger an evaluation
    start = datetime.datetime.strptime('1970/01/01 01:01:01 +0800', '%Y/%m/%d %H:%M:%S %z')
    current = start
    # Overlap carried into the next window; timedelta's first positional argument is days
    delta = datetime.timedelta(seconds=width - interval)
    buffer = []

    for x in src:
        if x:
            buffer.append(x)
            current = x['datetime']
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            start = current
            # keep only the records that also belong to the next window
            buffer = [i for i in buffer if i['datetime'] > current - delta]

def donothing_handler(iterable: list):
    print(iterable)
    return iterable

def handler(iterable: list):
    pass   # TODO

def size_handler(iterable: list):
    pass   # TODO

# window(load('access.log'), donothing_handler, 8, 5)
# window(load('access.log'), donothing_handler, 10, 5)
window(load('access.log'), donothing_handler, 5, 5)

Output:

[{'remote': '123.125.71.36', 'datetime': datetime.datetime(2017, 4, 6, 18, 9, 25, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 8642, 'useragent': 'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)'}]

[{'remote': '112.64.118.97', 'datetime': datetime.datetime(2017, 4, 6, 19, 13, 59, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)'}]

[{'remote': '119.123.183.219', 'datetime': datetime.datetime(2017, 4, 6, 20, 59, 39, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'}]
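Any window function like this one needs the overlap delta to be in seconds. Note that `datetime.timedelta` takes days as its first positional argument, so you must pass an overlap given in seconds by keyword. Here is a quick check using the 8/5 width and interval from the commented-out call above:

```python
import datetime

width, interval = 8, 5
wrong = datetime.timedelta(width - interval)           # positional argument = days
right = datetime.timedelta(seconds=width - interval)   # what the window overlap needs
print(wrong, '|', right)   # 3 days, 0:00:00 | 0:00:03
```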

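Dispatching:

The producer-consumer model:

A monitoring system has to process a lot of data, logs included;

It needs both data collection and analysis;

The monitored objects are the data producers; the programs that process the data are the consumers;

In the traditional producer-consumer model, the producer produces and the consumer consumes directly. This has problems: the code is tightly coupled, it is hard to scale when production grows, and the production and consumption speeds are hard to match;

queue: like lining up for food in a canteen;

producer-consumer: like a steamed-bun shop, where customers may buy buns as fast as, or faster than, they are made. The solution is a queue, which decouples programs (or services) from each other and acts as a buffer;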

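Notes:

ZeroMQ: a low-level messaging library used as a communication layer;

Most *MQ products are message queues;

Kafka: very high throughput;

FIFO: first in, first out;

LIFO: last in, first out;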
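Both orders are available in the standard `queue` module: `queue.Queue` is FIFO and `queue.LifoQueue` is LIFO. Here is a minimal comparison:

```python
import queue

fifo = queue.Queue()       # first in, first out
lifo = queue.LifoQueue()   # last in, first out
for i in range(3):
    fifo.put(i)
    lifo.put(i)

fifo_order = [fifo.get() for _ in range(3)]
lifo_order = [lifo.get() for _ in range(3)]
print(fifo_order)   # [0, 1, 2]
print(lifo_order)   # [2, 1, 0]
```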

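Data production is not steady: short bursts of data occur and need buffering;

Consumers differ in capacity, some fast and some slow, and each consumer decides when to take data from the buffer;

On a single machine, the built-in queue module can build an in-process queue that serves producers and consumers running in several threads;

Large systems can use third-party message middleware such as RabbitMQ, RocketMQ or Kafka;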

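The queue module:

queue.Queue(maxsize=0) creates a FIFO queue and returns a Queue object. If maxsize <= 0, the queue length is unlimited;

q = queue.Queue()

q.get(block=True, timeout=None) removes an element from the queue and returns it. Once an element has been taken with get, it is gone from the queue;

block: whether to block; timeout: how long to block;

If block=True, the call blocks. With timeout=None it blocks indefinitely; with a timeout value it blocks for at most that many seconds and then raises Empty;

If block=False, the call does not block and timeout is ignored. It either returns an element immediately or raises Empty;

q.get_nowait() is equivalent to q.get(block=False) or q.get(False): it either returns an element or raises Empty. The blocking behavior is best demonstrated with multiple threads;

q.put(item, block=True, timeout=None) adds an element to the queue:

block=True, timeout=None: blocks until a free slot is available;

block=True, timeout=5: blocks for up to 5 seconds, then raises Full;

block=False: timeout is ignored and the call returns immediately. It adds the item if there is room and raises Full otherwise;

q.put_nowait(item) is equivalent to q.put(item, False);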
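The non-blocking and timeout variants can be tried on a bounded queue without threads. In the following sketch, `maxsize=2` and the 0.1 s timeout are arbitrary choices for the demo:

```python
import queue

events = []
q = queue.Queue(maxsize=2)    # bounded: holds at most 2 items
q.put('a')
q.put('b')

try:
    q.put_nowait('c')         # same as q.put('c', False): no free slot, Full is raised at once
except queue.Full:
    events.append('full')

try:
    q.put('c', timeout=0.1)   # blocks for up to 0.1 s waiting for a free slot, then raises Full
except queue.Full:
    events.append('full after timeout')

events.append(q.get_nowait())       # 'a' (FIFO order)
events.append(q.get(block=False))   # 'b'

try:
    q.get_nowait()            # the queue is empty now
except queue.Empty:
    events.append('empty')

print(events)   # ['full', 'full after timeout', 'a', 'b', 'empty']
```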

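Notes:

Queue.qsize() is only approximate, because producing and consuming continue while it is being read;

Once q.get() has returned an item, the item is gone from the queue. Kafka, by contrast, keeps messages after they have been read: each consumer tracks its own offset, and old messages are deleted according to the broker's retention policy;

Example: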

from queue import Queue
import random

q = Queue()
q.put(random.randint(1, 100))
q.put(random.randint(1, 100))
print(q.get())
print(q.get())
# print(q.get())   # would block forever: the queue is empty
print(q.get(timeout=3))   # waits 3 seconds, then raises queue.Empty

Output:

2
35
Traceback (most recent call last):
  File "/home/python/magedu/projects/cmdb/queue_Queue.py", line 12, in <module>
    print(q.get(timeout=3))
  File "/ane/python3.6/lib/python3.6/queue.py", line 172, in get
    raise Empty
queue.Empty
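The traceback shows get(timeout=3) giving up on an empty queue. To see get actually waiting for data, use a second thread. The following sketch assumes a producer thread that puts one item after a short delay; the 0.2 s delay is arbitrary.

```python
import queue
import threading
import time

q = queue.Queue()

def producer():
    time.sleep(0.2)           # simulate slow production
    q.put('late item')

t = threading.Thread(target=producer)
t.start()
start = time.monotonic()
item = q.get(timeout=3)       # blocks until the producer puts something (or 3 s pass)
waited = time.monotonic() - start
t.join()
print(item, round(waited, 1))
```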

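Implementing the dispatcher:

The producer (data source) produces data, which is buffered in a message queue;

Data processing flow: load --> extract --> analyze (sliding window function);

When processing large amounts of data, one data source needs several consumers. How should the data be distributed among them?

A dispatcher (scheduler) is needed to distribute the data to the different consumers;

Each consumer has its own processing function for the data it receives, so a registration mechanism is needed;

Load --> extract --> dispatch --> analysis function 1 | analysis function 2: each item goes through the dispatcher to n consumers. Analysis functions 1 and 2 are different handlers, each with its own window width and interval;

How to dispatch?

One-to-many: send a copy of each item to every one of the n consumers, looping over them in turn;

Message queue?

Put message queues between the producer and the consumers. Should all consumers share one queue (then contention between consumers must be handled), or should each consumer have its own queue (easier)?

Registration?

The dispatcher records internally which consumers exist and each consumer's queue;

Threads?

Each item is processed by several different registered handlers, so the natural approach is to run each handler in its own thread;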

Note:

import threading

t = threading.Thread(target=window, args=(src, handler, width, interval))   # target: the function the thread runs; args: its arguments, as a tuple
t.start()
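The dispatcher in this article gives each consumer its own queue. For contrast, here is a sketch of the other option mentioned above, where all consumers share one queue. queue.Queue is thread-safe, so the queue itself resolves contention. However, each item then goes to exactly one consumer, which suits splitting work rather than copying it to every handler. The worker names and the `None` sentinel are assumptions made for this demo.

```python
import queue
import threading

tasks = queue.Queue()
processed = {}              # worker name -> items it handled
lock = threading.Lock()

def worker(name):
    while True:
        item = tasks.get()
        if item is None:    # sentinel: stop this worker
            tasks.task_done()
            break
        with lock:
            processed.setdefault(name, []).append(item)
        tasks.task_done()

workers = [threading.Thread(target=worker, args=(f'w{i}',)) for i in range(3)]
for w in workers:
    w.start()
for item in range(20):
    tasks.put(item)
for _ in workers:
    tasks.put(None)         # one sentinel per worker
tasks.join()                # wait until every put() is matched by a task_done()
for w in workers:
    w.join()

handled = sorted(i for items in processed.values() for i in items)
print(handled == list(range(20)))   # True: each item was handled by exactly one worker
```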

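Analysis features:

Log analysis matters. Analyzing large volumes of data shows whether the site is under attack, whether it is in a crawler peak period, and whether its resources are being hotlinked;

The analysis logic goes into the handlers;

window only slides the time window and collects data, so do not make it feature-rich. If some processing must be applied to every window, move it into a separate function;

Note:

Crawlers: baiduspider, googlebot; related topics: SEO, HTTP, request, response;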
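A crude way to flag crawler traffic within a window is to match keywords in the User-Agent. The following sketch uses a hypothetical keyword list: the two crawlers named above plus generic markers. Real crawlers are far more varied, and the User-Agent can be forged, so treat the result as a hint, not proof.

```python
# Hypothetical keyword list; extend it for real use
CRAWLER_KEYWORDS = ('baiduspider', 'googlebot', 'spider', 'crawler')

def is_crawler(useragent: str) -> bool:
    ua = useragent.lower()
    return any(keyword in ua for keyword in CRAWLER_KEYWORDS)

def crawler_share(useragents) -> float:
    """Fraction of User-Agent strings in a batch that look like crawlers."""
    useragents = list(useragents)
    if not useragents:
        return 0.0
    return sum(is_crawler(u) for u in useragents) / len(useragents)

baidu = 'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)'
chrome = ('Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 '
          '(KHTML, like Gecko) Chrome/28.0.1500.72 Safari/537.36')
print(is_crawler(baidu), is_crawler(chrome), crawler_share([baidu, chrome]))   # True False 0.5
```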

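Status code analysis:

Status codes carry a lot of information;

304: the server received the client's request, found the resource unchanged, and tells the browser to use its cached copy of the static resource;

404: the server cannot find the requested resource;

A high share of 304 means static caching is working well;

A high share of 404 means there are broken links, or someone is probing the site's resources;

If the share of 4xx and 5xx responses (e.g. 400, 500) suddenly starts to rise, something is almost certainly wrong with the site;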

import datetime
import re
from queue import Queue
import threading

ops = {
    'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'length': int
}

pattern = r'''(?P<remote>(?:\d{1,3}\.){3}\d{1,3}) - - \[(?P<datetime>[/:+ \w]+)\] "(?P<method>\w+) (?P<request>\S+) (?P<protocol>[/\.\w\d]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(pattern)

def extract(line) -> dict:
    matcher = regex.match(line)
    info = None
    if matcher:
        info = {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
    return info

def load(path: str):
    with open(path) as f:
        for line in f:
            d = extract(line)
            if d:
                yield d

def window(src: Queue, handler, width: int, interval: int):
    start = datetime.datetime.strptime('1970/01/01 00:01:01 +0800', '%Y/%m/%d %H:%M:%S %z')
    delta = datetime.timedelta(seconds=width - interval)   # overlap carried into the next window
    buffer = []

    while True:
        data = src.get()       # blocks until the dispatcher puts something
        if data is None:       # sentinel from the dispatcher: no more data
            break
        buffer.append(data)
        current = data['datetime']
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            start = current
            buffer = [i for i in buffer if i['datetime'] > current - delta]

def donothing_handler(iterable: list):
    print(iterable)
    return iterable

def handler(iterable: list):
    pass   # TODO

def size_handler(iterable: list):
    pass   # TODO

def status_handler(iterable: list):
    # Percentage of each status code within the window
    d = {}
    for item in iterable:
        key = item['status']
        d[key] = d.get(key, 0) + 1
    total = sum(d.values())
    print({k: v / total * 100 for k, v in d.items()})   # could return the dict instead

def dispatcher(src):
    queues = []
    threads = []

    def reg(handler, width, interval):
        # each registered handler gets its own queue and its own window thread
        q = Queue()
        queues.append(q)
        t = threading.Thread(target=window, args=(q, handler, width, interval))
        threads.append(t)

    def run():
        for t in threads:
            t.start()
        for x in src:
            for q in queues:   # send a copy of every record to every consumer
                q.put(x)
        for q in queues:
            q.put(None)        # tell each window thread to stop
        for t in threads:
            t.join()

    return reg, run

reg, run = dispatcher(load('access.log'))
reg(status_handler, 8, 5)
run()
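status_handler above only prints its result. The following variant returns the percentages instead, and a hypothetical error_share helper covers the "4xx/5xx suddenly rising" check; both use collections.Counter. They assume that records are dicts with an integer 'status' field, as produced by extract. The sample window is made up.

```python
from collections import Counter

def status_ratio(iterable):
    """Percentage of each status code in one window of records."""
    counts = Counter(item['status'] for item in iterable)
    total = sum(counts.values())
    return {status: n / total * 100 for status, n in counts.items()} if total else {}

def error_share(iterable):
    """Percentage of 4xx and 5xx responses in the window."""
    items = list(iterable)
    if not items:
        return 0.0
    errors = sum(1 for item in items if 400 <= item['status'] < 600)
    return errors / len(items) * 100

window_data = [{'status': s} for s in (200, 200, 304, 404)]
print(status_ratio(window_data))   # {200: 50.0, 304: 25.0, 404: 25.0}
print(error_share(window_data))    # 25.0
```

Returning values instead of printing them lets a caller compare consecutive windows, for example to alert when error_share jumps.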

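Loading log files:

Change load to accept a batch of paths;

Given a batch of paths, iterate over each path;

If a path is a regular file, read it line by line (it is assumed to be a log file);

If a path is a directory, iterate over all regular files in it and process each file line by line. Subdirectories are not processed recursively;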

from pathlib import Path

def openfile(path: str):
    # extract() is the regex-based parser defined earlier
    with open(path) as f:
        for line in f:
            d = extract(line)
            if d:
                yield d

def load(*paths):
    for file in paths:
        p = Path(file)
        if not p.exists():
            continue
        if p.is_dir():
            for x in p.iterdir():   # not recursive: subdirectories are skipped
                if x.is_file():
                    yield from openfile(str(x))   # same as: for y in openfile(str(x)): yield y
        elif p.is_file():
            yield from openfile(str(p))
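You can check the path rules without real log files. The following sketch makes the line parser a parameter named `parse`, which defaults to str.strip for the demo only (in the article it is extract), and runs load() on a temporary directory. The file names and contents are made up.

```python
import tempfile
from pathlib import Path

def openfile(path, parse):
    # Yield parse(line) for every line that parse accepts (returns something truthy)
    with open(path) as f:
        for line in f:
            d = parse(line)
            if d:
                yield d

def load(*paths, parse=str.strip):
    for file in paths:
        p = Path(file)
        if not p.exists():
            continue                          # missing paths are skipped silently
        if p.is_dir():
            for x in sorted(p.iterdir()):     # sorted only to make the order predictable
                if x.is_file():               # subdirectories are not visited
                    yield from openfile(x, parse)
        elif p.is_file():
            yield from openfile(p, parse)

with tempfile.TemporaryDirectory() as d:
    root = Path(d)
    (root / 'a.log').write_text('line a1\nline a2\n')
    (root / 'b.log').write_text('line b1\n')
    (root / 'sub').mkdir()
    (root / 'sub' / 'c.log').write_text('line c1\n')   # ignored: inside a subdirectory
    records = list(load(root, root / 'missing.log', root / 'a.log'))

print(records)   # ['line a1', 'line a2', 'line b1', 'line a1', 'line a2']
```

Note that a file named both directly and through its directory is read twice. Deduplicating paths is left to the caller.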

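Offline log analysis project:

A file or directory can be specified, and the logs in it are analyzed;

Analysis functions can be registered dynamically;

Data can be dispatched to different analysis handlers;

Key steps:

Data source handling (processing the data line by line);

Processing after the data is obtained (the analysis works on small batches at a time, via the window function);

The dispatcher (a bridge between producers and consumers);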

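Browser analysis:

The user agent is a string, in a certain format, that software sends to the remote server to identify itself;

In HTTP this string is sent in the User-Agent header. Its value can be changed freely, so a client can pretend to be anything it likes;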

Format (the common browser form): Mozilla/[version] ([system and browser information]) [platform] ([platform details]) [extensions]

For example: "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.72 Safari/537.36"

Note:

In Chrome, open the console and evaluate navigator.userAgent. You can copy the result into Maxthon's custom User-Agent setting;

Modules for extracting the information:

user-agents, pyyaml, ua-parser;

]$ pip install user-agents pyyaml ua-parser

Example:

from user_agents import parse

u = 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.72 Safari/537.36'
ua = parse(u)
print(ua.browser)
print(ua.browser.family)
print(ua.browser.version_string)

Output:

Browser(family='Chrome', version=(28, 0, 1500), version_string='28.0.1500')

Chrome

28.0.1500

Putting it all together, the complete code:


import datetime
import re
import threading
from collections import defaultdict
from pathlib import Path
from queue import Queue

from user_agents import parse   # pip install user-agents pyyaml ua-parser

ops = {
    'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'length': int,
    # method, request (the URL) and protocol are already separate regex groups
    'useragent': lambda useragent: parse(useragent)
}

pattern = r'''(?P<remote>(?:\d{1,3}\.){3}\d{1,3}) - - \[(?P<datetime>[/:+ \w]+)\] "(?P<method>\w+) (?P<request>\S+) (?P<protocol>[/\.\w\d]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(pattern)

def extract(line) -> dict:
    matcher = regex.match(line)
    info = None
    if matcher:
        info = {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
    return info

def openfile(path: str):
    with open(path) as f:
        for line in f:
            d = extract(line)
            if d:
                yield d

def load(*paths):
    for file in paths:
        p = Path(file)
        if not p.exists():
            continue
        if p.is_dir():
            for x in p.iterdir():   # regular files only; subdirectories are not visited
                if x.is_file():
                    yield from openfile(str(x))
        elif p.is_file():
            yield from openfile(str(p))

def window(src: Queue, handler, width: int, interval: int):
    start = datetime.datetime.strptime('1970/01/01 00:01:01 +0800', '%Y/%m/%d %H:%M:%S %z')
    delta = datetime.timedelta(seconds=width - interval)   # overlap carried into the next window
    buffer = []
    while True:
        data = src.get()
        if data is None:   # sentinel from the dispatcher: no more data
            break
        buffer.append(data)
        current = data['datetime']
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            start = current
            buffer = [i for i in buffer if i['datetime'] > current - delta]

def donothing_handler(iterable: list):
    print(iterable)
    return iterable

def handler(iterable: list):
    pass   # TODO

def size_handler(iterable: list):
    pass   # TODO

def status_handler(iterable: list):
    d = {}
    for item in iterable:
        key = item['status']
        d[key] = d.get(key, 0) + 1
    total = sum(d.values())
    print({k: v / total * 100 for k, v in d.items()})   # percentage per status code

browsers = defaultdict(int)   # (browser family, version) -> count, accumulated over all windows

def browser_handler(iterable: list):
    for item in iterable:
        ua = item['useragent']
        key = (ua.browser.family, ua.browser.version_string)
        browsers[key] += 1
    return browsers

def dispatcher(src):
    queues = []
    threads = []

    def reg(handler, width, interval):
        q = Queue()   # one queue and one window thread per registered handler
        queues.append(q)
        t = threading.Thread(target=window, args=(q, handler, width, interval))
        threads.append(t)

    def run():
        for t in threads:
            t.start()
        for x in src:
            for q in queues:   # every handler gets a copy of every record
                q.put(x)
        for q in queues:
            q.put(None)        # tell each window thread to stop
        for t in threads:
            t.join()           # wait until all handlers have finished

    return reg, run

reg, run = dispatcher(load('access.log'))
reg(status_handler, 8, 5)
reg(browser_handler, 5, 5)
run()
print(browsers)   # safe to read now: all window threads have finished



Share link: http://pcwzsj.com/article/djihdd.html