Rewriting Logstash in Python: Cleaning the Nginx Access Log and Loading It into Elastic DB

Step 1. Change the Nginx log format to JSON
Set Nginx's log_format to the following (edit /etc/nginx/nginx.conf), and make sure the access_log directive uses this main format:
log_format main '{"@timestamp":"$time_iso8601","host":"$server_addr","clientip":"$remote_addr","size":$body_bytes_sent,"responsetime":$request_time,"upstreamtime":"$upstream_response_time","upstreamhost":"$upstream_addr","http_host":"$host","url":"$uri","xff":"$http_x_forwarded_for","referer":"$http_referer","agent":"$http_user_agent","status":"$status"}';
After reloading Nginx, the lines in access.log look like this:
{"@timestamp":"2017-12-13T17:29:49+08:00","host":"120.76.XX.XX","clientip":"120.76.XX.XXX","size":26963,"responsetime":0.000,"upstreamtime":"0.000","upstreamhost":"127.0.0.1:8080","http_host":"weixin.XXX.com","url":"/XXXXXXX/haowanyihao/thumb.png","xff":"111.22.65.171","referer":"-","agent":"WeChat/6.6.0.32 CFNetwork/811.4.18 Darwin/16.5.0","status":"200"}
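Because every line is now a JSON object, it can also be parsed with Python's standard json module instead of being split on double quotes. The sketch below assumes one object per line; parse_line and the realip key are hypothetical names, not part of the author's script. Unquoted fields such as size and responsetime come back as numbers, and the X-Forwarded-For rule mirrors the one used in Step 2. One caveat: the log_format above does not escape double quotes inside values such as the User-Agent, which would make json.loads fail; on nginx 1.11.8 or later, writing log_format main escape=json '...' makes nginx escape them.

```python
import json

# The sample access.log line shown above (one JSON object per line).
line = ('{"@timestamp":"2017-12-13T17:29:49+08:00","host":"120.76.XX.XX",'
        '"clientip":"120.76.XX.XXX","size":26963,"responsetime":0.000,'
        '"upstreamtime":"0.000","upstreamhost":"127.0.0.1:8080",'
        '"http_host":"weixin.XXX.com","url":"/XXXXXXX/haowanyihao/thumb.png",'
        '"xff":"111.22.65.171","referer":"-",'
        '"agent":"WeChat/6.6.0.32 CFNetwork/811.4.18 Darwin/16.5.0","status":"200"}')


def parse_line(raw):
    """Parse one JSON-format access log line into a dict (hypothetical helper)."""
    record = json.loads(raw)
    # "xff" is "-" when no X-Forwarded-For header was sent; otherwise the
    # first address in the comma-separated list is taken as the client IP
    xff = record.get("xff", "-")
    record["realip"] = record["clientip"] if xff == "-" else xff.split(",")[0].strip()
    return record


rec = parse_line(line)
print(rec["size"], rec["responsetime"], rec["realip"], rec["http_host"])
# → 26963 0.0 111.22.65.171 weixin.XXX.com
```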
Step 2. Write the Python program

# -*- coding: utf-8 -*-
'''
  By Willson Luo at 2017/11/23 v1.0
'''
import iso8601
from elasticsearch import Elasticsearch
from geoip import geolite2
# connect to elasticsearch database
# (elasticsearch-py 6.x/7.x style; 8.x clients expect hosts with a scheme,
#  e.g. "http://localhost:9200", and basic_auth= instead of http_auth=)
es = Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}], http_auth=('elastic', 'xxxxx'))
# nginx column name
#title    = ['@timestamp','host','clientip','size','responsetime','upstreamtime','upstreamhost','httphost','url','xff','referer','agent','status']
# nginx access log
ngxlog  = 'access.log'
with open(ngxlog, encoding='utf-8') as f:
    ngxdata = f.readlines()
# User-Agent substring -> short label; checked in order, first match wins
agents = [("Apple", "Apple"), ("WeChat", "WeChat"), ("curl", "Linux"),
          ("Alibaba", "Aliyun"), ("Android", "Android"), ("MSIE", "IE"),
          ("Firefox", "Firefox"), ("Windows", "Windows"), ("Apache-Http", "Apache")]
for a1, line in enumerate(ngxdata):
    if not line.strip():
        continue  # skip blank lines
    # split on double quotes: every value lands at a fixed index
    # (3=@timestamp, 7=host, 11=clientip, 14=size, 16=responsetime, 19=upstreamtime,
    #  23=upstreamhost, 27=http_host, 35=xff, 43=agent, 47=status)
    step1 = line.strip().split("\"")
    abc = iso8601.parse_date(step1[3])
    ngxindex = 'logstash-weixin-nginx-access-' + abc.strftime('%Y%m%d')
    # nginx data (one document per log line)
    ngxjson = {}
    # isoformat() keeps the +08:00 offset in a form Elasticsearch can parse
    ngxjson['@timestamp'] = abc.isoformat()
    ngxjson['host'] = step1[7]
    # unquoted numbers arrive as ":26963," -> strip separators, store as numbers
    ngxjson['size'] = int(step1[14].replace(":", "").replace(",", ""))
    ngxjson['responsetime'] = float(step1[16].replace(":", "").replace(",", ""))
    ngxjson['upstreamtime'] = step1[19]
    ngxjson['upstreamhost'] = step1[23]
    ngxjson['httphost'] = step1[27]
    # use the first X-Forwarded-For address as the client IP when present
    if step1[35] == "-":
        ipaddr = step1[11]
    else:
        ipaddr = step1[35].split(",")[0].strip()
    ngxjson['clientip'] = ipaddr
    ngxjson['agent'] = next((label for key, label in agents if key in step1[43]), step1[43])
    ngxjson['status'] = step1[47]
    # GeoIP: lookup() returns None for private or unknown addresses
    match = geolite2.lookup(ipaddr)
    info = match.get_info_dict() if match is not None else {}
    if 'location' in info:
        # Elasticsearch geo_point arrays are ordered [longitude, latitude]
        ngxjson['geoip'] = {'location': [info['location']['longitude'],
                                         info['location']['latitude']]}
    city = info['city']['names']['en'] if 'city' in info else "-"
    country = info['country']['names']['en'] if 'country' in info else "-"
    subdivisions = info['subdivisions'][0]['names']['en'] if 'subdivisions' in info else "-"
    ngxjson['country']      = country
    ngxjson['subdivisions'] = subdivisions
    ngxjson['city']         = city
    ngxjson['position']     = country + "-" + subdivisions + "-" + city
    print(a1, ngxjson)
    # doc_type targets Elasticsearch 5.x/6.x; drop it on 7.x and later
    # (mapping types are deprecated in 7.x and removed in 8.x)
    es.index(index=ngxindex, doc_type="logs", body=ngxjson)
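The loop above sends one es.index request per log line, which becomes slow on large files. elasticsearch-py also ships a bulk helper, elasticsearch.helpers.bulk(client, actions), that sends many documents per request. The following is a sketch of that alternative under several assumptions: Python 3.7+ (for datetime.fromisoformat) and Elasticsearch 7 or later, since the actions carry no _type (on 6.x each would also need '_type': 'logs'); the lines are read with json.loads rather than the split approach; and build_actions / index_file are hypothetical names. It also derives each document's _id from a SHA-1 of the raw line, an extra assumption so that re-running over the same access.log overwrites documents instead of duplicating them. The trade-off is that two byte-identical log lines collapse into one document.

```python
import hashlib
import json
from datetime import datetime


def build_actions(lines, prefix="logstash-weixin-nginx-access-"):
    """Yield one bulk "index" action per JSON log line (hypothetical helper)."""
    for raw in lines:
        raw = raw.strip()
        if not raw:
            continue  # skip blank lines
        doc = json.loads(raw)
        # daily index named after the log's own date, e.g. ...-20171213
        day = datetime.fromisoformat(doc["@timestamp"]).strftime("%Y%m%d")
        yield {
            "_index": prefix + day,
            # identical lines get identical ids, so re-running the script over
            # the same file overwrites documents instead of duplicating them
            "_id": hashlib.sha1(raw.encode("utf-8")).hexdigest(),
            "_source": doc,
        }


def index_file(path, es):
    """Bulk-index every line of `path`; `es` is an elasticsearch.Elasticsearch client."""
    # imported here so build_actions() itself needs only the standard library
    from elasticsearch import helpers
    with open(path, encoding="utf-8") as f:
        # returns (number of successful actions, list of errors)
        return helpers.bulk(es, build_actions(f))
```

With the client created at the top of the Step 2 script, the call would be index_file('access.log', es).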

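Step 3. Present the data in Kibana
1> First register the index in Kibana (Kibana usually asks you to create this right away on first use): Kibana --> Management --> Kibana (Index Patterns). A pattern such as logstash-weixin-nginx-access-* matches the daily indices written by the script.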
2> Build the visualizations you need: Kibana --> Visualize (which ones are useful is largely a matter of taste)
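Kibana's map visualizations need geoip.location to be mapped as geo_point; with dynamic mapping alone, Elasticsearch maps the [longitude, latitude] array the script writes as a plain float field. Logstash normally installs an index template for logstash-* indices that handles this, but that template may be missing once Logstash is replaced by a script. The sketch below registers a legacy index template, which only affects indices created afterwards (existing indices keep their mappings). It assumes Elasticsearch 7.x with typeless mappings and the elasticsearch-py 7.x es.indices.put_template(name=..., body=...) call; the template name nginx-access is hypothetical.

```python
def nginx_template(pattern="logstash-weixin-nginx-access-*"):
    """Build a legacy index-template body with typeless (7.x) mappings."""
    return {
        "index_patterns": [pattern],
        "mappings": {
            "properties": {
                "@timestamp": {"type": "date"},
                "size": {"type": "long"},
                "responsetime": {"type": "float"},
                # the script stores location as [longitude, latitude]
                "geoip": {"properties": {"location": {"type": "geo_point"}}},
            }
        },
    }


def install_template(es, name="nginx-access"):
    """Register the template through an elasticsearch-py 7.x client `es`."""
    return es.indices.put_template(name=name, body=nginx_template())
```

install_template(es) would be run once, before the script indexes its first day of data.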
Step 4. Build a Dashboard (just drag the visualizations into it)

This is my first blog post, so there are probably quite a few mistakes. Corrections are welcome, thanks!


Website URL: http://pcwzsj.com/article/jpeooe.html