"""Authentication blueprint: HTML page routes and the ``/api/auth/*`` JSON API."""
import datetime
import json

from flask import Blueprint, request, jsonify, session, render_template, redirect, url_for

from extensions import db
from models import User
from services.sms_service import SMSService
from services.logger import system_logger
from middlewares.auth import admin_required

auth_bp = Blueprint('auth', __name__)


def _json_body():
    """Return the request's JSON object, or ``{}`` when it is missing or invalid.

    ``get_json(silent=True)`` yields ``None`` instead of raising on a missing
    or malformed body, so handlers can always call ``.get()`` on the result.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _current_user():
    """Return the ``User`` for the session's ``user_id``, or ``None``.

    A session id that no longer resolves to a user row is removed from the
    session so later requests treat the client as logged out.
    """
    user_id = session.get('user_id')
    if not user_id:
        return None
    user = User.query.get(user_id)
    if user is None:
        session.pop('user_id', None)
    return user


def _log_matches(item, search_query):
    """Return True when *search_query* occurs in the log message or any extra value.

    *search_query* is expected to be lower-cased already; the comparison is
    case-insensitive on the log side.
    """
    if search_query in str(item.get('message', '')).lower():
        return True
    extra = item.get('extra')
    if not isinstance(extra, dict):
        return False
    return any(search_query in str(value).lower() for value in extra.values())


@auth_bp.route('/login')
def login_page():
    """Render the login page, or redirect to the index when already logged in."""
    if 'user_id' in session:
        return redirect(url_for('index'))
    return render_template('login.html')


@auth_bp.route('/logs')
@admin_required
def logs_page():
    """Render the log viewer page."""
    return render_template('logs.html')


@auth_bp.route('/rbac')
@admin_required
def rbac_page():
    """Render the role/permission management page."""
    return render_template('rbac.html')


@auth_bp.route('/dicts')
@admin_required
def dicts_page():
    """Render the system dictionary management page."""
    return render_template('dicts.html')


@auth_bp.route('/notifications')
@admin_required
def notifications_page():
    """Render the system notification management page."""
    return render_template('notifications.html')


@auth_bp.route('/admin/orders')
@admin_required
def admin_orders_page():
    """Render the all-users order management page."""
    return render_template('orders.html')


@auth_bp.route('/buy')
def buy_page():
    """Render the points purchase page.

    Redirects to the login page when there is no valid logged-in user.
    Reads the ``success`` and ``out_trade_no`` query parameters to show a
    payment-result notice.
    """
    user = _current_user()
    if user is None:
        return redirect(url_for('auth.login_page'))

    # Kept as a function-level import, as in the original code.
    from models import Order

    user_id = user.id

    # The user's own ten most recent top-up orders.
    personal_orders = (
        Order.query.filter_by(user_id=user_id)
        .order_by(Order.created_at.desc())
        .limit(10)
        .all()
    )

    # Administrators additionally see the ten most recent orders of all users.
    is_admin = user.has_permission('manage_system')
    admin_orders = []
    if is_admin:
        admin_orders = Order.query.order_by(Order.created_at.desc()).limit(10).all()

    # Payment-success notice.
    success = request.args.get('success') == 'true'
    out_trade_no = request.args.get('out_trade_no')
    order = None
    if out_trade_no:
        order_query = Order.query.filter_by(out_trade_no=out_trade_no)
        if not is_admin:
            # out_trade_no comes from the query string; never expose another
            # user's order to a non-admin.
            order_query = order_query.filter_by(user_id=user_id)
        order = order_query.first()

    return render_template(
        'buy.html',
        personal_orders=personal_orders,
        admin_orders=admin_orders,
        is_admin=is_admin,
        modules={'datetime': datetime},
        success=success,
        order=order,
    )


@auth_bp.route('/api/auth/send_code', methods=['POST'])
def send_code():
    """Send an SMS verification code to the phone number in the JSON body.

    Returns 400 when ``phone`` is missing and 500 when the SMS service
    reports a failure.
    """
    data = _json_body()
    phone = data.get('phone')
    if not phone:
        return jsonify({"error": "请输入手机号"}), 400

    system_logger.info("用户请求发送验证码", phone=phone)
    success, msg = SMSService.send_code(phone)
    if success:
        system_logger.info("验证码发送成功", phone=phone)
        return jsonify({"message": "验证码已发送"})

    system_logger.warning(f"验证码发送失败: {msg}", phone=phone)
    return jsonify({"error": f"发送失败: {msg}"}), 500


@auth_bp.route('/api/auth/register', methods=['POST'])
def register():
    """Register a new user from ``phone``, ``code`` and ``password``.

    Returns 400 when a field is missing, the verification code is rejected,
    or the phone number is already registered.
    """
    data = _json_body()
    phone = data.get('phone')
    code = data.get('code')
    password = data.get('password')

    system_logger.info("用户注册请求", phone=phone)

    # Reject incomplete submissions before touching the SMS service or the DB.
    if not phone or not code or not password:
        return jsonify({"error": "请填写完整信息"}), 400

    if not SMSService.verify_code(phone, code):
        system_logger.warning("注册失败: 验证码错误", phone=phone)
        return jsonify({"error": "验证码错误或已过期"}), 400

    if User.query.filter_by(phone=phone).first():
        system_logger.warning("注册失败: 手机号已存在", phone=phone)
        return jsonify({"error": "该手机号已注册"}), 400

    # Default role for new accounts, looked up by its stored name.
    from models import Role
    user_role = Role.query.filter_by(name='普通用户').first()

    user = User(phone=phone, role=user_role)
    user.set_password(password)
    db.session.add(user)
    db.session.commit()

    system_logger.info("用户注册成功", phone=phone, user_id=user.id)
    return jsonify({"message": "注册成功"})


@auth_bp.route('/api/auth/login', methods=['POST'])
def login():
    """Log a user in with ``phone`` and ``password``.

    On success stores ``user_id`` in a permanent session; otherwise
    returns 401.
    """
    data = _json_body()
    phone = data.get('phone')
    password = data.get('password')

    system_logger.info("用户登录尝试", phone=phone)

    # Skip the lookup entirely when a credential is missing so that
    # check_password is never called with None.
    user = User.query.filter_by(phone=phone).first() if phone and password else None
    if user and user.check_password(password):
        # Persistent session (lifetime governed by Config.PERMANENT_SESSION_LIFETIME).
        session.permanent = True
        session['user_id'] = user.id
        system_logger.info("用户登录成功", phone=phone, user_id=user.id)
        return jsonify({"message": "登录成功", "phone": phone})

    system_logger.warning("登录失败: 手机号或密码错误", phone=phone)
    return jsonify({"error": "手机号或密码错误"}), 401


@auth_bp.route('/api/auth/logout', methods=['POST'])
def logout():
    """Remove ``user_id`` from the session."""
    session.pop('user_id', None)
    return jsonify({"message": "已退出登录"})


@auth_bp.route('/api/auth/me', methods=['GET'])
def me():
    """Return the login state and basic profile of the current user."""
    user = _current_user()
    if user is None:
        return jsonify({"logged_in": False})

    return jsonify({
        "logged_in": True,
        "phone": user.phone,
        "api_key": user.api_key,  # the saved API key
        "points": user.points,  # remaining trial points
    })


@auth_bp.route('/api/auth/change_password', methods=['POST'])
def change_password():
    """Change the current user's password.

    Returns 401 when not logged in and 400 when a field is missing or the
    old password does not match.
    """
    user = _current_user()
    if user is None:
        return jsonify({"error": "请先登录"}), 401

    data = _json_body()
    old_password = data.get('old_password')
    new_password = data.get('new_password')

    if not old_password or not new_password:
        return jsonify({"error": "请填写完整信息"}), 400

    if not user.check_password(old_password):
        return jsonify({"error": "原密码错误"}), 400

    user.set_password(new_password)
    db.session.commit()

    system_logger.info("用户修改密码成功", phone=user.phone, user_id=user.id)
    return jsonify({"message": "密码修改成功"})


@auth_bp.route('/api/auth/add_points', methods=['POST'])
def add_points():
    """Manual points top-up endpoint (temporarily disabled pending real integration)."""
    return jsonify({"error": "充值功能维护中,敬请期待"}), 503
    # Backup of the original logic:
    # user_id = session.get('user_id')
    # if not user_id:
    #     return jsonify({"error": "请先登录"}), 401
    # ... (original logic)


@auth_bp.route('/api/auth/menu', methods=['GET'])
def get_menu():
    """Return the navigation menu entries the current user may access.

    Responds with an empty menu when there is no valid logged-in user.
    """
    user = _current_user()
    if user is None:
        return jsonify({"menu": []})

    # Menu catalogue: name, icon, link, required permission (None = everyone).
    all_menus = [
        {"name": "创作工作台", "icon": "wand-2", "url": "/", "perm": None},
        {"name": "验光单助手", "icon": "scan-eye", "url": "/ocr", "perm": None},
        {"name": "购买积分", "icon": "shopping-cart", "url": "/buy", "perm": None},
        {"name": "权限管理中心", "icon": "shield-check", "url": "/rbac", "perm": "manage_rbac"},
        {"name": "系统字典管理", "icon": "book-open", "url": "/dicts", "perm": "manage_dicts"},
        {"name": "系统通知管理", "icon": "megaphone", "url": "/notifications", "perm": "manage_notifications"},
        {"name": "系统日志审计", "icon": "terminal", "url": "/logs", "perm": "view_logs"},
    ]

    # Keep only the entries the user is permitted to see.
    accessible_menu = [
        item for item in all_menus
        if item["perm"] is None or user.has_permission(item["perm"])
    ]

    return jsonify({"menu": accessible_menu})


@auth_bp.route('/api/auth/logs', methods=['GET'])
def get_logs():
    """Return system logs, optionally filtered by ``level`` and ``search``.

    Returns 401 when not logged in and 403 when the user holds neither the
    ``view_logs`` nor the ``manage_system`` permission. Entries that cannot
    be decoded as a JSON object are skipped.
    """
    user = _current_user()
    if user is None:
        return jsonify({"error": "请先登录"}), 401

    # The log viewer page is admin-only and its menu entry requires
    # 'view_logs'; the data endpoint must not be open to every logged-in user.
    if not (user.has_permission('view_logs') or user.has_permission('manage_system')):
        return jsonify({"error": "权限不足"}), 403

    # Kept as a function-level import, as in the original code.
    from extensions import redis_client

    # Filter parameters.
    level_filter = request.args.get('level')
    search_query = request.args.get('search', '').lower()

    # Read the whole ZSET in descending score order (original comment:
    # newest first).
    # NOTE(review): this loads every entry on each request; consider a cap.
    raw_logs = redis_client.zrevrange('system_logs_zset', 0, -1)

    log_list = []
    for raw in raw_logs:
        try:
            # Accept both bytes and str so the client's decode_responses
            # setting does not matter.
            text = raw.decode('utf-8') if isinstance(raw, bytes) else raw
            item = json.loads(text)
        except (TypeError, ValueError):
            # One malformed entry must not break the whole listing.
            continue
        if not isinstance(item, dict):
            continue

        # Level filter.
        if level_filter and item.get('level') != level_filter:
            continue

        # Keyword search over the message and every value in 'extra'.
        if search_query and not _log_matches(item, search_query):
            continue

        log_list.append(item)

    return jsonify({"logs": log_list})