ai_v/blueprints/auth.py
24024 0da71bc439 ```
feat(admin): 添加积分发放管理和邀请奖励功能

- 新增积分发放相关模型 PointsGrant 和 InviteReward
- 实现管理员积分发放接口(单个、批量、全员发放)
- 添加积分发放记录查询和统计功能
- 集成邀请奖励机制,在用户充值时自动发放邀请奖励
- 在用户注册流程中集成邀请码功能
- 扩展用户信息返回积分和创建时间字段
- 添加前端邀请码处理和邀请统计功能
```
2026-01-23 21:46:08 +08:00

698 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, request, jsonify, session, render_template, redirect, url_for
import json
from datetime import timedelta
from extensions import db
from models import User, get_bj_now
from services.sms_service import SMSService
from services.captcha_service import CaptchaService
from services.logger import system_logger
from middlewares.auth import admin_required
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/login')
def login_page():
"""登录页面"""
if 'user_id' in session:
return redirect(url_for('index'))
return render_template('login.html')
@auth_bp.route('/logs')
@admin_required
def logs_page():
"""日志查看页面"""
return render_template('logs.html')
@auth_bp.route('/rbac')
@admin_required
def rbac_page():
"""角色权限管理页面"""
return render_template('rbac.html')
@auth_bp.route('/dicts')
@admin_required
def dicts_page():
"""系统字典管理页面"""
return render_template('dicts.html')
@auth_bp.route('/notifications')
@admin_required
def notifications_page():
"""系统通知管理页面"""
return render_template('notifications.html')
@auth_bp.route('/admin/orders')
@admin_required
def admin_orders_page():
"""全员订单管理页面"""
return render_template('orders.html')
@auth_bp.route('/admin/points')
@admin_required
def admin_points_page():
"""积分发放管理页面"""
return render_template('points.html')
@auth_bp.route('/buy')
def buy_page():
"""购买积分页面"""
if 'user_id' not in session:
return redirect(url_for('auth.login_page'))
from models import Order, User
user_id = session['user_id']
user = db.session.get(User, user_id)
thirty_min_ago = get_bj_now() - timedelta(minutes=30)
# 获取用户个人充值记录 (过滤掉超过 30 分钟未支付的订单)
personal_orders = Order.query.filter(
Order.user_id == user_id,
db.or_(
Order.status == 'PAID',
db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago)
)
).order_by(Order.created_at.desc()).limit(10).all()
# 如果是管理员,获取全员记录
is_admin = user.has_permission('manage_system')
admin_orders = []
if is_admin:
admin_orders = Order.query.filter(
db.or_(
Order.status == 'PAID',
db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago)
)
).order_by(Order.created_at.desc()).limit(10).all()
# 处理支付成功提示
success = request.args.get('success') == 'true'
out_trade_no = request.args.get('out_trade_no')
order = None
if out_trade_no:
order = Order.query.filter_by(out_trade_no=out_trade_no).first()
import datetime as dt_module
return render_template('buy.html',
user=user,
personal_orders=personal_orders,
admin_orders=admin_orders,
is_admin=is_admin,
modules={'datetime': dt_module},
success=success,
order=order)
@auth_bp.route('/recharge_history')
def recharge_history_page():
"""充值历史记录页面"""
if 'user_id' not in session:
return redirect(url_for('auth.login_page'))
from models import Order, User
user_id = session['user_id']
user = db.session.get(User, user_id)
# 获取用户所有充值记录
orders = Order.query.filter_by(user_id=user_id).order_by(Order.created_at.desc()).all()
return render_template('recharge_history.html',
user=user,
orders=orders)
@auth_bp.route('/api/auth/captcha')
def get_captcha():
"""获取图形验证码并存入 Redis"""
phone = request.args.get('phone')
if not phone:
return jsonify({"error": "缺少参数"}), 400
text, img_bytes = CaptchaService.generate_captcha()
from extensions import redis_client
# 存入 Redis有效期 5 分钟
redis_client.setex(f"captcha:{phone}", 300, text.lower())
from flask import Response
return Response(img_bytes, mimetype='image/png')
@auth_bp.route('/api/auth/send_code', methods=['POST'])
def send_code():
data = request.json
phone = data.get('phone')
captcha = data.get('captcha')
ip = request.remote_addr
if not phone:
return jsonify({"error": "请输入手机号"}), 400
import re
if not re.match(r'^1[3-9]\d{9}$', phone):
return jsonify({"error": "手机号格式不正确"}), 400
if not captcha:
return jsonify({"error": "请输入图形验证码", "show_captcha": True}), 403
from extensions import redis_client
# 1. 验证图形验证码
saved_captcha = redis_client.get(f"captcha:{phone}")
if not saved_captcha or captcha.lower() != saved_captcha.decode('utf-8'):
return jsonify({"error": "图形验证码错误或已过期", "refresh_captcha": True}), 403
# 验证后立即删除,防止被脚本重复利用来刷短信
redis_client.delete(f"captcha:{phone}")
# 2. 频率限制:单手机号 60秒 一次 (后端兜底)
if redis_client.get(f"sms_lock:{phone}"):
return jsonify({"error": "发送过于频繁,请稍后再试"}), 429
# 3. 每日限制:单手机号每天最多 10 条
day_count_key = f"sms_day_count:{phone}"
day_count = int(redis_client.get(day_count_key) or 0)
if day_count >= 10:
return jsonify({"error": "该手机号今日获取验证码次数已达上限"}), 429
# 4. 每日限制:单 IP 每天最多 20 条 (防止换号刷)
ip_count_key = f"sms_ip_count:{ip}"
ip_count = int(redis_client.get(ip_count_key) or 0)
if ip_count >= 20:
return jsonify({"error": "您的设备今日发送请求过多,请明天再试"}), 429
system_logger.info(f"用户请求发送验证码", phone=phone, ip=ip)
success, msg = SMSService.send_code(phone)
if success:
# 设置各种限制标记
now = get_bj_now()
seconds_until_midnight = ((23 - now.hour) * 3600) + ((59 - now.minute) * 60) + (60 - now.second)
redis_client.setex(f"sms_lock:{phone}", 60, "1")
redis_client.setex(day_count_key, seconds_until_midnight, day_count + 1)
redis_client.setex(ip_count_key, seconds_until_midnight, ip_count + 1)
system_logger.info(f"验证码发送成功", phone=phone)
return jsonify({"message": "验证码已发送"})
system_logger.warning(f"验证码发送失败: {msg}", phone=phone)
return jsonify({"error": f"发送失败: {msg}"}), 500
@auth_bp.route('/api/auth/register', methods=['POST'])
def register():
data = request.json
phone = data.get('phone')
code = data.get('code')
password = data.get('password')
invite_code = data.get('invite_code', '').strip().upper() # 获取邀请码
import re
if not phone or not re.match(r'^1[3-9]\d{9}$', phone):
return jsonify({"error": "手机号格式不正确"}), 400
system_logger.info(f"用户注册请求", phone=phone)
if not SMSService.verify_code(phone, code):
system_logger.warning(f"注册失败: 验证码错误", phone=phone)
return jsonify({"error": "验证码错误或已过期"}), 400
if User.query.filter_by(phone=phone).first():
system_logger.warning(f"注册失败: 手机号已存在", phone=phone)
return jsonify({"error": "该手机号已注册"}), 400
# 验证邀请码(如果提供)
inviter = None
if invite_code:
inviter = User.query.filter_by(invite_code=invite_code).first()
if not inviter:
return jsonify({"error": "邀请码无效,请检查后重试"}), 400
# 获取默认角色:普通用户
from models import Role
user_role = Role.query.filter_by(name='普通用户').first()
# 生成唯一邀请码
import random
import string
def generate_invite_code():
while True:
code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6))
if not User.query.filter_by(invite_code=code).first():
return code
user = User(phone=phone, role=user_role, invite_code=generate_invite_code())
if inviter:
user.invited_by = inviter.id
user.set_password(password)
db.session.add(user)
db.session.commit()
if inviter:
system_logger.info(f"用户注册成功(通过邀请码)", phone=phone, user_id=user.id, inviter_id=inviter.id)
else:
system_logger.info(f"用户注册成功", phone=phone, user_id=user.id)
return jsonify({"message": "注册成功"})
@auth_bp.route('/api/auth/login', methods=['POST'])
def login():
data = request.json
phone = data.get('phone')
password = data.get('password')
code = data.get('code') # 可能是高频报错后强制要求的验证码
if not phone or not password:
return jsonify({"error": "请输入手机号和密码"}), 400
from extensions import redis_client
fail_key = f"login_fail_count:{phone}"
fail_count = int(redis_client.get(fail_key) or 0)
# 如果失败次数过多,强制要求图型验证码
if fail_count >= 3:
if not code:
system_logger.warning(f"触发强制安全验证", phone=phone)
return jsonify({
"error": "由于密码错误次数过多,请输入图形验证码",
"require_captcha": True
}), 403
# 验证图形验证码
saved_captcha = redis_client.get(f"captcha:{phone}")
if not saved_captcha or code.lower() != saved_captcha.decode('utf-8'):
return jsonify({"error": "验证码错误或已过期"}), 400
# 验证成功后删除,防止重复使用
redis_client.delete(f"captcha:{phone}")
system_logger.info(f"用户登录尝试", phone=phone)
user = User.query.filter_by(phone=phone).first()
if user and user.check_password(password):
if user.is_banned:
system_logger.warning(f"被封禁用户尝试登录", phone=phone)
return jsonify({"error": "您的账号已被封禁,请联系管理员"}), 403
# 登录成功,清除失败计数
redis_client.delete(fail_key)
session.permanent = True
session['user_id'] = user.id
system_logger.info(f"用户登录成功", phone=phone, user_id=user.id)
return jsonify({"message": "登录成功", "phone": phone})
# 登录失败,增加计数 (有效期 1 小时)
redis_client.setex(fail_key, 3600, fail_count + 1)
system_logger.warning(f"登录失败: 手机号或密码错误 [次数: {fail_count+1}]", phone=phone)
return jsonify({"error": "手机号或密码错误"}), 401
@auth_bp.route('/api/auth/reset_password', methods=['POST'])
def reset_password():
"""通过短信重置密码"""
data = request.json
phone = data.get('phone')
code = data.get('code')
new_password = data.get('password')
import re
if not phone or not re.match(r'^1[3-9]\d{9}$', phone):
return jsonify({"error": "手机号格式不正确"}), 400
if not phone or not code or not new_password:
return jsonify({"error": "请填写完整信息"}), 400
if not SMSService.verify_code(phone, code):
return jsonify({"error": "验证码错误或已过期"}), 400
user = User.query.filter_by(phone=phone).first()
if not user:
return jsonify({"error": "该手机号尚未注册"}), 404
user.set_password(new_password)
db.session.commit()
# 重置成功后清理失败计数
from extensions import redis_client
redis_client.delete(f"login_fail_count:{phone}")
system_logger.info(f"用户通过短信重置密码成功", phone=phone, user_id=user.id)
return jsonify({"message": "密码重置成功,请使用新密码登录"})
@auth_bp.route('/api/auth/logout', methods=['POST'])
def logout():
session.pop('user_id', None)
return jsonify({"message": "已退出登录"})
@auth_bp.route('/api/auth/me', methods=['GET'])
def me():
user_id = session.get('user_id')
if not user_id:
return jsonify({"logged_in": False})
user = db.session.get(User, user_id)
if not user:
session.pop('user_id', None)
return jsonify({"logged_in": False})
# 脱敏手机号: 13812345678 -> 138****5678
phone = user.phone
masked_phone = phone[:3] + "****" + phone[-4:] if len(phone) >= 11 else phone
return jsonify({
"logged_in": True,
"phone": masked_phone, # 默认返回脱敏的供前端显示
"full_phone": phone, # 某些场景可能需要完整号
"api_key": user.api_key,
"points": user.points,
"hide_custom_key": (not user.api_key) or user.has_used_points, # Key为空或使用过积分则隐藏自定义Key入口
"invite_code": user.invite_code # 用户邀请码
})
@auth_bp.route('/api/auth/invite_stats', methods=['GET'])
def get_my_invite_stats():
"""获取当前用户的邀请统计"""
user_id = session.get('user_id')
if not user_id:
return jsonify({"error": "请先登录"}), 401
user = db.session.get(User, user_id)
if not user:
return jsonify({"error": "用户不存在"}), 404
from models import InviteReward, Order
# 我邀请的用户数
invited_count = User.query.filter_by(invited_by=user_id).count()
# 我获得的邀请奖励总积分
total_rewards = db.session.query(db.func.sum(InviteReward.reward_points)).filter(
InviteReward.inviter_id == user_id
).scalar() or 0
# 我邀请的用户列表(脱敏)
invited_users = User.query.filter_by(invited_by=user_id).order_by(User.created_at.desc()).limit(20).all()
invited_list = []
for u in invited_users:
# 脱敏手机号
masked = u.phone[:3] + "****" + u.phone[-4:] if len(u.phone) >= 11 else u.phone
# 计算该用户给我带来的奖励
user_rewards = db.session.query(db.func.sum(InviteReward.reward_points)).filter(
InviteReward.inviter_id == user_id,
InviteReward.invitee_id == u.id
).scalar() or 0
invited_list.append({
"phone": masked,
"registered_at": u.created_at_bj.strftime('%Y-%m-%d') if u.created_at else None,
"rewards_earned": int(user_rewards)
})
return jsonify({
"invite_code": user.invite_code,
"invited_count": invited_count,
"total_rewards": int(total_rewards),
"invited_users": invited_list
})
@auth_bp.route('/api/auth/point_history', methods=['GET'])
def get_point_history():
"""获取当前用户的积分变动历史记录"""
user_id = session.get('user_id')
if not user_id:
return jsonify({"error": "请先登录"}), 401
from models import Order, PointsGrant, InviteReward, User
history = []
# 1. 充值记录
orders = Order.query.filter_by(user_id=user_id, status='PAID').all()
for o in orders:
history.append({
"type": "recharge",
"type_label": "账户充值",
"amount": f"+{o.points}",
"desc": f"在线支付 ¥{o.amount}",
"time": o.paid_at_bj.strftime('%Y-%m-%d %H:%M') if o.paid_at_bj else o.created_at_bj.strftime('%Y-%m-%d %H:%M')
})
# 2. 手动发放记录
grants = PointsGrant.query.filter_by(user_id=user_id).all()
for g in grants:
history.append({
"type": "grant",
"type_label": "系统发放",
"amount": f"+{g.points}",
"desc": g.reason or "管理员手动调账",
"time": g.created_at_bj.strftime('%Y-%m-%d %H:%M')
})
# 3. 邀请奖励记录
rewards = InviteReward.query.filter_by(inviter_id=user_id).all()
for r in rewards:
# 查找被邀请人信息
invitee = db.session.get(User, r.invitee_id)
phone = invitee.phone if invitee else "未知用户"
masked = phone[:3] + "****" + phone[-4:] if len(phone) >= 11 else phone
history.append({
"type": "reward",
"type_label": "邀请奖励",
"amount": f"+{r.reward_points}",
"desc": f"好友({masked})充值返利",
"time": r.created_at_bj.strftime('%Y-%m-%d %H:%M')
})
# 4. 消费记录 (GenerationRecord)
from models import GenerationRecord
consumptions = GenerationRecord.query.filter_by(user_id=user_id).order_by(GenerationRecord.created_at.desc()).limit(100).all()
for c in consumptions:
# 截断 prompt
action = c.prompt[:20] + '...' if c.prompt and len(c.prompt) > 20 else (c.prompt or "AI生成")
history.append({
"type": "consumption",
"type_label": "积分消费",
"amount": f"-{c.cost}",
"desc": f"[{c.model or 'Default'}] {action}",
"time": c.created_at_bj.strftime('%Y-%m-%d %H:%M')
})
# 按时间降序排序
history.sort(key=lambda x: x['time'], reverse=True)
# 手动分页
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 5, type=int)
total = len(history)
start = (page - 1) * per_page
end = start + per_page
import math
return jsonify({
"details": history[start:end],
"total": total,
"pages": math.ceil(total / per_page),
"current_page": page
})
@auth_bp.route('/api/auth/point_stats', methods=['GET'])
def get_point_stats():
"""获取用户过去 7 天的积分变动统计(充值 vs 消费)"""
user_id = session.get('user_id')
if not user_id:
return jsonify({"error": "请先登录"}), 401
from models import Order, GenerationRecord, get_bj_now
from datetime import timedelta
end_date = get_bj_now().date()
start_date = end_date - timedelta(days=6)
labels = []
consumption_data = []
recharge_data = []
curr = start_date
while curr <= end_date:
labels.append(curr.strftime('%m-%d'))
# 当天充值 (PAID 订单)
day_recharge = db.session.query(db.func.sum(Order.points)).filter(
Order.user_id == user_id,
Order.status == 'PAID',
db.func.date(Order.paid_at) == curr
).scalar() or 0
# 当天消费 (生成记录)
day_consumption = db.session.query(db.func.sum(GenerationRecord.cost)).filter(
GenerationRecord.user_id == user_id,
db.func.date(GenerationRecord.created_at) == curr
).scalar() or 0
recharge_data.append(int(day_recharge))
consumption_data.append(int(day_consumption))
curr += timedelta(days=1)
return jsonify({
"labels": labels,
"consumption": consumption_data,
"recharge": recharge_data
})
@auth_bp.route('/api/auth/consumption_details', methods=['GET'])
def get_consumption_details():
"""获取用户积分消费明细记录"""
user_id = session.get('user_id')
if not user_id:
return jsonify({"error": "请先登录"}), 401
from models import GenerationRecord
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
pagination = GenerationRecord.query.filter_by(user_id=user_id).order_by(GenerationRecord.created_at.desc()).paginate(
page=page, per_page=per_page, error_out=False
)
details = []
for r in pagination.items:
# 截断 prompt 作为动作描述
action = r.prompt[:30] + '...' if r.prompt and len(r.prompt) > 30 else (r.prompt or "生成图片")
details.append({
"id": r.id,
"action": action,
"model": r.model or "Default",
"cost": f"-{r.cost}",
"time": r.created_at_bj.strftime('%Y-%m-%d %H:%M')
})
return jsonify({
"details": details,
"total": pagination.total,
"pages": pagination.pages,
"current_page": pagination.page
})
@auth_bp.route('/api/auth/change_password', methods=['POST'])
def change_password():
user_id = session.get('user_id')
if not user_id:
return jsonify({"error": "请先登录"}), 401
data = request.json
old_password = data.get('old_password')
new_password = data.get('new_password')
if not old_password or not new_password:
return jsonify({"error": "请填写完整信息"}), 400
user = db.session.get(User, user_id)
if not user.check_password(old_password):
return jsonify({"error": "原密码错误"}), 400
user.set_password(new_password)
db.session.commit()
system_logger.info(f"用户修改密码成功", phone=user.phone, user_id=user.id)
return jsonify({"message": "密码修改成功"})
@auth_bp.route('/api/auth/add_points', methods=['POST'])
def add_points():
"""手动充值积分接口 (暂时禁用,等待正式接入)"""
return jsonify({"error": "充值功能维护中,敬请期待"}), 503
# 原逻辑备份
# user_id = session.get('user_id')
# if not user_id:
# return jsonify({"error": "请先登录"}), 401
# ... (原有逻辑)
def get_user_menu(user):
"""根据用户权限生成菜单列表"""
if not user:
return []
all_menus = [
{"name": "创作工作台", "icon": "wand-2", "url": "/", "perm": None},
{"name": "AI 视频创作", "icon": "video", "url": "/video", "perm": None},
{"name": "验光单助手", "icon": "scan-eye", "url": "/ocr", "perm": None},
{"name": "购买积分", "icon": "shopping-cart", "url": "/buy", "perm": None},
{"name": "积分发放管理", "icon": "gift", "url": "/admin/points", "perm": "manage_system"},
{"name": "权限管理中心", "icon": "shield-check", "url": "/rbac", "perm": "manage_rbac"},
{"name": "系统字典管理", "icon": "book-open", "url": "/dicts", "perm": "manage_dicts"},
{"name": "系统通知管理", "icon": "megaphone", "url": "/notifications", "perm": "manage_notifications"},
{"name": "系统日志审计", "icon": "terminal", "url": "/logs", "perm": "view_logs"},
]
accessible_menu = []
for item in all_menus:
if item["perm"] is None or user.has_permission(item["perm"]):
accessible_menu.append(item)
return accessible_menu
@auth_bp.route('/api/auth/menu', methods=['GET'])
def get_menu():
"""获取动态导航菜单"""
user_id = session.get('user_id')
if not user_id:
return jsonify({"menu": []})
user = db.session.get(User, user_id)
return jsonify({"menu": get_user_menu(user)})
@auth_bp.route('/api/auth/logs', methods=['GET'])
@admin_required
def get_logs():
"""获取系统日志(支持搜索、筛选与分页)"""
from models import SystemLog, User
# 分页与筛选参数
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
level_filter = request.args.get('level')
search_query = request.args.get('search', '').strip()
query = db.session.query(SystemLog).outerjoin(User)
# 级别过滤
if level_filter:
query = query.filter(SystemLog.level == level_filter)
# 关键词搜索 (支持消息、手机号、IP)
if search_query:
search_filter = db.or_(
SystemLog.message.ilike(f"%{search_query}%"),
SystemLog.ip.ilike(f"%{search_query}%"),
User.phone.ilike(f"%{search_query}%"),
SystemLog.module.ilike(f"%{search_query}%")
)
query = query.filter(search_filter)
# 执行分页查询
pagination = query.order_by(SystemLog.created_at.desc()).paginate(
page=page, per_page=per_page, error_out=False
)
logs_data = []
for log in pagination.items:
logs_data.append({
"id": log.id,
"time": log.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
"level": log.level,
"message": log.message,
"module": log.module,
"user_id": log.user_id,
"user_phone": log.user.phone if log.user else "系统/游客",
"ip": log.ip,
"path": log.path,
"method": log.method,
"extra": json.loads(log.extra) if log.extra else {}
})
return jsonify({
"logs": logs_data,
"total": pagination.total,
"page": page,
"per_page": per_page,
"total_pages": pagination.pages
})