import json
import logging
import os
from datetime import datetime
from logging.handlers import RotatingFileHandler

from extensions import redis_client

# Log directory: "<parent of this file's directory>/logs", created at import.
LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs')
os.makedirs(LOG_DIR, exist_ok=True)


class SystemLogger:
    """Write log records to a rotating file, the console and a Redis ZSET.

    Every public method (``info``, ``warning``, ``error``, ``debug``) first
    forwards the message to the ``vision_ai`` stdlib logger and then mirrors
    it into Redis on a best-effort basis: a Redis failure never propagates
    to the caller.
    """

    # Redis sorted-set key holding the JSON-encoded log entries.
    REDIS_KEY = 'system_logs_zset'
    # Entries whose score (Unix timestamp) is older than this are trimmed.
    RETENTION_SECONDS = 30 * 24 * 3600

    # logging.getLogger() returns the same logger object for the same name,
    # so handlers must be attached only once per process; otherwise each
    # additional SystemLogger() would duplicate every output line.
    _handlers_installed = False
    # True while Redis pushes are failing, so the failure is reported once
    # per outage instead of once per log call.
    _redis_failing = False

    def __init__(self):
        """Bind to the ``vision_ai`` logger and attach handlers if needed."""
        self.logger = logging.getLogger('vision_ai')
        self.logger.setLevel(logging.INFO)

        if SystemLogger._handlers_installed:
            return

        # File handler (auto-rotating, max 10 MB per file, 5 backups kept).
        file_handler = RotatingFileHandler(
            os.path.join(LOG_DIR, 'system.log'),
            maxBytes=10 * 1024 * 1024,
            backupCount=5,
            encoding='utf-8',
        )
        file_handler.setLevel(logging.INFO)

        # Console handler.
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)

        # Shared output format.
        formatter = logging.Formatter(
            '[%(asctime)s] %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S',
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)

        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)
        SystemLogger._handlers_installed = True

    def _push_to_redis(self, level, message, extra=None):
        """Append one entry to the Redis ZSET and trim entries older than 30 days.

        Args:
            level: Level name stored in the entry (e.g. ``'INFO'``).
            message: Log message stored in the entry.
            extra: Optional mapping of additional fields; stored as ``{}``
                when empty or ``None``.

        This is best-effort: any ``Exception`` is caught and reported to the
        local logger only (never back to Redis), once per failure streak.
        """
        try:
            now = datetime.now()
            log_entry = {
                "time": now.strftime('%Y-%m-%d %H:%M:%S'),
                "level": level,
                "message": message,
                "extra": extra or {},
            }
            # Sorted set scored by Unix timestamp so old entries can be
            # removed by score range.
            timestamp = now.timestamp()
            # default=str: a non-JSON-serializable value in `extra` is
            # stringified instead of causing the whole entry to be dropped.
            # NOTE(review): the ZSET member is the JSON text itself, so two
            # identical entries within the same second collapse into one
            # member -- confirm whether that de-duplication is acceptable.
            member = json.dumps(log_entry, ensure_ascii=False, default=str)
            redis_client.zadd(self.REDIS_KEY, {member: timestamp})

            # Drop everything older than the retention window.
            cutoff = timestamp - self.RETENTION_SECONDS
            redis_client.zremrangebyscore(self.REDIS_KEY, 0, cutoff)
        except Exception:
            # Deliberately not a bare `except:` so KeyboardInterrupt and
            # SystemExit still propagate.
            if not self._redis_failing:
                self._redis_failing = True
                self.logger.warning(
                    'Failed to push log entry to Redis; '
                    'further failures are suppressed until it recovers',
                    exc_info=True,
                )
        else:
            self._redis_failing = False

    def info(self, message, **kwargs):
        """Log ``message`` at INFO level; ``kwargs`` are stored in Redis only."""
        self.logger.info(message)
        self._push_to_redis('INFO', message, kwargs)

    def warning(self, message, **kwargs):
        """Log ``message`` at WARNING level; ``kwargs`` are stored in Redis only."""
        self.logger.warning(message)
        self._push_to_redis('WARNING', message, kwargs)

    def error(self, message, **kwargs):
        """Log ``message`` at ERROR level; ``kwargs`` are stored in Redis only."""
        self.logger.error(message)
        self._push_to_redis('ERROR', message, kwargs)

    def debug(self, message, **kwargs):
        """Log ``message`` at DEBUG level; ``kwargs`` are stored in Redis only.

        NOTE(review): the logger level is INFO, so the file/console handlers
        discard DEBUG records while the Redis entry is still written --
        confirm this asymmetry is intended.
        """
        self.logger.debug(message)
        self._push_to_redis('DEBUG', message, kwargs)


# Global logger instance.
system_logger = SystemLogger()