ai_v/services/logger.py

139 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
from datetime import datetime, timedelta, timezone
from logging.handlers import RotatingFileHandler
from extensions import redis_client, db
import json
from flask import request, has_request_context, g
# 创建日志目录
LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs')
os.makedirs(LOG_DIR, exist_ok=True)
# 北京时区 (UTC+8)
BEIJING_TZ = timezone(timedelta(hours=8))
class BeijingTimeFormatter(logging.Formatter):
"""自定义 Formatter确保日志时间始终使用北京时间UTC+8"""
def formatTime(self, record, datefmt=None):
# 将 record.created (UTC 时间戳) 转换为北京时间
ct = datetime.fromtimestamp(record.created, tz=BEIJING_TZ)
if datefmt:
s = ct.strftime(datefmt)
else:
s = ct.strftime('%Y-%m-%d %H:%M:%S')
return s
class SystemLogger:
def __init__(self):
self.logger = logging.getLogger('vision_ai')
self.logger.setLevel(logging.INFO)
# 文件处理器 (自动轮转最大10MB保留5个备份)
file_handler = RotatingFileHandler(
os.path.join(LOG_DIR, 'system.log'),
maxBytes=10*1024*1024,
backupCount=5,
encoding='utf-8'
)
file_handler.setLevel(logging.INFO)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 格式化 - 使用自定义的北京时间格式化器
formatter = BeijingTimeFormatter(
'[%(asctime)s] %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def _push_to_redis(self, level, message, extra=None):
"""推送到 Redis 并保留 30 天数据"""
try:
now = datetime.utcnow()
bj_now = now + timedelta(hours=8)
user_id = None
if has_request_context():
user_id = g.get('user_id') or (getattr(g, 'user', None).id if hasattr(g, 'user') and g.user else None)
log_entry = {
"time": bj_now.strftime('%Y-%m-%d %H:%M:%S'),
"level": level,
"message": message,
"user_id": user_id,
"extra": extra or {}
}
# 使用有序集合 (ZSET),分数为时间戳,方便按时间清理
timestamp = now.timestamp()
redis_client.zadd('system_logs_zset', {json.dumps(log_entry, ensure_ascii=False): timestamp})
# 清理 30 天之前的日志 (30 * 24 * 3600 秒)
thirty_days_ago = timestamp - (30 * 24 * 3600)
redis_client.zremrangebyscore('system_logs_zset', 0, thirty_days_ago)
except:
pass
def _write_to_db(self, level, message, module=None, extra=None):
"""写入数据库"""
try:
from models import SystemLog
log_data = {
'level': level,
'message': message,
'module': module,
'user_id': extra.get('user_id') if extra else None,
'extra': json.dumps(extra, ensure_ascii=False) if extra else None,
'created_at': datetime.utcnow()
}
# 捕获请求上下文信息
if has_request_context():
log_data.update({
'ip': request.remote_addr,
'path': request.path,
'method': request.method,
'user_agent': request.headers.get('User-Agent')
})
# 如果 log_data 还没 user_id尝试从 context 获取
if not log_data['user_id']:
log_data['user_id'] = getattr(g, 'user_id', None) or (getattr(g.user, 'id', None) if hasattr(g, 'user') and g.user else None)
new_log = SystemLog(**log_data)
db.session.add(new_log)
db.session.commit()
except Exception as e:
# 避免日志错误导致程序崩溃
self.logger.error(f"Failed to write log to DB: {str(e)}")
def info(self, message, module=None, **kwargs):
self.logger.info(message)
self._push_to_redis('INFO', message, kwargs)
self._write_to_db('INFO', message, module, kwargs)
def warning(self, message, module=None, **kwargs):
self.logger.warning(message)
self._push_to_redis('WARNING', message, kwargs)
self._write_to_db('WARNING', message, module, kwargs)
def error(self, message, module=None, **kwargs):
self.logger.error(message)
self._push_to_redis('ERROR', message, kwargs)
self._write_to_db('ERROR', message, module, kwargs)
def debug(self, message, module=None, **kwargs):
self.logger.debug(message)
self._push_to_redis('DEBUG', message, kwargs)
self._write_to_db('DEBUG', message, module, kwargs)
# 全局日志实例
system_logger = SystemLogger()