mysql慢查询和错误日志分析

MySQL慢查询和错误日志分析和告警查看比较麻烦,目前的慢查询告警都是仅仅反应慢查询数量的。
我们做了一个慢查询日志告警和分析的程序
后台使用filebeat日志文件托运工具,将日志传输到redis数据库。filebeat默认使用es。定时器1分钟执行一次。
vi  /etc/filebeat/filebeat.yml
filebeat.prospectors:
  paths:
    - /data/mysql/xxx/tmp/slow.log
  document_type: syslog
  fields:
        app: mysql_slowlog
        port: xxx
        ip: xxxx
  scan_frequency: 30s
  tail_files: true
  multiline.pattern: '^\#\ Time'
  multiline.negate: true
  multiline.match: after
        app: mysql_slowlog
output.redis:
  enabled: true
  hosts: ["IP:port"]
  port: 2402
  key: filebeat
  keys:
    - key: "%{[fields.app]}"
      mapping:
        "mysql_slowlog": "mysql_slowlog"
        "mysql_errorlog": "mysql_errorlog"
  db: 0
  datatype: list
logging.to_files: true

在监控端读取redis数据,并通过正则处理到mysql数据库。
vi /data/mysql_slowLog.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import redis
import json
import pymysql
import re
import time
import threading
# redis connect info
redisHost = 'xxx'
redisPort = 2402
redisDB = '0'
redisKey = 'mysql_slowlog'
# mysql connect info
mysqlHost = 'xxx'
mysqlPort = 2001
# mysqlPort = 23306
mysqlUser = ''
mysqlPasswd = ''
# mysqlPasswd = 'open'
mysqlDB = ''
mysqlTablePrefix = 'mysql_slowlog_'
collectStep = 60
def time_log():
    return '[' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ']'
def gather_log(redisConn):
    data_list = []
    logList = []
    keyState = redisConn.exists(redisKey)
    if keyState:
        logLen = redisConn.llen(redisKey)
        if logLen > 0:
            redisKeyNew = redisKey + '-bak'
            redisConn.renamenx(redisKey, redisKeyNew)
            logList = redisConn.lrange(redisKeyNew,0,logLen)
            redisConn.delete(redisKeyNew)
        else:
            pass
    else:
        pass
    if len(logList) > 0:
        for item in logList:
            data_dict = {}
            slowLogJson = json.loads(item)
            #print(slowLogJson['message'])
            data_dict['hostname'] = slowLogJson['beat']['hostname']
            #print(slowLogJson['beat']['hostname'])
            data_dict['ip'] = slowLogJson['fields']['ip']
            #print(slowLogJson['fields']['ip'])
            data_dict['port'] = slowLogJson['fields']['port']
            #print(slowLogJson['fields']['port'])
            logContent = slowLogJson['message']
            #Regex
            timeRe = r'# Time: (.*)\n# User@Host:'
            userRe = r'# User@Host:.*\[(.*?)\]\s+@ '
            hostRe = r'# User@Host: .*\[(.*?)\]  Id:'
            schemaRe = r'# Schema:\s+(.*?)\s+Last_errno:'
            queryRe = r'# Query_time:\s+(.*?)\s+Lock_time:'
            locklRe = r'# Query_time:.*?Lock_time:\s+(.*?)\s+Rows_sent:'
            rowsRe = r'# Query_time:.*?Lock_time:.*?Rows_sent:\s+(\d+)\s+Rows_examined:'
            bytesRe = r'# Bytes_sent:\s+(\d+)'
            timestampRe = r'SET\s+timestamp=(.*?);'
            commandRe = r'SET\s+timestamp=.*?;\n(.*?)(?=$)'
            if re.findall(timeRe, logContent):
                data_dict['sys_time'] = u'20' + re.findall(timeRe, logContent)[0]
                data_dict['sys_time'] =  data_dict['sys_time'][:4] + '-' + data_dict['sys_time'][4:6] + '-' + data_dict['sys_time'][6:]
                data_dict['cli_user'] = re.findall(userRe, logContent)[0]
                data_dict['cli_ip'] = re.findall(hostRe,logContent)[0]
                data_dict['schema'] = re.findall(schemaRe,logContent)[0]
                data_dict['query_time'] = re.findall(queryRe,logContent)[0]
                data_dict['lock_time'] = re.findall(locklRe,logContent)[0]
                data_dict['rows_sent'] = re.findall(rowsRe,logContent)[0]
                data_dict['bytes_sent'] = re.findall(bytesRe,logContent)[0]
                data_dict['timestamp'] = re.findall(timestampRe,logContent)[0]
                data_dict['command'] = re.findall(commandRe,logContent,re.M)[0]
                data_list.append(data_dict)
            else:
                pass
                #print('Not slowlog data')
    else:
        pass
        #print('No data')
    return data_list
def send_data(data,mysql_pool):
    mysqlTableDate = time.strftime('%Y%m', time.localtime(time.time()))
    mysqlTable = mysqlTablePrefix + mysqlTableDate
    cursor = mysql_pool.cursor()
    data_list = []
    createTableSql = "create table mysql_slowlog_000000 (`id` int(11) NOT NULL AUTO_INCREMENT," \
                     "hostname varchar(64) NOT NULL," \
                     "ip varchar(20) NOT NULL," \
                     "port int(11) NOT NULL," \
                     "sys_time datetime NOT NULL," \
                     "cli_user varchar(32) NOT NULL," \
                     "cli_ip varchar(32) NOT NULL," \
                     "`schema` varchar(32) NOT NULL," \
                     "query_time float(6,3) NOT NULL," \
                     "lock_time float(6,3) NOT NULL," \
                     "rows_sent int(11) NOT NULL," \
                     "bytes_sent int(11) NOT NULL," \
                     "`timestamp` varchar(40) NOT NULL," \
                     "command varchar(2048) DEFAULT NULL," \
                     "PRIMARY KEY (`id`)," \
                     "KEY `idx_slowlog_000000_user` (`cli_user`)," \
                     "KEY `idx_slowlog_000000_query_time` (`query_time`)," \
                     "KEY `idx_slowlog_000000_timestamp` (`timestamp`)) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8"
    createTableSql = createTableSql.replace('000000',mysqlTableDate)
    # Create slow log table if not exist
    try:
        cursor.execute("show tables like '%s'" % mysqlTable)
        res = cursor.fetchone()
        if not res:
            cursor.execute(createTableSql)
        mysql_pool.commit()
    except Exception as e:
        print(time_log() +'Error:', e)
        mysql_pool.rollback()
        mysql_pool.close()
    slowLogInsertSql ="insert into %s" % mysqlTable +  "(hostname," \
                                        "ip," \
                                        "port," \
                                        "sys_time," \
                                        "cli_user," \
                                        "cli_ip," \
                                        "`schema`," \
                                        "query_time," \
                                        "lock_time," \
                                        "rows_sent," \
                                        "bytes_sent," \
                                        "`timestamp`," \
                                        "command) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    if len(data) > 0:
        for item in data:
            row = (item['hostname'].encode('utf-8'),
                    item['ip'].encode('utf-8'),
                    item['port'],
                    item['sys_time'].encode('utf-8'),
                    item['cli_user'].encode('utf-8'),
                    item['cli_ip'].encode('utf-8'),
                    item['schema'].encode('utf-8'),
                    item['query_time'].encode('utf-8'),
                    item['lock_time'].encode('utf-8'),
                    item['rows_sent'].encode('utf-8'),
                    item['bytes_sent'].encode('utf-8'),
                    item['timestamp'].encode('utf-8'),
                    pymysql.escape_string(item['command']).encode('utf-8'))
            data_list.append(row)
        print(len(data_list))
        # Insert slow log data
        try:
            cursor.executemany(slowLogInsertSql , data_list)
            mysql_pool.commit()
            mysql_pool.close()
        except Exception as e:
            print(time_log() +'Error:',e)
            mysql_pool.rollback()
            mysql_pool.close()
    else:
        print(time_log() + 'No data')
def main():
    try:
        redis_pool = redis.ConnectionPool(host=redisHost, port=redisPort, db=redisDB)
        redisConn= redis.Redis(connection_pool=redis_pool)
    except:
        print(time_log() + 'Error! Can not connect to redis!')
    try:
        mysql_pool = pymysql.connect(host=mysqlHost, port=mysqlPort, user=mysqlUser, password=mysqlPasswd, db=mysqlDB)
    except:
        print(time_log() + 'Error! Can not connect to mysql!')
    print(time_log())
    data = gather_log(redisConn)
    send_data(data,mysql_pool)
    print(time_log())
    # time scheduler
    timeSchedule = collectStep
    global timer
    timer = threading.Timer(timeSchedule, main)
    timer.start()
if __name__ == '__main__':
    timer = threading.Timer(1, main)
    timer.start()
前端使用django展示慢查询数据,同时每周通过将响应的业务慢查询数据发送给开发人员。
mysql错误日志也是同样进行处理。



网页名称:mysql慢查询和错误日志分析
地址分享:http://pcwzsj.com/article/ihoijh.html