"""Authentication blueprint.

Serves the login/admin/purchase pages and the ``/api/auth/*`` JSON endpoints
(captcha, SMS code, register, login, password reset, profile, invite and
points statistics, menu, system logs).
"""
import datetime as dt_module
import json
import math
import random
import re
import string
from datetime import timedelta

from flask import (
    Blueprint,
    Response,
    jsonify,
    redirect,
    render_template,
    request,
    session,
    url_for,
)

from extensions import db
from models import User, get_bj_now
from services.sms_service import SMSService
from services.captcha_service import CaptchaService
from services.logger import system_logger
from middlewares.auth import admin_required

auth_bp = Blueprint('auth', __name__)

# 11-digit mobile number starting with 1 and a second digit of 3-9. Used with
# ``fullmatch`` so a trailing newline is rejected (``$`` would accept it) and
# with ``re.ASCII`` so ``\d`` does not match non-ASCII digits.
_PHONE_RE = re.compile(r'1[3-9]\d{9}', re.ASCII)


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _json_body():
    """Return the request's JSON object, or ``{}`` when it is missing/invalid.

    ``request.json`` can be ``None`` or raise for a non-JSON body; handlers
    only ever call ``.get`` on the result, so anything that is not a JSON
    object is treated as an empty payload and falls through to the handler's
    own validation errors.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _is_valid_phone(phone):
    """Return True when *phone* is a string that fully matches ``_PHONE_RE``."""
    return isinstance(phone, str) and _PHONE_RE.fullmatch(phone) is not None


def _to_text(value):
    """Decode *value* as UTF-8 if it is ``bytes``; otherwise return it as is."""
    if isinstance(value, bytes):
        return value.decode('utf-8')
    return value


def _captcha_matches(redis_client, phone, submitted):
    """Compare *submitted* (case-insensitively) with the stored captcha text.

    Reads ``captcha:{phone}`` from *redis_client*; returns False when no
    value is stored. Does not delete the key; callers do that on success.
    """
    saved = redis_client.get(f"captcha:{phone}")
    if not saved:
        return False
    return str(submitted).lower() == _to_text(saved)


def _mask_phone(phone):
    """Mask the middle of a phone number: 13812345678 -> 138****5678.

    Values shorter than 11 characters are returned unchanged.
    """
    if len(phone) >= 11:
        return phone[:3] + "****" + phone[-4:]
    return phone


def _truncate(text, limit, fallback):
    """Return *text* cut to *limit* chars plus '...', or *fallback* if empty."""
    if text and len(text) > limit:
        return text[:limit] + '...'
    return text or fallback


def _current_user():
    """Return the logged-in ``User`` or ``None``.

    When the session carries a ``user_id`` that no longer resolves to a row,
    the id is removed from the session so the client is treated as logged out.
    """
    user_id = session.get('user_id')
    if not user_id:
        return None
    user = db.session.get(User, user_id)
    if user is None:
        session.pop('user_id', None)
    return user


def _generate_invite_code():
    """Return a 6-character uppercase/digit code not used by any ``User``."""
    alphabet = string.ascii_uppercase + string.digits
    while True:
        candidate = ''.join(random.choices(alphabet, k=6))
        if not User.query.filter_by(invite_code=candidate).first():
            return candidate


def _parse_log_extra(raw):
    """Decode a log row's ``extra`` JSON text.

    Returns ``{}`` for an empty value. A value that is not valid JSON is
    returned as ``{"raw": raw}`` so one bad row cannot fail the whole
    listing.
    """
    if not raw:
        return {}
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        return {"raw": raw}


# ---------------------------------------------------------------------------
# Pages
# ---------------------------------------------------------------------------

@auth_bp.route('/login')
def login_page():
    """Login page; redirects to ``index`` when a user id is in the session."""
    if 'user_id' in session:
        return redirect(url_for('index'))
    return render_template('login.html')


@auth_bp.route('/logs')
@admin_required
def logs_page():
    """Log viewer page."""
    return render_template('logs.html')


@auth_bp.route('/rbac')
@admin_required
def rbac_page():
    """Role and permission management page."""
    return render_template('rbac.html')


@auth_bp.route('/dicts')
@admin_required
def dicts_page():
    """System dictionary management page."""
    return render_template('dicts.html')


@auth_bp.route('/notifications')
@admin_required
def notifications_page():
    """System notification management page."""
    return render_template('notifications.html')


@auth_bp.route('/admin/orders')
@admin_required
def admin_orders_page():
    """Order management page covering all users."""
    return render_template('orders.html')


@auth_bp.route('/admin/points')
@admin_required
def admin_points_page():
    """Points grant management page."""
    return render_template('points.html')


@auth_bp.route('/buy')
def buy_page():
    """Points purchase page.

    Shows the user's 10 most recent orders (PAID, or PENDING created within
    the last 30 minutes); users with the ``manage_system`` permission also
    get the same view across all users. ``?success=true&out_trade_no=...``
    drives the payment-success banner.
    """
    if 'user_id' not in session:
        return redirect(url_for('auth.login_page'))

    from models import Order

    user = _current_user()
    if user is None:
        # Stale session: the id no longer maps to a user.
        return redirect(url_for('auth.login_page'))
    user_id = user.id

    cutoff = get_bj_now() - timedelta(minutes=30)
    # PAID orders, plus PENDING ones that are at most 30 minutes old.
    visible = db.or_(
        Order.status == 'PAID',
        db.and_(Order.status == 'PENDING', Order.created_at >= cutoff),
    )

    personal_orders = (
        Order.query.filter(Order.user_id == user_id, visible)
        .order_by(Order.created_at.desc())
        .limit(10)
        .all()
    )

    is_admin = user.has_permission('manage_system')
    admin_orders = []
    if is_admin:
        admin_orders = (
            Order.query.filter(visible)
            .order_by(Order.created_at.desc())
            .limit(10)
            .all()
        )

    success = request.args.get('success') == 'true'
    out_trade_no = request.args.get('out_trade_no')
    order = None
    if out_trade_no:
        order_query = Order.query.filter_by(out_trade_no=out_trade_no)
        if not is_admin:
            # out_trade_no comes from the query string; only expose the
            # caller's own order so order numbers cannot be probed.
            order_query = order_query.filter_by(user_id=user_id)
        order = order_query.first()

    return render_template(
        'buy.html',
        user=user,
        personal_orders=personal_orders,
        admin_orders=admin_orders,
        is_admin=is_admin,
        modules={'datetime': dt_module},
        success=success,
        order=order,
    )


@auth_bp.route('/recharge_history')
def recharge_history_page():
    """Recharge history page listing all of the user's orders, newest first."""
    if 'user_id' not in session:
        return redirect(url_for('auth.login_page'))

    from models import Order

    user = _current_user()
    if user is None:
        return redirect(url_for('auth.login_page'))

    orders = (
        Order.query.filter_by(user_id=user.id)
        .order_by(Order.created_at.desc())
        .all()
    )
    return render_template('recharge_history.html', user=user, orders=orders)


# ---------------------------------------------------------------------------
# Captcha / SMS / account APIs
# ---------------------------------------------------------------------------

@auth_bp.route('/api/auth/captcha')
def get_captcha():
    """Generate an image captcha for ``?phone=`` and store its text in Redis.

    The lower-cased text is stored under ``captcha:{phone}`` for 300 seconds;
    the response body is the PNG image.
    """
    phone = request.args.get('phone')
    if not phone:
        return jsonify({"error": "缺少参数"}), 400

    text, img_bytes = CaptchaService.generate_captcha()

    from extensions import redis_client
    redis_client.setex(f"captcha:{phone}", 300, text.lower())

    return Response(img_bytes, mimetype='image/png')


@auth_bp.route('/api/auth/send_code', methods=['POST'])
def send_code():
    """Send an SMS verification code after captcha and rate-limit checks.

    JSON body: ``phone``, ``captcha``. Limits enforced through Redis keys:
    one SMS per phone per 60 seconds, 10 per phone per day, 20 per client IP
    per day (daily counters expire at the next midnight of ``get_bj_now()``).
    """
    data = _json_body()
    phone = data.get('phone')
    captcha = data.get('captcha')
    ip = request.remote_addr

    if not phone:
        return jsonify({"error": "请输入手机号"}), 400
    if not _is_valid_phone(phone):
        return jsonify({"error": "手机号格式不正确"}), 400
    if not captcha:
        return jsonify({"error": "请输入图形验证码", "show_captcha": True}), 403

    from extensions import redis_client

    # 1. Image captcha.
    if not _captcha_matches(redis_client, phone, captcha):
        return jsonify({"error": "图形验证码错误或已过期", "refresh_captcha": True}), 403
    # Single use: delete right away so a script cannot replay it to spam SMS.
    redis_client.delete(f"captcha:{phone}")

    # 2. Per-phone cooldown (server-side backstop).
    if redis_client.get(f"sms_lock:{phone}"):
        return jsonify({"error": "发送过于频繁,请稍后再试"}), 429

    # 3. Per-phone daily cap.
    day_count_key = f"sms_day_count:{phone}"
    day_count = int(redis_client.get(day_count_key) or 0)
    if day_count >= 10:
        return jsonify({"error": "该手机号今日获取验证码次数已达上限"}), 429

    # 4. Per-IP daily cap (guards against rotating phone numbers).
    ip_count_key = f"sms_ip_count:{ip}"
    ip_count = int(redis_client.get(ip_count_key) or 0)
    if ip_count >= 20:
        return jsonify({"error": "您的设备今日发送请求过多,请明天再试"}), 429

    system_logger.info("用户请求发送验证码", phone=phone, ip=ip)

    success, msg = SMSService.send_code(phone)
    if success:
        now = get_bj_now()
        seconds_until_midnight = (
            (23 - now.hour) * 3600 + (59 - now.minute) * 60 + (60 - now.second)
        )
        redis_client.setex(f"sms_lock:{phone}", 60, "1")
        redis_client.setex(day_count_key, seconds_until_midnight, day_count + 1)
        redis_client.setex(ip_count_key, seconds_until_midnight, ip_count + 1)

        system_logger.info("验证码发送成功", phone=phone)
        return jsonify({"message": "验证码已发送"})

    system_logger.warning(f"验证码发送失败: {msg}", phone=phone)
    return jsonify({"error": f"发送失败: {msg}"}), 500


@auth_bp.route('/api/auth/register', methods=['POST'])
def register():
    """Register a new user.

    JSON body: ``phone``, ``code`` (SMS code), ``password`` and an optional
    ``invite_code``. Returns 400 for an invalid phone, missing fields, a
    wrong SMS code, an already-registered phone or an unknown invite code.
    """
    data = _json_body()
    phone = data.get('phone')
    code = data.get('code')
    password = data.get('password')
    # ``or ''`` also covers an explicit JSON null for invite_code.
    invite_code = str(data.get('invite_code') or '').strip().upper()

    if not _is_valid_phone(phone):
        return jsonify({"error": "手机号格式不正确"}), 400
    if not code or not password:
        # Without this a missing password only failed later inside
        # set_password(), surfacing as a server error.
        return jsonify({"error": "请填写完整信息"}), 400

    system_logger.info("用户注册请求", phone=phone)

    if not SMSService.verify_code(phone, code):
        system_logger.warning("注册失败: 验证码错误", phone=phone)
        return jsonify({"error": "验证码错误或已过期"}), 400

    if User.query.filter_by(phone=phone).first():
        system_logger.warning("注册失败: 手机号已存在", phone=phone)
        return jsonify({"error": "该手机号已注册"}), 400

    # Resolve the inviter when an invite code was supplied.
    inviter = None
    if invite_code:
        inviter = User.query.filter_by(invite_code=invite_code).first()
        if not inviter:
            return jsonify({"error": "邀请码无效,请检查后重试"}), 400

    # Default role for new accounts.
    from models import Role
    user_role = Role.query.filter_by(name='普通用户').first()

    user = User(phone=phone, role=user_role, invite_code=_generate_invite_code())
    if inviter:
        user.invited_by = inviter.id
    user.set_password(password)
    db.session.add(user)
    db.session.commit()

    if inviter:
        system_logger.info(
            "用户注册成功(通过邀请码)",
            phone=phone,
            user_id=user.id,
            inviter_id=inviter.id,
        )
    else:
        system_logger.info("用户注册成功", phone=phone, user_id=user.id)

    return jsonify({"message": "注册成功"})


@auth_bp.route('/api/auth/login', methods=['POST'])
def login():
    """Log in with phone and password.

    JSON body: ``phone``, ``password`` and, once the phone has 3 or more
    recorded failures, ``code`` holding the image captcha text. Failures are
    counted in Redis under ``login_fail_count:{phone}`` for one hour and
    cleared on success.
    """
    data = _json_body()
    phone = data.get('phone')
    password = data.get('password')
    code = data.get('code')  # image captcha, required after repeated failures

    if not phone or not password:
        return jsonify({"error": "请输入手机号和密码"}), 400

    from extensions import redis_client
    fail_key = f"login_fail_count:{phone}"
    fail_count = int(redis_client.get(fail_key) or 0)

    # Too many failures: require the image captcha.
    if fail_count >= 3:
        if not code:
            system_logger.warning("触发强制安全验证", phone=phone)
            return jsonify({
                "error": "由于密码错误次数过多,请输入图形验证码",
                "require_captcha": True
            }), 403

        if not _captcha_matches(redis_client, phone, code):
            return jsonify({"error": "验证码错误或已过期"}), 400
        # Single use.
        redis_client.delete(f"captcha:{phone}")

    system_logger.info("用户登录尝试", phone=phone)
    user = User.query.filter_by(phone=phone).first()

    if user and user.check_password(password):
        if user.is_banned:
            system_logger.warning("被封禁用户尝试登录", phone=phone)
            return jsonify({"error": "您的账号已被封禁,请联系管理员"}), 403

        # Success: reset the failure counter.
        redis_client.delete(fail_key)

        session.permanent = True
        session['user_id'] = user.id
        system_logger.info("用户登录成功", phone=phone, user_id=user.id)
        return jsonify({"message": "登录成功", "phone": phone})

    # Failure: bump the counter (expires after one hour).
    redis_client.setex(fail_key, 3600, fail_count + 1)
    system_logger.warning(f"登录失败: 手机号或密码错误 [次数: {fail_count+1}]", phone=phone)
    return jsonify({"error": "手机号或密码错误"}), 401


@auth_bp.route('/api/auth/reset_password', methods=['POST'])
def reset_password():
    """Reset a password using an SMS code.

    JSON body: ``phone``, ``code``, ``password`` (the new password). Also
    clears the phone's login failure counter on success.
    """
    data = _json_body()
    phone = data.get('phone')
    code = data.get('code')
    new_password = data.get('password')

    if not _is_valid_phone(phone):
        return jsonify({"error": "手机号格式不正确"}), 400

    if not code or not new_password:
        return jsonify({"error": "请填写完整信息"}), 400

    if not SMSService.verify_code(phone, code):
        return jsonify({"error": "验证码错误或已过期"}), 400

    user = User.query.filter_by(phone=phone).first()
    if not user:
        return jsonify({"error": "该手机号尚未注册"}), 404

    user.set_password(new_password)
    db.session.commit()

    from extensions import redis_client
    redis_client.delete(f"login_fail_count:{phone}")

    system_logger.info("用户通过短信重置密码成功", phone=phone, user_id=user.id)
    return jsonify({"message": "密码重置成功,请使用新密码登录"})


@auth_bp.route('/api/auth/logout', methods=['POST'])
def logout():
    """Remove the user id from the session."""
    session.pop('user_id', None)
    return jsonify({"message": "已退出登录"})


@auth_bp.route('/api/auth/me', methods=['GET'])
def me():
    """Return the current user's profile, or ``{"logged_in": false}``."""
    user = _current_user()
    if user is None:
        return jsonify({"logged_in": False})

    phone = user.phone
    return jsonify({
        "logged_in": True,
        "phone": _mask_phone(phone),  # masked value for display
        "full_phone": phone,          # unmasked value
        "api_key": user.api_key,
        "points": user.points,
        # True when the key is empty or points have been used.
        "hide_custom_key": (not user.api_key) or user.has_used_points,
        "invite_code": user.invite_code
    })


# ---------------------------------------------------------------------------
# Invite / points APIs
# ---------------------------------------------------------------------------

@auth_bp.route('/api/auth/invite_stats', methods=['GET'])
def get_my_invite_stats():
    """Return the current user's invite statistics.

    Includes the invite code, number of invited users, total reward points
    and the 20 most recently invited users (masked phone, registration date,
    reward points earned from each).
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({"error": "请先登录"}), 401

    user = db.session.get(User, user_id)
    if not user:
        return jsonify({"error": "用户不存在"}), 404

    from models import InviteReward

    invited_count = User.query.filter_by(invited_by=user_id).count()

    total_rewards = db.session.query(
        db.func.sum(InviteReward.reward_points)
    ).filter(
        InviteReward.inviter_id == user_id
    ).scalar() or 0

    invited_users = (
        User.query.filter_by(invited_by=user_id)
        .order_by(User.created_at.desc())
        .limit(20)
        .all()
    )

    invited_list = []
    for invited in invited_users:
        # Reward points this particular invitee generated for the inviter.
        user_rewards = db.session.query(
            db.func.sum(InviteReward.reward_points)
        ).filter(
            InviteReward.inviter_id == user_id,
            InviteReward.invitee_id == invited.id
        ).scalar() or 0

        invited_list.append({
            "phone": _mask_phone(invited.phone),
            "registered_at": (
                invited.created_at_bj.strftime('%Y-%m-%d')
                if invited.created_at else None
            ),
            "rewards_earned": int(user_rewards)
        })

    return jsonify({
        "invite_code": user.invite_code,
        "invited_count": invited_count,
        "total_rewards": int(total_rewards),
        "invited_users": invited_list
    })


@auth_bp.route('/api/auth/point_history', methods=['GET'])
def get_point_history():
    """Return the current user's merged points history, paginated.

    Merges paid orders, manual grants, invite rewards and the 100 most
    recent generation records, sorted by time descending. Query args:
    ``page`` (default 1) and ``per_page`` (default 5); values below 1 are
    raised to 1.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({"error": "请先登录"}), 401

    from models import GenerationRecord, InviteReward, Order, PointsGrant

    time_fmt = '%Y-%m-%d %H:%M'
    history = []

    # 1. Recharges (paid orders).
    for o in Order.query.filter_by(user_id=user_id, status='PAID').all():
        stamp = o.paid_at_bj if o.paid_at_bj else o.created_at_bj
        history.append({
            "type": "recharge",
            "type_label": "账户充值",
            "amount": f"+{o.points}",
            "desc": f"在线支付 ¥{o.amount}",
            "time": stamp.strftime(time_fmt)
        })

    # 2. Manual grants.
    for g in PointsGrant.query.filter_by(user_id=user_id).all():
        history.append({
            "type": "grant",
            "type_label": "系统发放",
            "amount": f"+{g.points}",
            "desc": g.reason or "管理员手动调账",
            "time": g.created_at_bj.strftime(time_fmt)
        })

    # 3. Invite rewards.
    for r in InviteReward.query.filter_by(inviter_id=user_id).all():
        invitee = db.session.get(User, r.invitee_id)
        phone = invitee.phone if invitee else "未知用户"
        history.append({
            "type": "reward",
            "type_label": "邀请奖励",
            "amount": f"+{r.reward_points}",
            "desc": f"好友({_mask_phone(phone)})充值返利",
            "time": r.created_at_bj.strftime(time_fmt)
        })

    # 4. Consumption (generation records), most recent 100 only.
    consumptions = (
        GenerationRecord.query.filter_by(user_id=user_id)
        .order_by(GenerationRecord.created_at.desc())
        .limit(100)
        .all()
    )
    for c in consumptions:
        action = _truncate(c.prompt, 20, "AI生成")
        history.append({
            "type": "consumption",
            "type_label": "积分消费",
            "amount": f"-{c.cost}",
            "desc": f"[{c.model or 'Default'}] {action}",
            "time": c.created_at_bj.strftime(time_fmt)
        })

    # The zero-padded 'YYYY-MM-DD HH:MM' strings sort chronologically.
    history.sort(key=lambda entry: entry['time'], reverse=True)

    # Manual pagination. Clamp to >= 1: per_page=0 used to divide by zero
    # and negative values produced meaningless slices.
    page = max(1, request.args.get('page', 1, type=int))
    per_page = max(1, request.args.get('per_page', 5, type=int))

    total = len(history)
    start = (page - 1) * per_page
    end = start + per_page

    return jsonify({
        "details": history[start:end],
        "total": total,
        "pages": math.ceil(total / per_page),
        "current_page": page
    })


@auth_bp.route('/api/auth/point_stats', methods=['GET'])
def get_point_stats():
    """Return per-day recharge vs. consumption totals for the last 7 days.

    The window ends on ``get_bj_now().date()``; each day is labelled
    ``MM-DD``.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({"error": "请先登录"}), 401

    from models import GenerationRecord, Order

    end_date = get_bj_now().date()
    start_date = end_date - timedelta(days=6)

    labels = []
    consumption_data = []
    recharge_data = []

    # NOTE(review): the day comes from get_bj_now() but is compared with
    # date(paid_at) / date(created_at) as stored — confirm both sides use the
    # same timezone.
    for offset in range(7):
        day = start_date + timedelta(days=offset)
        labels.append(day.strftime('%m-%d'))

        # Points recharged that day (PAID orders).
        day_recharge = db.session.query(db.func.sum(Order.points)).filter(
            Order.user_id == user_id,
            Order.status == 'PAID',
            db.func.date(Order.paid_at) == day
        ).scalar() or 0

        # Points consumed that day (generation records).
        day_consumption = db.session.query(
            db.func.sum(GenerationRecord.cost)
        ).filter(
            GenerationRecord.user_id == user_id,
            db.func.date(GenerationRecord.created_at) == day
        ).scalar() or 0

        recharge_data.append(int(day_recharge))
        consumption_data.append(int(day_consumption))

    return jsonify({
        "labels": labels,
        "consumption": consumption_data,
        "recharge": recharge_data
    })


@auth_bp.route('/api/auth/consumption_details', methods=['GET'])
def get_consumption_details():
    """Return the current user's generation records, paginated.

    Query args: ``page`` (default 1), ``per_page`` (default 10).
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({"error": "请先登录"}), 401

    from models import GenerationRecord

    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    pagination = (
        GenerationRecord.query.filter_by(user_id=user_id)
        .order_by(GenerationRecord.created_at.desc())
        .paginate(page=page, per_page=per_page, error_out=False)
    )

    details = [
        {
            "id": record.id,
            # Truncated prompt doubles as the action description.
            "action": _truncate(record.prompt, 30, "生成图片"),
            "model": record.model or "Default",
            "cost": f"-{record.cost}",
            "time": record.created_at_bj.strftime('%Y-%m-%d %H:%M')
        }
        for record in pagination.items
    ]

    return jsonify({
        "details": details,
        "total": pagination.total,
        "pages": pagination.pages,
        "current_page": pagination.page
    })


@auth_bp.route('/api/auth/change_password', methods=['POST'])
def change_password():
    """Change the logged-in user's password.

    JSON body: ``old_password``, ``new_password``. Returns 401 when not
    logged in (including a session whose user no longer exists) and 400 for
    missing fields or a wrong old password.
    """
    if not session.get('user_id'):
        return jsonify({"error": "请先登录"}), 401

    data = _json_body()
    old_password = data.get('old_password')
    new_password = data.get('new_password')

    if not old_password or not new_password:
        return jsonify({"error": "请填写完整信息"}), 400

    user = _current_user()
    if user is None:
        # Session points at a user that no longer exists.
        return jsonify({"error": "请先登录"}), 401

    if not user.check_password(old_password):
        return jsonify({"error": "原密码错误"}), 400

    user.set_password(new_password)
    db.session.commit()

    system_logger.info("用户修改密码成功", phone=user.phone, user_id=user.id)
    return jsonify({"message": "密码修改成功"})


@auth_bp.route('/api/auth/add_points', methods=['POST'])
def add_points():
    """Manual points top-up endpoint; currently disabled, always returns 503."""
    return jsonify({"error": "充值功能维护中,敬请期待"}), 503
    # The previous implementation began with the usual session check
    # (401 "请先登录" when not logged in); the rest was removed pending the
    # real payment integration.


# ---------------------------------------------------------------------------
# Menu / logs
# ---------------------------------------------------------------------------

def get_user_menu(user):
    """Return the navigation menu entries *user* is allowed to see.

    Entries whose ``perm`` is ``None`` are always included; the others only
    when ``user.has_permission(perm)`` is true. Returns ``[]`` for a falsy
    *user*.
    """
    if not user:
        return []

    all_menus = [
        {"name": "创作工作台", "icon": "wand-2", "url": "/", "perm": None},
        {"name": "AI 视频创作", "icon": "video", "url": "/video", "perm": None},
        {"name": "验光单助手", "icon": "scan-eye", "url": "/ocr", "perm": None},
        {"name": "购买积分", "icon": "shopping-cart", "url": "/buy", "perm": None},
        {"name": "积分发放管理", "icon": "gift", "url": "/admin/points", "perm": "manage_system"},
        {"name": "权限管理中心", "icon": "shield-check", "url": "/rbac", "perm": "manage_rbac"},
        {"name": "系统字典管理", "icon": "book-open", "url": "/dicts", "perm": "manage_dicts"},
        {"name": "系统通知管理", "icon": "megaphone", "url": "/notifications", "perm": "manage_notifications"},
        {"name": "系统日志审计", "icon": "terminal", "url": "/logs", "perm": "view_logs"},
    ]

    return [
        item for item in all_menus
        if item["perm"] is None or user.has_permission(item["perm"])
    ]


@auth_bp.route('/api/auth/menu', methods=['GET'])
def get_menu():
    """Return the dynamic navigation menu (empty when not logged in)."""
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({"menu": []})

    user = db.session.get(User, user_id)
    return jsonify({"menu": get_user_menu(user)})


@auth_bp.route('/api/auth/logs', methods=['GET'])
@admin_required
def get_logs():
    """Return system logs with optional filtering, search and pagination.

    Query args: ``page`` (default 1), ``per_page`` (default 20), ``level``
    (exact match) and ``search`` (case-insensitive substring over message,
    IP, user phone and module).
    """
    from models import SystemLog

    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 20, type=int)
    level_filter = request.args.get('level')
    search_query = request.args.get('search', '').strip()

    query = db.session.query(SystemLog).outerjoin(User)

    if level_filter:
        query = query.filter(SystemLog.level == level_filter)

    if search_query:
        like = f"%{search_query}%"
        query = query.filter(db.or_(
            SystemLog.message.ilike(like),
            SystemLog.ip.ilike(like),
            User.phone.ilike(like),
            SystemLog.module.ilike(like)
        ))

    pagination = query.order_by(SystemLog.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )

    logs_data = [
        {
            "id": log.id,
            "time": log.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
            "level": log.level,
            "message": log.message,
            "module": log.module,
            "user_id": log.user_id,
            "user_phone": log.user.phone if log.user else "系统/游客",
            "ip": log.ip,
            "path": log.path,
            "method": log.method,
            "extra": _parse_log_extra(log.extra)
        }
        for log in pagination.items
    ]

    return jsonify({
        "logs": logs_data,
        "total": pagination.total,
        "page": page,
        "per_page": per_page,
        "total_pages": pagination.pages
    })