from flask import Blueprint, request, jsonify, session, render_template, redirect, url_for import json from datetime import timedelta from extensions import db from models import User, get_bj_now from services.sms_service import SMSService from services.captcha_service import CaptchaService from services.logger import system_logger from middlewares.auth import admin_required auth_bp = Blueprint('auth', __name__) @auth_bp.route('/login') def login_page(): """登录页面""" if 'user_id' in session: return redirect(url_for('index')) return render_template('login.html') @auth_bp.route('/logs') @admin_required def logs_page(): """日志查看页面""" return render_template('logs.html') @auth_bp.route('/rbac') @admin_required def rbac_page(): """角色权限管理页面""" return render_template('rbac.html') @auth_bp.route('/dicts') @admin_required def dicts_page(): """系统字典管理页面""" return render_template('dicts.html') @auth_bp.route('/notifications') @admin_required def notifications_page(): """系统通知管理页面""" return render_template('notifications.html') @auth_bp.route('/admin/orders') @admin_required def admin_orders_page(): """全员订单管理页面""" return render_template('orders.html') @auth_bp.route('/buy') def buy_page(): """购买积分页面""" if 'user_id' not in session: return redirect(url_for('auth.login_page')) from models import Order, User user_id = session['user_id'] user = db.session.get(User, user_id) thirty_min_ago = get_bj_now() - timedelta(minutes=30) # 获取用户个人充值记录 (过滤掉超过 30 分钟未支付的订单) personal_orders = Order.query.filter( Order.user_id == user_id, db.or_( Order.status == 'PAID', db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago) ) ).order_by(Order.created_at.desc()).limit(10).all() # 如果是管理员,获取全员记录 is_admin = user.has_permission('manage_system') admin_orders = [] if is_admin: admin_orders = Order.query.filter( db.or_( Order.status == 'PAID', db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago) ) ).order_by(Order.created_at.desc()).limit(10).all() # 处理支付成功提示 success = request.args.get('success') == 'true' out_trade_no = request.args.get('out_trade_no') order = None if out_trade_no: order = Order.query.filter_by(out_trade_no=out_trade_no).first() import datetime as dt_module return render_template('buy.html', personal_orders=personal_orders, admin_orders=admin_orders, is_admin=is_admin, modules={'datetime': dt_module}, success=success, order=order) @auth_bp.route('/api/auth/captcha') def get_captcha(): """获取图形验证码并存入 Redis""" phone = request.args.get('phone') if not phone: return jsonify({"error": "缺少参数"}), 400 text, img_bytes = CaptchaService.generate_captcha() from extensions import redis_client # 存入 Redis,有效期 5 分钟 redis_client.setex(f"captcha:{phone}", 300, text.lower()) from flask import Response return Response(img_bytes, mimetype='image/png') @auth_bp.route('/api/auth/send_code', methods=['POST']) def send_code(): data = request.json phone = data.get('phone') captcha = data.get('captcha') ip = request.remote_addr if not phone: return jsonify({"error": "请输入手机号"}), 400 import re if not re.match(r'^1[3-9]\d{9}$', phone): return jsonify({"error": "手机号格式不正确"}), 400 if not captcha: return jsonify({"error": "请输入图形验证码", "show_captcha": True}), 403 from extensions import redis_client # 1. 验证图形验证码 saved_captcha = redis_client.get(f"captcha:{phone}") if not saved_captcha or captcha.lower() != saved_captcha.decode('utf-8'): return jsonify({"error": "图形验证码错误或已过期", "refresh_captcha": True}), 403 # 验证后立即删除,防止被脚本重复利用来刷短信 redis_client.delete(f"captcha:{phone}") # 2. 频率限制:单手机号 60秒 一次 (后端兜底) if redis_client.get(f"sms_lock:{phone}"): return jsonify({"error": "发送过于频繁,请稍后再试"}), 429 # 3. 每日限制:单手机号每天最多 10 条 day_count_key = f"sms_day_count:{phone}" day_count = int(redis_client.get(day_count_key) or 0) if day_count >= 10: return jsonify({"error": "该手机号今日获取验证码次数已达上限"}), 429 # 4. 每日限制:单 IP 每天最多 20 条 (防止换号刷) ip_count_key = f"sms_ip_count:{ip}" ip_count = int(redis_client.get(ip_count_key) or 0) if ip_count >= 20: return jsonify({"error": "您的设备今日发送请求过多,请明天再试"}), 429 system_logger.info(f"用户请求发送验证码", phone=phone, ip=ip) success, msg = SMSService.send_code(phone) if success: # 设置各种限制标记 now = get_bj_now() seconds_until_midnight = ((23 - now.hour) * 3600) + ((59 - now.minute) * 60) + (60 - now.second) redis_client.setex(f"sms_lock:{phone}", 60, "1") redis_client.setex(day_count_key, seconds_until_midnight, day_count + 1) redis_client.setex(ip_count_key, seconds_until_midnight, ip_count + 1) system_logger.info(f"验证码发送成功", phone=phone) return jsonify({"message": "验证码已发送"}) system_logger.warning(f"验证码发送失败: {msg}", phone=phone) return jsonify({"error": f"发送失败: {msg}"}), 500 @auth_bp.route('/api/auth/register', methods=['POST']) def register(): data = request.json phone = data.get('phone') code = data.get('code') password = data.get('password') import re if not phone or not re.match(r'^1[3-9]\d{9}$', phone): return jsonify({"error": "手机号格式不正确"}), 400 system_logger.info(f"用户注册请求", phone=phone) if not SMSService.verify_code(phone, code): system_logger.warning(f"注册失败: 验证码错误", phone=phone) return jsonify({"error": "验证码错误或已过期"}), 400 if User.query.filter_by(phone=phone).first(): system_logger.warning(f"注册失败: 手机号已存在", phone=phone) return jsonify({"error": "该手机号已注册"}), 400 # 获取默认角色:普通用户 from models import Role user_role = Role.query.filter_by(name='普通用户').first() user = User(phone=phone, role=user_role) user.set_password(password) db.session.add(user) db.session.commit() system_logger.info(f"用户注册成功", phone=phone, user_id=user.id) return jsonify({"message": "注册成功"}) @auth_bp.route('/api/auth/login', methods=['POST']) def login(): data = request.json phone = data.get('phone') password = data.get('password') code = data.get('code') # 可能是高频报错后强制要求的验证码 if not phone or not password: return jsonify({"error": "请输入手机号和密码"}), 400 from extensions import redis_client fail_key = f"login_fail_count:{phone}" fail_count = int(redis_client.get(fail_key) or 0) # 如果失败次数过多,强制要求图型验证码 if fail_count >= 3: if not code: system_logger.warning(f"触发强制安全验证", phone=phone) return jsonify({ "error": "由于密码错误次数过多,请输入图形验证码", "require_captcha": True }), 403 # 验证图形验证码 saved_captcha = redis_client.get(f"captcha:{phone}") if not saved_captcha or code.lower() != saved_captcha.decode('utf-8'): return jsonify({"error": "验证码错误或已过期"}), 400 # 验证成功后删除,防止重复使用 redis_client.delete(f"captcha:{phone}") system_logger.info(f"用户登录尝试", phone=phone) user = User.query.filter_by(phone=phone).first() if user and user.check_password(password): if user.is_banned: system_logger.warning(f"被封禁用户尝试登录", phone=phone) return jsonify({"error": "您的账号已被封禁,请联系管理员"}), 403 # 登录成功,清除失败计数 redis_client.delete(fail_key) session.permanent = True session['user_id'] = user.id system_logger.info(f"用户登录成功", phone=phone, user_id=user.id) return jsonify({"message": "登录成功", "phone": phone}) # 登录失败,增加计数 (有效期 1 小时) redis_client.setex(fail_key, 3600, fail_count + 1) system_logger.warning(f"登录失败: 手机号或密码错误 [次数: {fail_count+1}]", phone=phone) return jsonify({"error": "手机号或密码错误"}), 401 @auth_bp.route('/api/auth/reset_password', methods=['POST']) def reset_password(): """通过短信重置密码""" data = request.json phone = data.get('phone') code = data.get('code') new_password = data.get('password') import re if not phone or not re.match(r'^1[3-9]\d{9}$', phone): return jsonify({"error": "手机号格式不正确"}), 400 if not phone or not code or not new_password: return jsonify({"error": "请填写完整信息"}), 400 if not SMSService.verify_code(phone, code): return jsonify({"error": "验证码错误或已过期"}), 400 user = User.query.filter_by(phone=phone).first() if not user: return jsonify({"error": "该手机号尚未注册"}), 404 user.set_password(new_password) db.session.commit() # 重置成功后清理失败计数 from extensions import redis_client redis_client.delete(f"login_fail_count:{phone}") system_logger.info(f"用户通过短信重置密码成功", phone=phone, user_id=user.id) return jsonify({"message": "密码重置成功,请使用新密码登录"}) @auth_bp.route('/api/auth/logout', methods=['POST']) def logout(): session.pop('user_id', None) return jsonify({"message": "已退出登录"}) @auth_bp.route('/api/auth/me', methods=['GET']) def me(): user_id = session.get('user_id') if not user_id: return jsonify({"logged_in": False}) user = db.session.get(User, user_id) if not user: session.pop('user_id', None) return jsonify({"logged_in": False}) # 脱敏手机号: 13812345678 -> 138****5678 phone = user.phone masked_phone = phone[:3] + "****" + phone[-4:] if len(phone) >= 11 else phone return jsonify({ "logged_in": True, "phone": masked_phone, # 默认返回脱敏的供前端显示 "full_phone": phone, # 某些场景可能需要完整号 "api_key": user.api_key, "points": user.points, "hide_custom_key": (not user.api_key) or user.has_used_points # Key为空或使用过积分,则隐藏自定义Key入口 }) @auth_bp.route('/api/auth/change_password', methods=['POST']) def change_password(): user_id = session.get('user_id') if not user_id: return jsonify({"error": "请先登录"}), 401 data = request.json old_password = data.get('old_password') new_password = data.get('new_password') if not old_password or not new_password: return jsonify({"error": "请填写完整信息"}), 400 user = db.session.get(User, user_id) if not user.check_password(old_password): return jsonify({"error": "原密码错误"}), 400 user.set_password(new_password) db.session.commit() system_logger.info(f"用户修改密码成功", phone=user.phone, user_id=user.id) return jsonify({"message": "密码修改成功"}) @auth_bp.route('/api/auth/add_points', methods=['POST']) def add_points(): """手动充值积分接口 (暂时禁用,等待正式接入)""" return jsonify({"error": "充值功能维护中,敬请期待"}), 503 # 原逻辑备份 # user_id = session.get('user_id') # if not user_id: # return jsonify({"error": "请先登录"}), 401 # ... (原有逻辑) def get_user_menu(user): """根据用户权限生成菜单列表""" if not user: return [] all_menus = [ {"name": "创作工作台", "icon": "wand-2", "url": "/", "perm": None}, {"name": "AI 视频创作", "icon": "video", "url": "/video", "perm": None}, {"name": "验光单助手", "icon": "scan-eye", "url": "/ocr", "perm": None}, {"name": "购买积分", "icon": "shopping-cart", "url": "/buy", "perm": None}, {"name": "权限管理中心", "icon": "shield-check", "url": "/rbac", "perm": "manage_rbac"}, {"name": "系统字典管理", "icon": "book-open", "url": "/dicts", "perm": "manage_dicts"}, {"name": "系统通知管理", "icon": "megaphone", "url": "/notifications", "perm": "manage_notifications"}, {"name": "系统日志审计", "icon": "terminal", "url": "/logs", "perm": "view_logs"}, ] accessible_menu = [] for item in all_menus: if item["perm"] is None or user.has_permission(item["perm"]): accessible_menu.append(item) return accessible_menu @auth_bp.route('/api/auth/menu', methods=['GET']) def get_menu(): """获取动态导航菜单""" user_id = session.get('user_id') if not user_id: return jsonify({"menu": []}) user = db.session.get(User, user_id) return jsonify({"menu": get_user_menu(user)}) @auth_bp.route('/api/auth/logs', methods=['GET']) @admin_required def get_logs(): """获取系统日志(支持搜索、筛选与分页)""" from models import SystemLog, User # 分页与筛选参数 page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 20, type=int) level_filter = request.args.get('level') search_query = request.args.get('search', '').strip() query = db.session.query(SystemLog).outerjoin(User) # 级别过滤 if level_filter: query = query.filter(SystemLog.level == level_filter) # 关键词搜索 (支持消息、手机号、IP) if search_query: search_filter = db.or_( SystemLog.message.ilike(f"%{search_query}%"), SystemLog.ip.ilike(f"%{search_query}%"), User.phone.ilike(f"%{search_query}%"), SystemLog.module.ilike(f"%{search_query}%") ) query = query.filter(search_filter) # 执行分页查询 pagination = query.order_by(SystemLog.created_at.desc()).paginate( page=page, per_page=per_page, error_out=False ) logs_data = [] for log in pagination.items: logs_data.append({ "id": log.id, "time": log.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'), "level": log.level, "message": log.message, "module": log.module, "user_id": log.user_id, "user_phone": log.user.phone if log.user else "系统/游客", "ip": log.ip, "path": log.path, "method": log.method, "extra": json.loads(log.extra) if log.extra else {} }) return jsonify({ "logs": logs_data, "total": pagination.total, "page": page, "per_page": per_page, "total_pages": pagination.pages })