feat(admin): 添加积分发放管理和邀请奖励功能 - 新增积分发放相关模型 PointsGrant 和 InviteReward - 实现管理员积分发放接口(单个、批量、全员发放) - 添加积分发放记录查询和统计功能 - 集成邀请奖励机制,在用户充值时自动发放邀请奖励 - 在用户注册流程中集成邀请码功能 - 扩展用户信息返回积分和创建时间字段 - 添加前端邀请码处理和邀请统计功能 ```
698 lines
25 KiB
Python
698 lines
25 KiB
Python
from flask import Blueprint, request, jsonify, session, render_template, redirect, url_for
|
||
import json
|
||
from datetime import timedelta
|
||
from extensions import db
|
||
from models import User, get_bj_now
|
||
from services.sms_service import SMSService
|
||
from services.captcha_service import CaptchaService
|
||
from services.logger import system_logger
|
||
from middlewares.auth import admin_required
|
||
|
||
auth_bp = Blueprint('auth', __name__)
|
||
|
||
@auth_bp.route('/login')
|
||
def login_page():
|
||
"""登录页面"""
|
||
if 'user_id' in session:
|
||
return redirect(url_for('index'))
|
||
return render_template('login.html')
|
||
|
||
@auth_bp.route('/logs')
|
||
@admin_required
|
||
def logs_page():
|
||
"""日志查看页面"""
|
||
return render_template('logs.html')
|
||
|
||
@auth_bp.route('/rbac')
|
||
@admin_required
|
||
def rbac_page():
|
||
"""角色权限管理页面"""
|
||
return render_template('rbac.html')
|
||
|
||
@auth_bp.route('/dicts')
|
||
@admin_required
|
||
def dicts_page():
|
||
"""系统字典管理页面"""
|
||
return render_template('dicts.html')
|
||
|
||
@auth_bp.route('/notifications')
|
||
@admin_required
|
||
def notifications_page():
|
||
"""系统通知管理页面"""
|
||
return render_template('notifications.html')
|
||
|
||
@auth_bp.route('/admin/orders')
|
||
@admin_required
|
||
def admin_orders_page():
|
||
"""全员订单管理页面"""
|
||
return render_template('orders.html')
|
||
|
||
@auth_bp.route('/admin/points')
|
||
@admin_required
|
||
def admin_points_page():
|
||
"""积分发放管理页面"""
|
||
return render_template('points.html')
|
||
|
||
@auth_bp.route('/buy')
|
||
def buy_page():
|
||
"""购买积分页面"""
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('auth.login_page'))
|
||
|
||
from models import Order, User
|
||
user_id = session['user_id']
|
||
user = db.session.get(User, user_id)
|
||
|
||
thirty_min_ago = get_bj_now() - timedelta(minutes=30)
|
||
|
||
# 获取用户个人充值记录 (过滤掉超过 30 分钟未支付的订单)
|
||
personal_orders = Order.query.filter(
|
||
Order.user_id == user_id,
|
||
db.or_(
|
||
Order.status == 'PAID',
|
||
db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago)
|
||
)
|
||
).order_by(Order.created_at.desc()).limit(10).all()
|
||
|
||
# 如果是管理员,获取全员记录
|
||
is_admin = user.has_permission('manage_system')
|
||
admin_orders = []
|
||
if is_admin:
|
||
admin_orders = Order.query.filter(
|
||
db.or_(
|
||
Order.status == 'PAID',
|
||
db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago)
|
||
)
|
||
).order_by(Order.created_at.desc()).limit(10).all()
|
||
|
||
# 处理支付成功提示
|
||
success = request.args.get('success') == 'true'
|
||
out_trade_no = request.args.get('out_trade_no')
|
||
order = None
|
||
if out_trade_no:
|
||
order = Order.query.filter_by(out_trade_no=out_trade_no).first()
|
||
|
||
import datetime as dt_module
|
||
return render_template('buy.html',
|
||
user=user,
|
||
personal_orders=personal_orders,
|
||
admin_orders=admin_orders,
|
||
is_admin=is_admin,
|
||
modules={'datetime': dt_module},
|
||
success=success,
|
||
order=order)
|
||
|
||
@auth_bp.route('/recharge_history')
|
||
def recharge_history_page():
|
||
"""充值历史记录页面"""
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('auth.login_page'))
|
||
|
||
from models import Order, User
|
||
user_id = session['user_id']
|
||
user = db.session.get(User, user_id)
|
||
|
||
# 获取用户所有充值记录
|
||
orders = Order.query.filter_by(user_id=user_id).order_by(Order.created_at.desc()).all()
|
||
|
||
return render_template('recharge_history.html',
|
||
user=user,
|
||
orders=orders)
|
||
|
||
@auth_bp.route('/api/auth/captcha')
|
||
def get_captcha():
|
||
"""获取图形验证码并存入 Redis"""
|
||
phone = request.args.get('phone')
|
||
if not phone:
|
||
return jsonify({"error": "缺少参数"}), 400
|
||
|
||
text, img_bytes = CaptchaService.generate_captcha()
|
||
|
||
from extensions import redis_client
|
||
# 存入 Redis,有效期 5 分钟
|
||
redis_client.setex(f"captcha:{phone}", 300, text.lower())
|
||
|
||
from flask import Response
|
||
return Response(img_bytes, mimetype='image/png')
|
||
|
||
@auth_bp.route('/api/auth/send_code', methods=['POST'])
|
||
def send_code():
|
||
data = request.json
|
||
phone = data.get('phone')
|
||
captcha = data.get('captcha')
|
||
ip = request.remote_addr
|
||
|
||
if not phone:
|
||
return jsonify({"error": "请输入手机号"}), 400
|
||
|
||
import re
|
||
if not re.match(r'^1[3-9]\d{9}$', phone):
|
||
return jsonify({"error": "手机号格式不正确"}), 400
|
||
|
||
if not captcha:
|
||
return jsonify({"error": "请输入图形验证码", "show_captcha": True}), 403
|
||
|
||
from extensions import redis_client
|
||
|
||
# 1. 验证图形验证码
|
||
saved_captcha = redis_client.get(f"captcha:{phone}")
|
||
if not saved_captcha or captcha.lower() != saved_captcha.decode('utf-8'):
|
||
return jsonify({"error": "图形验证码错误或已过期", "refresh_captcha": True}), 403
|
||
|
||
# 验证后立即删除,防止被脚本重复利用来刷短信
|
||
redis_client.delete(f"captcha:{phone}")
|
||
|
||
# 2. 频率限制:单手机号 60秒 一次 (后端兜底)
|
||
if redis_client.get(f"sms_lock:{phone}"):
|
||
return jsonify({"error": "发送过于频繁,请稍后再试"}), 429
|
||
|
||
# 3. 每日限制:单手机号每天最多 10 条
|
||
day_count_key = f"sms_day_count:{phone}"
|
||
day_count = int(redis_client.get(day_count_key) or 0)
|
||
if day_count >= 10:
|
||
return jsonify({"error": "该手机号今日获取验证码次数已达上限"}), 429
|
||
|
||
# 4. 每日限制:单 IP 每天最多 20 条 (防止换号刷)
|
||
ip_count_key = f"sms_ip_count:{ip}"
|
||
ip_count = int(redis_client.get(ip_count_key) or 0)
|
||
if ip_count >= 20:
|
||
return jsonify({"error": "您的设备今日发送请求过多,请明天再试"}), 429
|
||
|
||
system_logger.info(f"用户请求发送验证码", phone=phone, ip=ip)
|
||
success, msg = SMSService.send_code(phone)
|
||
if success:
|
||
# 设置各种限制标记
|
||
now = get_bj_now()
|
||
seconds_until_midnight = ((23 - now.hour) * 3600) + ((59 - now.minute) * 60) + (60 - now.second)
|
||
|
||
redis_client.setex(f"sms_lock:{phone}", 60, "1")
|
||
redis_client.setex(day_count_key, seconds_until_midnight, day_count + 1)
|
||
redis_client.setex(ip_count_key, seconds_until_midnight, ip_count + 1)
|
||
|
||
system_logger.info(f"验证码发送成功", phone=phone)
|
||
return jsonify({"message": "验证码已发送"})
|
||
|
||
system_logger.warning(f"验证码发送失败: {msg}", phone=phone)
|
||
return jsonify({"error": f"发送失败: {msg}"}), 500
|
||
|
||
@auth_bp.route('/api/auth/register', methods=['POST'])
|
||
def register():
|
||
data = request.json
|
||
phone = data.get('phone')
|
||
code = data.get('code')
|
||
password = data.get('password')
|
||
invite_code = data.get('invite_code', '').strip().upper() # 获取邀请码
|
||
|
||
import re
|
||
if not phone or not re.match(r'^1[3-9]\d{9}$', phone):
|
||
return jsonify({"error": "手机号格式不正确"}), 400
|
||
|
||
system_logger.info(f"用户注册请求", phone=phone)
|
||
|
||
if not SMSService.verify_code(phone, code):
|
||
system_logger.warning(f"注册失败: 验证码错误", phone=phone)
|
||
return jsonify({"error": "验证码错误或已过期"}), 400
|
||
|
||
if User.query.filter_by(phone=phone).first():
|
||
system_logger.warning(f"注册失败: 手机号已存在", phone=phone)
|
||
return jsonify({"error": "该手机号已注册"}), 400
|
||
|
||
# 验证邀请码(如果提供)
|
||
inviter = None
|
||
if invite_code:
|
||
inviter = User.query.filter_by(invite_code=invite_code).first()
|
||
if not inviter:
|
||
return jsonify({"error": "邀请码无效,请检查后重试"}), 400
|
||
|
||
# 获取默认角色:普通用户
|
||
from models import Role
|
||
user_role = Role.query.filter_by(name='普通用户').first()
|
||
|
||
# 生成唯一邀请码
|
||
import random
|
||
import string
|
||
def generate_invite_code():
|
||
while True:
|
||
code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6))
|
||
if not User.query.filter_by(invite_code=code).first():
|
||
return code
|
||
|
||
user = User(phone=phone, role=user_role, invite_code=generate_invite_code())
|
||
if inviter:
|
||
user.invited_by = inviter.id
|
||
user.set_password(password)
|
||
db.session.add(user)
|
||
db.session.commit()
|
||
|
||
if inviter:
|
||
system_logger.info(f"用户注册成功(通过邀请码)", phone=phone, user_id=user.id, inviter_id=inviter.id)
|
||
else:
|
||
system_logger.info(f"用户注册成功", phone=phone, user_id=user.id)
|
||
return jsonify({"message": "注册成功"})
|
||
|
||
@auth_bp.route('/api/auth/login', methods=['POST'])
|
||
def login():
|
||
data = request.json
|
||
phone = data.get('phone')
|
||
password = data.get('password')
|
||
code = data.get('code') # 可能是高频报错后强制要求的验证码
|
||
|
||
if not phone or not password:
|
||
return jsonify({"error": "请输入手机号和密码"}), 400
|
||
|
||
from extensions import redis_client
|
||
fail_key = f"login_fail_count:{phone}"
|
||
fail_count = int(redis_client.get(fail_key) or 0)
|
||
|
||
# 如果失败次数过多,强制要求图型验证码
|
||
if fail_count >= 3:
|
||
if not code:
|
||
system_logger.warning(f"触发强制安全验证", phone=phone)
|
||
return jsonify({
|
||
"error": "由于密码错误次数过多,请输入图形验证码",
|
||
"require_captcha": True
|
||
}), 403
|
||
|
||
# 验证图形验证码
|
||
saved_captcha = redis_client.get(f"captcha:{phone}")
|
||
if not saved_captcha or code.lower() != saved_captcha.decode('utf-8'):
|
||
return jsonify({"error": "验证码错误或已过期"}), 400
|
||
|
||
# 验证成功后删除,防止重复使用
|
||
redis_client.delete(f"captcha:{phone}")
|
||
|
||
system_logger.info(f"用户登录尝试", phone=phone)
|
||
|
||
user = User.query.filter_by(phone=phone).first()
|
||
if user and user.check_password(password):
|
||
if user.is_banned:
|
||
system_logger.warning(f"被封禁用户尝试登录", phone=phone)
|
||
return jsonify({"error": "您的账号已被封禁,请联系管理员"}), 403
|
||
|
||
# 登录成功,清除失败计数
|
||
redis_client.delete(fail_key)
|
||
|
||
session.permanent = True
|
||
session['user_id'] = user.id
|
||
system_logger.info(f"用户登录成功", phone=phone, user_id=user.id)
|
||
return jsonify({"message": "登录成功", "phone": phone})
|
||
|
||
# 登录失败,增加计数 (有效期 1 小时)
|
||
redis_client.setex(fail_key, 3600, fail_count + 1)
|
||
|
||
system_logger.warning(f"登录失败: 手机号或密码错误 [次数: {fail_count+1}]", phone=phone)
|
||
return jsonify({"error": "手机号或密码错误"}), 401
|
||
|
||
@auth_bp.route('/api/auth/reset_password', methods=['POST'])
|
||
def reset_password():
|
||
"""通过短信重置密码"""
|
||
data = request.json
|
||
phone = data.get('phone')
|
||
code = data.get('code')
|
||
new_password = data.get('password')
|
||
|
||
import re
|
||
if not phone or not re.match(r'^1[3-9]\d{9}$', phone):
|
||
return jsonify({"error": "手机号格式不正确"}), 400
|
||
|
||
if not phone or not code or not new_password:
|
||
return jsonify({"error": "请填写完整信息"}), 400
|
||
|
||
if not SMSService.verify_code(phone, code):
|
||
return jsonify({"error": "验证码错误或已过期"}), 400
|
||
|
||
user = User.query.filter_by(phone=phone).first()
|
||
if not user:
|
||
return jsonify({"error": "该手机号尚未注册"}), 404
|
||
|
||
user.set_password(new_password)
|
||
db.session.commit()
|
||
|
||
# 重置成功后清理失败计数
|
||
from extensions import redis_client
|
||
redis_client.delete(f"login_fail_count:{phone}")
|
||
|
||
system_logger.info(f"用户通过短信重置密码成功", phone=phone, user_id=user.id)
|
||
return jsonify({"message": "密码重置成功,请使用新密码登录"})
|
||
|
||
@auth_bp.route('/api/auth/logout', methods=['POST'])
|
||
def logout():
|
||
session.pop('user_id', None)
|
||
return jsonify({"message": "已退出登录"})
|
||
|
||
@auth_bp.route('/api/auth/me', methods=['GET'])
|
||
def me():
|
||
user_id = session.get('user_id')
|
||
if not user_id:
|
||
return jsonify({"logged_in": False})
|
||
user = db.session.get(User, user_id)
|
||
if not user:
|
||
session.pop('user_id', None)
|
||
return jsonify({"logged_in": False})
|
||
|
||
# 脱敏手机号: 13812345678 -> 138****5678
|
||
phone = user.phone
|
||
masked_phone = phone[:3] + "****" + phone[-4:] if len(phone) >= 11 else phone
|
||
|
||
return jsonify({
|
||
"logged_in": True,
|
||
"phone": masked_phone, # 默认返回脱敏的供前端显示
|
||
"full_phone": phone, # 某些场景可能需要完整号
|
||
"api_key": user.api_key,
|
||
"points": user.points,
|
||
"hide_custom_key": (not user.api_key) or user.has_used_points, # Key为空或使用过积分,则隐藏自定义Key入口
|
||
"invite_code": user.invite_code # 用户邀请码
|
||
})
|
||
|
||
@auth_bp.route('/api/auth/invite_stats', methods=['GET'])
|
||
def get_my_invite_stats():
|
||
"""获取当前用户的邀请统计"""
|
||
user_id = session.get('user_id')
|
||
if not user_id:
|
||
return jsonify({"error": "请先登录"}), 401
|
||
|
||
user = db.session.get(User, user_id)
|
||
if not user:
|
||
return jsonify({"error": "用户不存在"}), 404
|
||
|
||
from models import InviteReward, Order
|
||
|
||
# 我邀请的用户数
|
||
invited_count = User.query.filter_by(invited_by=user_id).count()
|
||
|
||
# 我获得的邀请奖励总积分
|
||
total_rewards = db.session.query(db.func.sum(InviteReward.reward_points)).filter(
|
||
InviteReward.inviter_id == user_id
|
||
).scalar() or 0
|
||
|
||
# 我邀请的用户列表(脱敏)
|
||
invited_users = User.query.filter_by(invited_by=user_id).order_by(User.created_at.desc()).limit(20).all()
|
||
|
||
invited_list = []
|
||
for u in invited_users:
|
||
# 脱敏手机号
|
||
masked = u.phone[:3] + "****" + u.phone[-4:] if len(u.phone) >= 11 else u.phone
|
||
# 计算该用户给我带来的奖励
|
||
user_rewards = db.session.query(db.func.sum(InviteReward.reward_points)).filter(
|
||
InviteReward.inviter_id == user_id,
|
||
InviteReward.invitee_id == u.id
|
||
).scalar() or 0
|
||
|
||
invited_list.append({
|
||
"phone": masked,
|
||
"registered_at": u.created_at_bj.strftime('%Y-%m-%d') if u.created_at else None,
|
||
"rewards_earned": int(user_rewards)
|
||
})
|
||
|
||
return jsonify({
|
||
"invite_code": user.invite_code,
|
||
"invited_count": invited_count,
|
||
"total_rewards": int(total_rewards),
|
||
"invited_users": invited_list
|
||
})
|
||
|
||
@auth_bp.route('/api/auth/point_history', methods=['GET'])
|
||
def get_point_history():
|
||
"""获取当前用户的积分变动历史记录"""
|
||
user_id = session.get('user_id')
|
||
if not user_id:
|
||
return jsonify({"error": "请先登录"}), 401
|
||
|
||
from models import Order, PointsGrant, InviteReward, User
|
||
|
||
history = []
|
||
|
||
# 1. 充值记录
|
||
orders = Order.query.filter_by(user_id=user_id, status='PAID').all()
|
||
for o in orders:
|
||
history.append({
|
||
"type": "recharge",
|
||
"type_label": "账户充值",
|
||
"amount": f"+{o.points}",
|
||
"desc": f"在线支付 ¥{o.amount}",
|
||
"time": o.paid_at_bj.strftime('%Y-%m-%d %H:%M') if o.paid_at_bj else o.created_at_bj.strftime('%Y-%m-%d %H:%M')
|
||
})
|
||
|
||
# 2. 手动发放记录
|
||
grants = PointsGrant.query.filter_by(user_id=user_id).all()
|
||
for g in grants:
|
||
history.append({
|
||
"type": "grant",
|
||
"type_label": "系统发放",
|
||
"amount": f"+{g.points}",
|
||
"desc": g.reason or "管理员手动调账",
|
||
"time": g.created_at_bj.strftime('%Y-%m-%d %H:%M')
|
||
})
|
||
|
||
# 3. 邀请奖励记录
|
||
rewards = InviteReward.query.filter_by(inviter_id=user_id).all()
|
||
for r in rewards:
|
||
# 查找被邀请人信息
|
||
invitee = db.session.get(User, r.invitee_id)
|
||
phone = invitee.phone if invitee else "未知用户"
|
||
masked = phone[:3] + "****" + phone[-4:] if len(phone) >= 11 else phone
|
||
|
||
history.append({
|
||
"type": "reward",
|
||
"type_label": "邀请奖励",
|
||
"amount": f"+{r.reward_points}",
|
||
"desc": f"好友({masked})充值返利",
|
||
"time": r.created_at_bj.strftime('%Y-%m-%d %H:%M')
|
||
})
|
||
|
||
# 4. 消费记录 (GenerationRecord)
|
||
from models import GenerationRecord
|
||
consumptions = GenerationRecord.query.filter_by(user_id=user_id).order_by(GenerationRecord.created_at.desc()).limit(100).all()
|
||
for c in consumptions:
|
||
# 截断 prompt
|
||
action = c.prompt[:20] + '...' if c.prompt and len(c.prompt) > 20 else (c.prompt or "AI生成")
|
||
history.append({
|
||
"type": "consumption",
|
||
"type_label": "积分消费",
|
||
"amount": f"-{c.cost}",
|
||
"desc": f"[{c.model or 'Default'}] {action}",
|
||
"time": c.created_at_bj.strftime('%Y-%m-%d %H:%M')
|
||
})
|
||
|
||
# 按时间降序排序
|
||
history.sort(key=lambda x: x['time'], reverse=True)
|
||
|
||
# 手动分页
|
||
page = request.args.get('page', 1, type=int)
|
||
per_page = request.args.get('per_page', 5, type=int)
|
||
total = len(history)
|
||
start = (page - 1) * per_page
|
||
end = start + per_page
|
||
|
||
import math
|
||
return jsonify({
|
||
"details": history[start:end],
|
||
"total": total,
|
||
"pages": math.ceil(total / per_page),
|
||
"current_page": page
|
||
})
|
||
|
||
@auth_bp.route('/api/auth/point_stats', methods=['GET'])
|
||
def get_point_stats():
|
||
"""获取用户过去 7 天的积分变动统计(充值 vs 消费)"""
|
||
user_id = session.get('user_id')
|
||
if not user_id:
|
||
return jsonify({"error": "请先登录"}), 401
|
||
|
||
from models import Order, GenerationRecord, get_bj_now
|
||
from datetime import timedelta
|
||
|
||
end_date = get_bj_now().date()
|
||
start_date = end_date - timedelta(days=6)
|
||
|
||
labels = []
|
||
consumption_data = []
|
||
recharge_data = []
|
||
|
||
curr = start_date
|
||
while curr <= end_date:
|
||
labels.append(curr.strftime('%m-%d'))
|
||
|
||
# 当天充值 (PAID 订单)
|
||
day_recharge = db.session.query(db.func.sum(Order.points)).filter(
|
||
Order.user_id == user_id,
|
||
Order.status == 'PAID',
|
||
db.func.date(Order.paid_at) == curr
|
||
).scalar() or 0
|
||
|
||
# 当天消费 (生成记录)
|
||
day_consumption = db.session.query(db.func.sum(GenerationRecord.cost)).filter(
|
||
GenerationRecord.user_id == user_id,
|
||
db.func.date(GenerationRecord.created_at) == curr
|
||
).scalar() or 0
|
||
|
||
recharge_data.append(int(day_recharge))
|
||
consumption_data.append(int(day_consumption))
|
||
|
||
curr += timedelta(days=1)
|
||
|
||
return jsonify({
|
||
"labels": labels,
|
||
"consumption": consumption_data,
|
||
"recharge": recharge_data
|
||
})
|
||
|
||
@auth_bp.route('/api/auth/consumption_details', methods=['GET'])
|
||
def get_consumption_details():
|
||
"""获取用户积分消费明细记录"""
|
||
user_id = session.get('user_id')
|
||
if not user_id:
|
||
return jsonify({"error": "请先登录"}), 401
|
||
|
||
from models import GenerationRecord
|
||
|
||
page = request.args.get('page', 1, type=int)
|
||
per_page = request.args.get('per_page', 10, type=int)
|
||
|
||
pagination = GenerationRecord.query.filter_by(user_id=user_id).order_by(GenerationRecord.created_at.desc()).paginate(
|
||
page=page, per_page=per_page, error_out=False
|
||
)
|
||
|
||
details = []
|
||
for r in pagination.items:
|
||
# 截断 prompt 作为动作描述
|
||
action = r.prompt[:30] + '...' if r.prompt and len(r.prompt) > 30 else (r.prompt or "生成图片")
|
||
details.append({
|
||
"id": r.id,
|
||
"action": action,
|
||
"model": r.model or "Default",
|
||
"cost": f"-{r.cost}",
|
||
"time": r.created_at_bj.strftime('%Y-%m-%d %H:%M')
|
||
})
|
||
|
||
return jsonify({
|
||
"details": details,
|
||
"total": pagination.total,
|
||
"pages": pagination.pages,
|
||
"current_page": pagination.page
|
||
})
|
||
|
||
@auth_bp.route('/api/auth/change_password', methods=['POST'])
|
||
def change_password():
|
||
user_id = session.get('user_id')
|
||
if not user_id:
|
||
return jsonify({"error": "请先登录"}), 401
|
||
|
||
data = request.json
|
||
old_password = data.get('old_password')
|
||
new_password = data.get('new_password')
|
||
|
||
if not old_password or not new_password:
|
||
return jsonify({"error": "请填写完整信息"}), 400
|
||
|
||
user = db.session.get(User, user_id)
|
||
if not user.check_password(old_password):
|
||
return jsonify({"error": "原密码错误"}), 400
|
||
|
||
user.set_password(new_password)
|
||
db.session.commit()
|
||
|
||
system_logger.info(f"用户修改密码成功", phone=user.phone, user_id=user.id)
|
||
return jsonify({"message": "密码修改成功"})
|
||
|
||
@auth_bp.route('/api/auth/add_points', methods=['POST'])
|
||
def add_points():
|
||
"""手动充值积分接口 (暂时禁用,等待正式接入)"""
|
||
return jsonify({"error": "充值功能维护中,敬请期待"}), 503
|
||
|
||
# 原逻辑备份
|
||
# user_id = session.get('user_id')
|
||
# if not user_id:
|
||
# return jsonify({"error": "请先登录"}), 401
|
||
# ... (原有逻辑)
|
||
|
||
def get_user_menu(user):
|
||
"""根据用户权限生成菜单列表"""
|
||
if not user:
|
||
return []
|
||
|
||
all_menus = [
|
||
{"name": "创作工作台", "icon": "wand-2", "url": "/", "perm": None},
|
||
{"name": "AI 视频创作", "icon": "video", "url": "/video", "perm": None},
|
||
{"name": "验光单助手", "icon": "scan-eye", "url": "/ocr", "perm": None},
|
||
{"name": "购买积分", "icon": "shopping-cart", "url": "/buy", "perm": None},
|
||
{"name": "积分发放管理", "icon": "gift", "url": "/admin/points", "perm": "manage_system"},
|
||
{"name": "权限管理中心", "icon": "shield-check", "url": "/rbac", "perm": "manage_rbac"},
|
||
{"name": "系统字典管理", "icon": "book-open", "url": "/dicts", "perm": "manage_dicts"},
|
||
{"name": "系统通知管理", "icon": "megaphone", "url": "/notifications", "perm": "manage_notifications"},
|
||
{"name": "系统日志审计", "icon": "terminal", "url": "/logs", "perm": "view_logs"},
|
||
]
|
||
|
||
accessible_menu = []
|
||
for item in all_menus:
|
||
if item["perm"] is None or user.has_permission(item["perm"]):
|
||
accessible_menu.append(item)
|
||
return accessible_menu
|
||
|
||
@auth_bp.route('/api/auth/menu', methods=['GET'])
|
||
def get_menu():
|
||
"""获取动态导航菜单"""
|
||
user_id = session.get('user_id')
|
||
if not user_id:
|
||
return jsonify({"menu": []})
|
||
|
||
user = db.session.get(User, user_id)
|
||
return jsonify({"menu": get_user_menu(user)})
|
||
|
||
@auth_bp.route('/api/auth/logs', methods=['GET'])
|
||
@admin_required
|
||
def get_logs():
|
||
"""获取系统日志(支持搜索、筛选与分页)"""
|
||
from models import SystemLog, User
|
||
|
||
# 分页与筛选参数
|
||
page = request.args.get('page', 1, type=int)
|
||
per_page = request.args.get('per_page', 20, type=int)
|
||
level_filter = request.args.get('level')
|
||
search_query = request.args.get('search', '').strip()
|
||
|
||
query = db.session.query(SystemLog).outerjoin(User)
|
||
|
||
# 级别过滤
|
||
if level_filter:
|
||
query = query.filter(SystemLog.level == level_filter)
|
||
|
||
# 关键词搜索 (支持消息、手机号、IP)
|
||
if search_query:
|
||
search_filter = db.or_(
|
||
SystemLog.message.ilike(f"%{search_query}%"),
|
||
SystemLog.ip.ilike(f"%{search_query}%"),
|
||
User.phone.ilike(f"%{search_query}%"),
|
||
SystemLog.module.ilike(f"%{search_query}%")
|
||
)
|
||
query = query.filter(search_filter)
|
||
|
||
# 执行分页查询
|
||
pagination = query.order_by(SystemLog.created_at.desc()).paginate(
|
||
page=page, per_page=per_page, error_out=False
|
||
)
|
||
|
||
logs_data = []
|
||
for log in pagination.items:
|
||
logs_data.append({
|
||
"id": log.id,
|
||
"time": log.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
|
||
"level": log.level,
|
||
"message": log.message,
|
||
"module": log.module,
|
||
"user_id": log.user_id,
|
||
"user_phone": log.user.phone if log.user else "系统/游客",
|
||
"ip": log.ip,
|
||
"path": log.path,
|
||
"method": log.method,
|
||
"extra": json.loads(log.extra) if log.extra else {}
|
||
})
|
||
|
||
return jsonify({
|
||
"logs": logs_data,
|
||
"total": pagination.total,
|
||
"page": page,
|
||
"per_page": per_page,
|
||
"total_pages": pagination.pages
|
||
})
|