ai_v/blueprints/admin.py
24024 bd80414c4d ```
feat(admin): 添加订单详情页面和API接口

- 新增 `/admin/orders/<int:order_id>` 路由用于显示订单详情页面
- 在管理后台的订单列表中添加查看操作按钮
- 实现 `get_order_detail` API 接口,提供订单详细信息
- 添加权限控制,确保只有管理员或订单所有者可访问
- 在充值历史页面也增加订单详情查看功能
- 更新表格布局以适应新增的操作列
```
2026-02-08 20:39:35 +08:00

636 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, request, jsonify, g
from datetime import timedelta
from extensions import db
from models import User, Role, Permission, SystemDict, SystemNotification, Order, PointsGrant, InviteReward, to_bj_time, get_bj_now
from middlewares.auth import permission_required, login_required
from services.logger import system_logger
# Blueprint that groups the admin endpoints; its routes are registered under the /api/admin prefix.
admin_bp = Blueprint('admin', __name__, url_prefix='/api/admin')
# --- 角色管理 ---
@admin_bp.route('/roles', methods=['GET'])
@permission_required('manage_rbac')
def get_roles():
    """Return every role, ordered by id, together with the names of its permissions."""
    listing = []
    for role in Role.query.order_by(Role.id).all():
        listing.append({
            "id": role.id,
            "name": role.name,
            "description": role.description,
            "permissions": [perm.name for perm in role.permissions],
        })
    return jsonify({"roles": listing})
@admin_bp.route('/roles', methods=['POST'])
@permission_required('manage_rbac')
def save_role():
    """Create a role, or update an existing one when ``id`` is supplied.

    JSON body:
        name: required role name.
        description: optional; stored as given (omitted means ``None``).
        id: optional; when truthy the matching role is updated instead of created.
        permissions: optional list of permission names that replaces the
            role's current permission set.

    Returns:
        400 when ``name`` is missing or blank, 404 when ``id`` matches no role,
        otherwise a success message.
    """
    data = request.json or {}
    name = data.get('name')
    # Validate up front: a missing name used to surface as a KeyError (HTTP 500).
    if not isinstance(name, str) or not name.strip():
        return jsonify({"error": "角色名称不能为空"}), 400

    role_id = data.get('id')
    if role_id:
        role = db.session.get(Role, role_id)
        if not role:
            return jsonify({"error": "角色不存在"}), 404
        role.name = name
        role.description = data.get('description')
        action = "修改"
    else:
        role = Role(name=name, description=data.get('description'))
        db.session.add(role)
        action = "创建"

    if 'permissions' in data:
        # Treat an explicit null like an empty list so in_() always gets a sequence.
        wanted = data['permissions'] or []
        role.permissions = Permission.query.filter(Permission.name.in_(wanted)).all()

    db.session.commit()
    # Log only after the commit so the log never reports a change that was not persisted.
    system_logger.info(f"管理员{action}角色: {name}")
    return jsonify({"message": "角色保存成功"})
@admin_bp.route('/roles/delete', methods=['POST'])
@permission_required('manage_rbac')
def delete_role():
    """Delete the role identified by ``id`` in the JSON body.

    Returns 404 when no such role exists and 400 when the target is the
    super-admin role, which may not be deleted.
    """
    payload = request.json
    target = Role.query.get(payload.get('id'))
    if not target:
        return jsonify({"error": "角色不存在"}), 404
    if target.name == '超级管理员':
        return jsonify({"error": "不能删除超级管理员角色"}), 400

    # Read the name before deleting so it is still at hand for the log line.
    removed_name = target.name
    db.session.delete(target)
    db.session.commit()
    system_logger.info(f"管理员删除角色: {removed_name}")
    return jsonify({"message": "角色删除成功"})
# --- 权限管理 ---
@admin_bp.route('/permissions', methods=['GET'])
@permission_required('manage_rbac')
def get_permissions():
    """Return the name and description of every permission, ordered by id."""
    listing = []
    for perm in Permission.query.order_by(Permission.id).all():
        listing.append({"name": perm.name, "description": perm.description})
    return jsonify({"permissions": listing})
# --- 用户角色分配 ---
@admin_bp.route('/users', methods=['GET'])
@permission_required('manage_users')
def get_users():
    """Return one page of users ordered by id, optionally filtered by phone.

    Query parameters: ``page`` (default 1), ``per_page`` (default 20) and
    ``q``, a substring matched against the phone number.
    """
    args = request.args
    page = args.get('page', 1, type=int)
    per_page = args.get('per_page', 20, type=int)
    keyword = args.get('q')  # search keyword (phone number)

    users_q = User.query
    if keyword:
        users_q = users_q.filter(User.phone.like(f"%{keyword}%"))
    result = users_q.order_by(User.id.asc()).paginate(
        page=page, per_page=per_page, error_out=False
    )

    rows = []
    for user in result.items:
        role = user.role
        rows.append({
            "id": user.id,
            "phone": user.phone,
            "role": role.name if role else "未分配",
            "role_id": role.id if role else None,
            "is_banned": user.is_banned,
            "points": user.points,
            "created_at": user.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
        })

    return jsonify({
        "users": rows,
        "total": result.total,
        "pages": result.pages,
        "current_page": result.page,
    })
@admin_bp.route('/users/assign', methods=['POST'])
@permission_required('manage_users')
def assign_role():
    """Assign a role to a user.

    JSON body: ``user_id`` and ``role_id`` (both required).

    Returns:
        400 when either id is missing, 404 when either id matches no record,
        otherwise a success message.
    """
    data = request.json or {}
    user_id = data.get('user_id')
    role_id = data.get('role_id')
    # A missing key used to raise KeyError and surface as HTTP 500.
    if user_id is None or role_id is None:
        return jsonify({"error": "请提供用户ID和角色ID"}), 400

    user = db.session.get(User, user_id)
    role = db.session.get(Role, role_id)
    if not (user and role):
        return jsonify({"error": "用户或角色不存在"}), 404

    user.role = role
    db.session.commit()
    system_logger.info("管理员分配用户角色", user_phone=user.phone, role_name=role.name)
    return jsonify({"message": "角色分配成功"})
@admin_bp.route('/users/toggle_ban', methods=['POST'])
@permission_required('manage_users')
def toggle_ban():
    """Flip the ``is_banned`` flag of the user given by ``user_id`` in the JSON body.

    Returns:
        400 when ``user_id`` is missing or the target holds the super-admin
        role, 404 when the user does not exist, otherwise a message stating
        whether the user is now banned or unbanned.
    """
    data = request.json or {}
    user_id = data.get('user_id')
    # A missing key used to raise KeyError and surface as HTTP 500.
    if user_id is None:
        return jsonify({"error": "请提供用户ID"}), 400

    user = db.session.get(User, user_id)
    if not user:
        return jsonify({"error": "用户不存在"}), 404
    if user.role and user.role.name == '超级管理员':
        return jsonify({"error": "不能封禁超级管理员"}), 400

    user.is_banned = not user.is_banned
    db.session.commit()
    status = "封禁" if user.is_banned else "解封"
    system_logger.warning(f"管理员{status}了用户: {user.phone}")
    return jsonify({"message": f"用户已{status}"})
# --- 字典管理 ---
@admin_bp.route('/dict_types', methods=['GET'])
@permission_required('manage_dicts')
def get_dict_types():
    """Return each dict type present in the table with a display name and record count.

    The display name comes from a built-in table, overridden by alias rows
    (``dict_type='dict_type_alias'``, ``value`` = target type, ``label`` =
    display name); a type with neither falls back to its own key.
    """
    # Record count per distinct dict_type.
    count_rows = (
        db.session.query(SystemDict.dict_type, db.func.count(SystemDict.id))
        .group_by(SystemDict.dict_type)
        .all()
    )
    counts = dict(count_rows)

    # Built-in friendly names for the standard types.
    display_names = {
        'ai_model': 'AI 生成模型',
        'aspect_ratio': '画面比例配置',
        'ai_image_size': '输出尺寸设定',
        'prompt_tpl': '生图提示词模板',
        'video_model': '视频生成模型',
        'video_prompt': '视频提示词模板',
        'dict_type_alias': '字典类型别名',  # the alias configuration itself
    }
    # Aliases configured in the database take precedence over the built-ins.
    for alias in SystemDict.query.filter_by(dict_type='dict_type_alias').all():
        display_names[alias.value] = alias.label

    types = []
    for type_key, record_count in counts.items():
        types.append({
            "type": type_key,
            "name": display_names.get(type_key, type_key),  # default to the key
            "count": record_count,
        })
    return jsonify({"types": types})
@admin_bp.route('/dicts', methods=['GET'])
@permission_required('manage_dicts')
def get_dicts():
    """Return dictionary entries, optionally restricted to the ``type`` query parameter.

    Entries are ordered by dict type, then by descending ``sort_order``.
    """
    wanted_type = request.args.get('type')
    entries_q = SystemDict.query
    if wanted_type:
        entries_q = entries_q.filter_by(dict_type=wanted_type)
    entries = entries_q.order_by(
        SystemDict.dict_type, SystemDict.sort_order.desc()
    ).all()

    fields = ("id", "dict_type", "label", "value", "cost", "is_active", "sort_order")
    return jsonify({
        "dicts": [
            {field: getattr(entry, field) for field in fields}
            for entry in entries
        ]
    })
@admin_bp.route('/dicts', methods=['POST'])
@permission_required('manage_dicts')
def save_dict():
    """Create a dictionary entry, or update an existing one when ``id`` is supplied.

    JSON body:
        dict_type, label, value: required.
        cost: optional, defaults to 0.
        is_active: optional, defaults to True.
        sort_order: optional, defaults to 0.
        id: optional; when truthy the matching entry is updated.

    Returns:
        400 when a required key is absent, 404 when ``id`` matches no entry,
        otherwise a success message.
    """
    data = request.json or {}
    # Validate before touching the session: a missing key used to raise KeyError
    # (HTTP 500) after a blank row had already been added to the session.
    missing = [key for key in ('dict_type', 'label', 'value') if key not in data]
    if missing:
        return jsonify({"error": f"缺少必填字段: {', '.join(missing)}"}), 400

    dict_id = data.get('id')
    if dict_id:
        entry = db.session.get(SystemDict, dict_id)
        if not entry:
            return jsonify({"error": "记录不存在"}), 404
        action = "修改"
    else:
        entry = SystemDict()
        db.session.add(entry)
        action = "创建"

    entry.dict_type = data['dict_type']
    entry.label = data['label']
    entry.value = data['value']
    entry.cost = data.get('cost', 0)
    entry.is_active = data.get('is_active', True)
    entry.sort_order = data.get('sort_order', 0)
    db.session.commit()
    system_logger.info(f"管理员{action}系统配置: {entry.label}")
    return jsonify({"message": "保存成功"})
@admin_bp.route('/dicts/delete', methods=['POST'])
@permission_required('manage_dicts')
def delete_dict():
    """Delete the dictionary entry identified by ``id`` in the JSON body (404 if absent)."""
    payload = request.json
    entry = SystemDict.query.get(payload.get('id'))
    if not entry:
        return jsonify({"error": "记录不存在"}), 404

    # Read the label before deleting so it is still at hand for the log line.
    removed_label = entry.label
    db.session.delete(entry)
    db.session.commit()
    system_logger.info(f"管理员删除系统配置: {removed_label}")
    return jsonify({"message": "删除成功"})
# --- 通知管理 ---
@admin_bp.route('/notifications', methods=['GET'])
@permission_required('manage_notifications')
def get_notifications():
    """Return all system notifications, newest first."""
    newest_first = SystemNotification.created_at.desc()
    listing = []
    for notice in SystemNotification.query.order_by(newest_first).all():
        listing.append({
            "id": notice.id,
            "title": notice.title,
            "content": notice.content,
            "is_active": notice.is_active,
            "created_at": notice.created_at_bj.strftime('%Y-%m-%d %H:%M'),
        })
    return jsonify({"notifications": listing})
@admin_bp.route('/notifications', methods=['POST'])
@permission_required('manage_notifications')
def save_notification():
    """Publish a notification, or update an existing one when ``id`` is supplied.

    JSON body:
        title, content: required.
        is_active: optional, defaults to True.
        id: optional; when truthy the matching notification is updated.

    Returns:
        400 when a required key is absent, 404 when ``id`` matches no
        notification, otherwise a success message.
    """
    data = request.json or {}
    # Validate before touching the session: a missing key used to raise KeyError
    # (HTTP 500) after a blank row had already been added to the session.
    missing = [key for key in ('title', 'content') if key not in data]
    if missing:
        return jsonify({"error": f"缺少必填字段: {', '.join(missing)}"}), 400

    notif_id = data.get('id')
    if notif_id:
        notice = db.session.get(SystemNotification, notif_id)
        if not notice:
            return jsonify({"error": "通知不存在"}), 404
        action = "修改"
    else:
        notice = SystemNotification()
        db.session.add(notice)
        action = "发布"

    notice.title = data['title']
    notice.content = data['content']
    notice.is_active = data.get('is_active', True)
    db.session.commit()
    system_logger.info(f"管理员{action}通知: {notice.title}")
    return jsonify({"message": "通知保存成功"})
@admin_bp.route('/notifications/delete', methods=['POST'])
@permission_required('manage_notifications')
def delete_notification():
    """Delete the notification identified by ``id`` in the JSON body (404 if absent)."""
    payload = request.json
    notice = SystemNotification.query.get(payload.get('id'))
    if not notice:
        return jsonify({"error": "通知不存在"}), 404

    # Read the title before deleting so it is still at hand for the log line.
    removed_title = notice.title
    db.session.delete(notice)
    db.session.commit()
    system_logger.info(f"管理员删除通知: {removed_title}")
    return jsonify({"message": "通知删除成功"})
# --- 订单管理 ---
@admin_bp.route('/orders', methods=['GET'])
@permission_required('manage_system')  # super administrators only
def get_orders():
    """Return paid orders plus pending orders created in the last 30 minutes, newest first.

    Pending orders older than 30 minutes are filtered out.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    cutoff = get_bj_now() - timedelta(minutes=30)
    # NOTE(review): the cutoff derives from get_bj_now(); confirm Order.created_at
    # is stored in the same timezone, otherwise the 30-minute window is skewed.
    is_paid = Order.status == 'PAID'
    is_recent_pending = db.and_(Order.status == 'PENDING', Order.created_at >= cutoff)
    visible_orders = (
        Order.query
        .filter(db.or_(is_paid, is_recent_pending))
        .order_by(Order.created_at.desc())
        .all()
    )

    listing = []
    for order in visible_orders:
        listing.append({
            "id": order.id,
            "out_trade_no": order.out_trade_no,
            "user_phone": order.user.phone if order.user else "未知",
            "amount": float(order.amount),
            "points": order.points,
            "status": order.status,
            "trade_no": order.trade_no,
            "created_at": order.created_at_bj.strftime(time_format),
            "paid_at": order.paid_at_bj.strftime(time_format) if order.paid_at else None,
        })
    return jsonify({"orders": listing})
@admin_bp.route('/orders/<int:order_id>', methods=['GET'])
@login_required
def get_order_detail(order_id):
    """Return the details of one order together with its buyer.

    Access is granted to users holding ``manage_system`` and to the owner of
    the order.

    Args:
        order_id: primary key of the order, taken from the URL.

    Returns:
        404 when the order does not exist, 403 when the caller is neither an
        admin nor the owner, otherwise a JSON object with ``order``, ``buyer``
        and ``current_is_admin``. ``buyer`` is ``null`` when the order has no
        associated user.
    """
    order = db.session.get(Order, order_id)
    if not order:
        return jsonify({"error": "订单不存在"}), 404

    # Permission check: the caller must be an admin or the owner of the order.
    is_admin = g.user.has_permission('manage_system')
    if not is_admin and order.user_id != g.user.id:
        return jsonify({"error": "无权访问此订单"}), 403

    time_format = '%Y-%m-%d %H:%M:%S'
    # The order list already tolerates orders without a user; do the same here
    # instead of failing with AttributeError (HTTP 500).
    buyer = order.user
    buyer_info = None
    if buyer is not None:
        buyer_info = {
            "id": buyer.id,
            "phone": buyer.phone,
            "current_points": buyer.points,
            "created_at": buyer.created_at_bj.strftime(time_format),
            "is_banned": buyer.is_banned,
            "role": buyer.role.name if buyer.role else "普通用户",
        }

    return jsonify({
        "order": {
            "id": order.id,
            "out_trade_no": order.out_trade_no,
            "trade_no": order.trade_no,
            "amount": float(order.amount),
            "points": order.points,
            "status": order.status,
            "created_at": order.created_at_bj.strftime(time_format),
            "paid_at": order.paid_at_bj.strftime(time_format) if order.paid_at else None,
        },
        "buyer": buyer_info,
        "current_is_admin": is_admin,
    })
# --- 积分发放管理 ---
@admin_bp.route('/points/grant', methods=['POST'])
@permission_required('manage_system')
def grant_points():
    """Grant points to a single user and record the grant.

    JSON body:
        user_id: required id of the receiving user.
        points: required positive integer (or a value ``int()`` accepts).
        reason: optional text stored on the grant record.

    Returns:
        400 on missing or invalid input, 404 when the user does not exist,
        otherwise a success message.
    """
    from flask import session

    data = request.json or {}
    user_id = data.get('user_id')
    points = data.get('points')
    reason = data.get('reason', '管理员手动发放')
    if not user_id or not points:
        return jsonify({"error": "请提供用户ID和积分数"}), 400

    try:
        points = int(points)
    except (TypeError, ValueError):
        # TypeError covers JSON arrays/objects, which int() rejects without a ValueError.
        return jsonify({"error": "积分数必须为整数"}), 400
    if points <= 0:
        return jsonify({"error": "积分数必须为正整数"}), 400

    user = db.session.get(User, user_id)
    if not user:
        return jsonify({"error": "用户不存在"}), 404

    admin_id = session.get('user_id')
    # Credit the points and store a grant record in the same transaction.
    user.points += points
    db.session.add(PointsGrant(
        user_id=user.id,
        points=points,
        reason=reason,
        admin_id=admin_id,
    ))
    db.session.commit()
    system_logger.info("管理员发放积分", target_user_id=user_id, points=points, reason=reason)
    return jsonify({"message": f"成功发放 {points} 积分给用户 {user.phone}"})
@admin_bp.route('/points/batch_grant', methods=['POST'])
@permission_required('manage_system')
def batch_grant_points():
    """Grant the same number of points to every user in a list of phone numbers.

    JSON body:
        phones: required string of phone numbers separated by commas,
            newlines or other whitespace.
        points: required positive integer (or a value ``int()`` accepts).
        reason: optional text stored on each grant record.

    Returns:
        400 on missing or invalid input or when no token looks like a phone
        number; otherwise a summary with ``success_count``, ``fail_count``
        and ``fail_phones`` (well-formed numbers that match no user).
    """
    import re
    from flask import session

    data = request.json or {}
    phones_str = data.get('phones', '')
    points = data.get('points')
    reason = data.get('reason', '管理员批量发放')
    # A non-string "phones" (e.g. a JSON array) used to crash on .strip().
    if not phones_str or not isinstance(phones_str, str) or not points:
        return jsonify({"error": "请提供手机号列表和积分数"}), 400

    try:
        points = int(points)
    except (TypeError, ValueError):
        # TypeError covers JSON arrays/objects, which int() rejects without a ValueError.
        return jsonify({"error": "积分数必须为整数"}), 400
    if points <= 0:
        return jsonify({"error": "积分数必须为正整数"}), 400

    # Split on commas / newlines / whitespace, then de-duplicate while keeping
    # the input order so that fail_phones comes back in a deterministic order.
    tokens = dict.fromkeys(re.split(r'[,\n\s]+', phones_str.strip()))
    phone_list = [p for p in tokens if re.match(r'^1[3-9]\d{9}$', p)]
    if not phone_list:
        return jsonify({"error": "未识别到有效的手机号"}), 400

    admin_id = session.get('user_id')
    success_count = 0
    fail_phones = []
    for phone in phone_list:
        user = User.query.filter_by(phone=phone).first()
        if not user:
            fail_phones.append(phone)
            continue
        user.points += points
        db.session.add(PointsGrant(
            user_id=user.id,
            points=points,
            reason=reason,
            admin_id=admin_id,
        ))
        success_count += 1
    db.session.commit()

    msg = f"操作完成。成功: {success_count}"
    if fail_phones:
        msg += f",失败: {len(fail_phones)} 人 (用户不存在)"
    system_logger.info("管理员批量发放积分", count=success_count, points=points)
    return jsonify({
        "message": msg,
        "success_count": success_count,
        "fail_count": len(fail_phones),
        "fail_phones": fail_phones,
    })
@admin_bp.route('/points/batch_grant_ids', methods=['POST'])
@permission_required('manage_system')
def batch_grant_by_ids():
    """Grant the same number of points to every user in a list of user ids.

    JSON body:
        user_ids: required list of user ids (integers or numeric strings).
        points: required positive integer (or a value ``int()`` accepts).
        reason: optional text stored on each grant record.

    Each user receives the points at most once even if an id is repeated;
    entries that are not integers or numeric strings, and ids that match no
    user, are skipped.

    Returns:
        400 on missing or invalid input, otherwise a message with the number
        of users credited.
    """
    from flask import session

    data = request.json or {}
    user_ids = data.get('user_ids', [])
    points = data.get('points')
    reason = data.get('reason', '管理员批量发放')
    # Require a real list: iterating a string such as "123" would otherwise
    # credit users 1, 2 and 3.
    if not user_ids or not isinstance(user_ids, list) or not points:
        return jsonify({"error": "请选择用户并提供积分数"}), 400

    try:
        points = int(points)
    except (TypeError, ValueError):
        # TypeError covers JSON arrays/objects, which int() rejects without a ValueError.
        return jsonify({"error": "积分数必须为整数"}), 400
    if points <= 0:
        return jsonify({"error": "积分数必须为正整数"}), 400

    # Normalise to unique integer ids, preserving order, so a repeated id
    # cannot be credited twice (the phone-based endpoint also de-duplicates).
    unique_ids = []
    seen = set()
    for raw in user_ids:
        if isinstance(raw, bool) or not isinstance(raw, (int, str)):
            continue
        try:
            uid = int(raw)
        except ValueError:
            continue
        if uid not in seen:
            seen.add(uid)
            unique_ids.append(uid)

    admin_id = session.get('user_id')
    success_count = 0
    for uid in unique_ids:
        user = db.session.get(User, uid)
        if not user:
            continue
        user.points += points
        db.session.add(PointsGrant(
            user_id=user.id,
            points=points,
            reason=reason,
            admin_id=admin_id,
        ))
        success_count += 1
    db.session.commit()
    system_logger.info("管理员批量发放积分(按ID)", count=success_count, points=points)
    return jsonify({"message": f"成功为 {success_count} 名用户发放了积分"})
@admin_bp.route('/points/global_grant', methods=['POST'])
@permission_required('manage_system')
def global_grant_points():
    """Grant the same number of points to every user.

    JSON body:
        points: required positive integer (or a value ``int()`` accepts).
        reason: optional text stored on each grant record.

    Returns:
        400 on missing or invalid input, otherwise a message with the number
        of users credited.
    """
    from flask import session

    data = request.json or {}
    points = data.get('points')
    reason = data.get('reason', '全员普惠')
    if not points:
        return jsonify({"error": "请提供积分数"}), 400

    try:
        points = int(points)
    except (TypeError, ValueError):
        # TypeError covers JSON arrays/objects, which int() rejects without a ValueError.
        return jsonify({"error": "积分数必须为整数"}), 400
    if points <= 0:
        return jsonify({"error": "积分数必须为正整数"}), 400

    admin_id = session.get('user_id')
    # Load every user; each one gets the points plus an individual grant record.
    users = User.query.all()
    count = len(users)
    for user in users:
        user.points += points
        # One record per user keeps reconciliation possible; with a very large
        # user base this could be replaced by a single summary record.
        db.session.add(PointsGrant(
            user_id=user.id,
            points=points,
            reason=reason,
            admin_id=admin_id,
        ))
    db.session.commit()
    system_logger.info("管理员执行全员发放", user_count=count, points=points)
    return jsonify({"message": f"成功为全站 {count} 名用户每人发放了 {points} 积分!"})
@admin_bp.route('/points/grants', methods=['GET'])
@permission_required('manage_system')
def get_points_grants():
    """Return one page of point-grant records, newest first.

    Query parameters: ``page`` (default 1), ``per_page`` (default 20) and
    ``q``, a substring matched against the receiving user's phone number.
    """
    args = request.args
    page = args.get('page', 1, type=int)
    per_page = args.get('per_page', 20, type=int)
    keyword = args.get('q', '').strip()

    grants_q = PointsGrant.query.join(User, PointsGrant.user_id == User.id)
    if keyword:
        grants_q = grants_q.filter(User.phone.like(f"%{keyword}%"))
    result = grants_q.order_by(PointsGrant.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )

    rows = []
    for record in result.items:
        rows.append({
            "id": record.id,
            "user_id": record.user_id,
            "user_phone": record.user.phone if record.user else "未知",
            "points": record.points,
            "reason": record.reason,
            "admin_phone": record.admin.phone if record.admin else "系统",
            "created_at": record.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
        })

    return jsonify({
        "grants": rows,
        "total": result.total,
        "pages": result.pages,
        "current_page": result.page,
    })
@admin_bp.route('/invite/rewards', methods=['GET'])
@permission_required('manage_system')
def get_invite_rewards():
    """Return one page of invite-reward records, newest first.

    Query parameters: ``page`` (default 1), ``per_page`` (default 20) and
    ``q``, a substring matched against the inviter's or invitee's phone number.
    """
    from sqlalchemy.orm import aliased

    args = request.args
    page = args.get('page', 1, type=int)
    per_page = args.get('per_page', 20, type=int)
    keyword = args.get('q', '').strip()

    # User is joined twice (inviter and invitee), so each join gets its own alias.
    inviter_user = aliased(User)
    invitee_user = aliased(User)
    rewards_q = (
        db.session.query(InviteReward)
        .join(inviter_user, InviteReward.inviter_id == inviter_user.id)
        .join(invitee_user, InviteReward.invitee_id == invitee_user.id)
    )
    if keyword:
        rewards_q = rewards_q.filter(db.or_(
            inviter_user.phone.like(f"%{keyword}%"),
            invitee_user.phone.like(f"%{keyword}%"),
        ))
    result = rewards_q.order_by(InviteReward.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )

    rows = []
    for reward in result.items:
        rows.append({
            "id": reward.id,
            "inviter_id": reward.inviter_id,
            "inviter_phone": reward.inviter.phone if reward.inviter else "未知",
            "invitee_id": reward.invitee_id,
            "invitee_phone": reward.invitee.phone if reward.invitee else "未知",
            "reward_points": reward.reward_points,
            "recharge_count": reward.recharge_count,
            "created_at": reward.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
        })

    return jsonify({
        "rewards": rows,
        "total": result.total,
        "pages": result.pages,
        "current_page": result.page,
    })
@admin_bp.route('/invite/stats', methods=['GET'])
@permission_required('manage_system')
def get_invite_stats():
    """Return invitation statistics.

    The response holds the number of invited users, the total invite-reward
    points, the total granted points and the ten users with the most invitees.
    """
    from sqlalchemy.orm import aliased

    # Number of users that were invited by someone.
    invited_total = User.query.filter(User.invited_by.isnot(None)).count()
    # Sum of all invite-reward points (0 when the sum is NULL).
    reward_sum = db.session.query(db.func.sum(InviteReward.reward_points)).scalar() or 0
    # Sum of all granted points (0 when the sum is NULL).
    grant_sum = db.session.query(db.func.sum(PointsGrant.points)).scalar() or 0

    # Top-10 leaderboard: self-join User to count invitees per inviter.
    inviter_user = aliased(User)
    invitee_user = aliased(User)
    leaderboard = (
        db.session.query(
            inviter_user.id,
            inviter_user.phone,
            db.func.count(invitee_user.id).label('invite_count'),
        )
        .join(invitee_user, invitee_user.invited_by == inviter_user.id)
        .group_by(inviter_user.id, inviter_user.phone)
        .order_by(db.desc('invite_count'))
        .limit(10)
        .all()
    )

    top_inviters = []
    for inviter_id, phone, invite_count in leaderboard:
        top_inviters.append({
            "user_id": inviter_id,
            "phone": phone,
            "invite_count": invite_count,
        })

    return jsonify({
        "total_invited": invited_total,
        "total_rewards": int(reward_sum),
        "total_grants": int(grant_sum),
        "top_inviters": top_inviters,
    })