ai_v/middlewares/auth.py

46 lines
2.1 KiB
Python

from functools import wraps
from flask import session, jsonify, redirect, url_for, request
from models import User
from services.logger import system_logger
def login_required(f):
    """Login-check decorator for Flask views.

    The wrapped view runs only when the session holds a truthy ``user_id``.
    Otherwise a warning is logged with the request path and remote address,
    and the request is rejected: paths starting with ``/api/`` receive a JSON
    body with HTTP status 401, while any other path is redirected to the
    ``auth.login_page`` endpoint with the requested path passed as ``next``.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        # Happy path first: a logged-in session goes straight to the view.
        if session.get('user_id'):
            return f(*args, **kwargs)

        system_logger.warning(
            f"未授权访问尝试 (未登录): {request.path}",
            ip=request.remote_addr,
        )

        is_api_request = request.path.startswith('/api/')
        if is_api_request:
            return jsonify({"error": "请先登录", "code": 401}), 401

        # Pass the current path along as ``next`` so the user can be sent
        # back to it after logging in.
        login_url = url_for('auth.login_page', next=request.path)
        return redirect(login_url)

    return guarded_view
def permission_required(perm_name):
    """Build a decorator that requires the logged-in user to hold a permission.

    Args:
        perm_name: Permission name handed to ``user.has_permission``.

    Returns:
        A decorator for Flask views. The decorated view:

        * first passes through ``login_required``, so a request without a
          truthy session ``user_id`` is logged and rejected exactly as any
          other login-protected view (JSON 401 for ``/api/`` paths, redirect
          to the login page otherwise);
        * then loads the ``User`` for the session's ``user_id`` and checks
          ``has_permission(perm_name)``. On failure a warning is logged and
          ``/api/`` paths get a JSON 403, while other paths are redirected
          to the ``index`` endpoint with an ``error`` query parameter.
    """
    def decorator(f):
        @wraps(f)
        def permission_checked_view(*args, **kwargs):
            # login_required (applied below) has already rejected requests
            # without a truthy session user_id, so only the permission check
            # remains here.
            user_id = session.get('user_id')

            # NOTE(review): Query.get is a legacy API in SQLAlchemy 2.x;
            # consider Session.get if the project is on that version.
            user = User.query.get(user_id)

            # NOTE(review): a session whose user_id no longer matches a User
            # row is reported as "permission denied" (403), not as
            # unauthenticated -- confirm this is intended.
            if not user or not user.has_permission(perm_name):
                system_logger.warning(
                    f"未授权访问尝试 (权限不足: {perm_name}): {request.path}",
                    user_id=user_id,
                    ip=request.remote_addr,
                )
                if request.path.startswith('/api/'):
                    return jsonify({"error": f"需要权限: {perm_name}", "code": 403}), 403
                # Without the permission, redirect to the home page with an
                # error hint.
                return redirect(url_for('index', error="权限不足"))

            return f(*args, **kwargs)

        # Reuse the shared login check instead of duplicating its logging
        # and 401/redirect handling in this decorator.
        return login_required(permission_checked_view)

    return decorator
def admin_required(f):
    """Legacy-compatible admin check.

    Kept for older call sites; it simply applies ``permission_required``
    with the ``'manage_system'`` permission to the given view.
    """
    require_manage_system = permission_required('manage_system')
    return require_manage_system(f)