ai_v/blueprints/admin.py
24024 a092cdfb4c ```
fix(admin): 修复邀请排行榜查询逻辑错误

- 引入SQLAlchemy别名机制区分邀请人和被邀请人表
- 修正JOIN查询中表关联关系避免自关联问题
- 优化GROUP BY子句包含必要的字段确保数据准确性
- 调整查询结构提升代码可读性和维护性
```
2026-01-23 22:09:45 +08:00

602 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, request, jsonify
from datetime import timedelta
from extensions import db
from models import User, Role, Permission, SystemDict, SystemNotification, Order, PointsGrant, InviteReward, to_bj_time, get_bj_now
from middlewares.auth import permission_required
from services.logger import system_logger
# Blueprint for the admin API; routes registered on it are served under the /api/admin prefix.
admin_bp = Blueprint('admin', __name__, url_prefix='/api/admin')
# --- 角色管理 ---
@admin_bp.route('/roles', methods=['GET'])
@permission_required('manage_rbac')
def get_roles():
    """Return every role, ordered by id, with the names of its permissions."""
    payload = []
    for role in Role.query.order_by(Role.id).all():
        payload.append({
            "id": role.id,
            "name": role.name,
            "description": role.description,
            "permissions": [perm.name for perm in role.permissions],
        })
    return jsonify({"roles": payload})
@admin_bp.route('/roles', methods=['POST'])
@permission_required('manage_rbac')
def save_role():
    """Create a role, or update an existing one when ``id`` is supplied.

    Expects a JSON object with ``name`` (required) and optionally ``id``,
    ``description`` and ``permissions`` (a list of permission names).

    Returns:
        400 when the body is not a JSON object or ``name`` is missing/blank,
        404 when ``id`` does not match a role, otherwise a success message.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求数据格式错误"}), 400

    # Reject up front so a missing name yields 400 instead of a KeyError (500).
    name = data.get('name')
    if not isinstance(name, str) or not name.strip():
        return jsonify({"error": "角色名称不能为空"}), 400

    role_id = data.get('id')
    if role_id:
        role = db.session.get(Role, role_id)
        if not role:
            return jsonify({"error": "角色不存在"}), 404
        role.name = name
        role.description = data.get('description')
        system_logger.info(f"管理员修改角色: {role.name}")
    else:
        role = Role(name=name, description=data.get('description'))
        db.session.add(role)
        system_logger.info(f"管理员创建角色: {role.name}")

    if 'permissions' in data:
        # Only names that match an existing Permission row are attached.
        perms = Permission.query.filter(Permission.name.in_(data['permissions'])).all()
        role.permissions = perms

    db.session.commit()
    return jsonify({"message": "角色保存成功"})
@admin_bp.route('/roles/delete', methods=['POST'])
@permission_required('manage_rbac')
def delete_role():
    """Delete the role whose id is given in the JSON body.

    The role named '超级管理员' is protected and is answered with 400; an
    unknown id is answered with 404.
    """
    payload = request.json
    target = Role.query.get(payload.get('id'))
    if not target:
        return jsonify({"error": "角色不存在"}), 404
    if target.name == '超级管理员':
        return jsonify({"error": "不能删除超级管理员角色"}), 400

    # Capture the name before deleting so it is still available for the log line.
    deleted_name = target.name
    db.session.delete(target)
    db.session.commit()
    system_logger.info(f"管理员删除角色: {deleted_name}")
    return jsonify({"message": "角色删除成功"})
# --- 权限管理 ---
@admin_bp.route('/permissions', methods=['GET'])
@permission_required('manage_rbac')
def get_permissions():
    """List all permissions (name and description), ordered by id."""
    rows = Permission.query.order_by(Permission.id).all()
    listing = []
    for perm in rows:
        listing.append({"name": perm.name, "description": perm.description})
    return jsonify({"permissions": listing})
# --- 用户角色分配 ---
@admin_bp.route('/users', methods=['GET'])
@permission_required('manage_users')
def get_users():
    """Return one page of users, optionally filtered by a phone substring.

    Query parameters: ``page`` (default 1), ``per_page`` (default 20) and
    ``q`` (substring matched against the phone number).
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 20, type=int)
    keyword = request.args.get('q')  # search keyword (phone number)

    user_query = User.query
    if keyword:
        user_query = user_query.filter(User.phone.like(f"%{keyword}%"))
    result = user_query.order_by(User.id.asc()).paginate(
        page=page, per_page=per_page, error_out=False
    )

    def _serialize(user):
        """Build the JSON-ready dict for a single user row."""
        role = user.role
        return {
            "id": user.id,
            "phone": user.phone,
            "role": role.name if role else "未分配",
            "role_id": role.id if role else None,
            "is_banned": user.is_banned,
            "points": user.points,
            "created_at": user.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
        }

    return jsonify({
        "users": [_serialize(user) for user in result.items],
        "total": result.total,
        "pages": result.pages,
        "current_page": result.page,
    })
@admin_bp.route('/users/assign', methods=['POST'])
@permission_required('manage_users')
def assign_role():
    """Assign a role to a user.

    Expects a JSON object with ``user_id`` and ``role_id``.

    Returns:
        400 when the body is not a JSON object or either id is missing,
        404 when the user or the role does not exist, otherwise a success
        message.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求数据格式错误"}), 400

    # Use .get so a missing key becomes a 400 instead of a KeyError (500).
    user_id = data.get('user_id')
    role_id = data.get('role_id')
    if user_id is None or role_id is None:
        return jsonify({"error": "请提供用户ID和角色ID"}), 400

    user = db.session.get(User, user_id)
    role = db.session.get(Role, role_id)
    if not (user and role):
        return jsonify({"error": "用户或角色不存在"}), 404

    user.role = role
    db.session.commit()
    system_logger.info("管理员分配用户角色", user_phone=user.phone, role_name=role.name)
    return jsonify({"message": "角色分配成功"})
@admin_bp.route('/users/toggle_ban', methods=['POST'])
@permission_required('manage_users')
def toggle_ban():
    """Flip the ``is_banned`` flag of a user.

    Expects a JSON object with ``user_id``. Users whose role is named
    '超级管理员' cannot be banned.

    Returns:
        400 when the body is not a JSON object, ``user_id`` is missing, or
        the target is a super administrator; 404 when the user does not
        exist; otherwise a message stating the new state.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求数据格式错误"}), 400

    # Use .get so a missing key becomes a 400 instead of a KeyError (500).
    user_id = data.get('user_id')
    if user_id is None:
        return jsonify({"error": "请提供用户ID"}), 400

    user = db.session.get(User, user_id)
    if not user:
        return jsonify({"error": "用户不存在"}), 404
    if user.role and user.role.name == '超级管理员':
        return jsonify({"error": "不能封禁超级管理员"}), 400

    user.is_banned = not user.is_banned
    db.session.commit()
    status = "封禁" if user.is_banned else "解封"
    system_logger.warning(f"管理员{status}了用户: {user.phone}")
    return jsonify({"message": f"用户已{status}"})
# --- 字典管理 ---
@admin_bp.route('/dict_types', methods=['GET'])
@permission_required('manage_dicts')
def get_dict_types():
    """List each dictionary type present in the table with a display name and row count."""
    # Number of rows per distinct dict_type.
    count_rows = db.session.query(
        SystemDict.dict_type, db.func.count(SystemDict.id)
    ).group_by(SystemDict.dict_type).all()
    counts = dict(count_rows)

    # Built-in display names for the standard types.
    display_names = {
        'ai_model': 'AI 生成模型',
        'aspect_ratio': '画面比例配置',
        'ai_image_size': '输出尺寸设定',
        'prompt_tpl': '生图提示词模板',
        'video_model': '视频生成模型',
        'video_prompt': '视频提示词模板',
        'dict_type_alias': '字典类型别名',  # the alias configuration itself
    }
    # Alias rows (dict_type='dict_type_alias': value = target type, label = display
    # name) take precedence over the built-in names.
    for alias in SystemDict.query.filter_by(dict_type='dict_type_alias').all():
        display_names[alias.value] = alias.label

    # Types without a configured name fall back to their raw key.
    types = [
        {
            "type": type_key,
            "name": display_names.get(type_key, type_key),
            "count": total,
        }
        for type_key, total in counts.items()
    ]
    return jsonify({"types": types})
@admin_bp.route('/dicts', methods=['GET'])
@permission_required('manage_dicts')
def get_dicts():
    """Return dictionary entries, optionally restricted to the ``type`` query parameter.

    Rows are ordered by dict_type, then by sort_order descending.
    """
    wanted_type = request.args.get('type')
    entries = SystemDict.query
    if wanted_type:
        entries = entries.filter_by(dict_type=wanted_type)
    entries = entries.order_by(SystemDict.dict_type, SystemDict.sort_order.desc()).all()

    # Each response key has the same name as the model attribute it exposes.
    fields = ("id", "dict_type", "label", "value", "cost", "is_active", "sort_order")
    return jsonify({
        "dicts": [{field: getattr(entry, field) for field in fields} for entry in entries]
    })
@admin_bp.route('/dicts', methods=['POST'])
@permission_required('manage_dicts')
def save_dict():
    """Create a dictionary entry, or update an existing one when ``id`` is supplied.

    Expects a JSON object with ``dict_type``, ``label`` and ``value``
    (required) and optionally ``id``, ``cost`` (default 0), ``is_active``
    (default True) and ``sort_order`` (default 0).

    Returns:
        400 when the body is not a JSON object or a required key is absent,
        404 when ``id`` does not match a record, otherwise a success message.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求数据格式错误"}), 400

    # Validate before touching the session so a missing key cannot leave a
    # half-populated pending row behind or surface as a KeyError (500).
    missing = [key for key in ('dict_type', 'label', 'value') if key not in data]
    if missing:
        return jsonify({"error": f"缺少必填字段: {', '.join(missing)}"}), 400

    dict_id = data.get('id')
    if dict_id:
        d = db.session.get(SystemDict, dict_id)
        if not d:
            return jsonify({"error": "记录不存在"}), 404
        action = "修改"
    else:
        d = SystemDict()
        db.session.add(d)
        action = "创建"

    d.dict_type = data['dict_type']
    d.label = data['label']
    d.value = data['value']
    d.cost = data.get('cost', 0)
    d.is_active = data.get('is_active', True)
    d.sort_order = data.get('sort_order', 0)
    db.session.commit()

    system_logger.info(f"管理员{action}系统配置: {d.label}")
    return jsonify({"message": "保存成功"})
@admin_bp.route('/dicts/delete', methods=['POST'])
@permission_required('manage_dicts')
def delete_dict():
    """Delete the dictionary entry whose id is given in the JSON body (404 if unknown)."""
    payload = request.json
    entry = SystemDict.query.get(payload.get('id'))
    if not entry:
        return jsonify({"error": "记录不存在"}), 404

    # Capture the label before deleting so it is still available for the log line.
    removed_label = entry.label
    db.session.delete(entry)
    db.session.commit()
    system_logger.info(f"管理员删除系统配置: {removed_label}")
    return jsonify({"message": "删除成功"})
# --- 通知管理 ---
@admin_bp.route('/notifications', methods=['GET'])
@permission_required('manage_notifications')
def get_notifications():
    """Return all system notifications, newest first."""
    newest_first = SystemNotification.created_at.desc()
    items = []
    for notice in SystemNotification.query.order_by(newest_first).all():
        items.append({
            "id": notice.id,
            "title": notice.title,
            "content": notice.content,
            "is_active": notice.is_active,
            "created_at": notice.created_at_bj.strftime('%Y-%m-%d %H:%M'),
        })
    return jsonify({"notifications": items})
@admin_bp.route('/notifications', methods=['POST'])
@permission_required('manage_notifications')
def save_notification():
    """Publish a notification, or update an existing one when ``id`` is supplied.

    Expects a JSON object with ``title`` and ``content`` (required) and
    optionally ``id`` and ``is_active`` (default True).

    Returns:
        400 when the body is not a JSON object or a required key is absent,
        404 when ``id`` does not match a notification, otherwise a success
        message.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求数据格式错误"}), 400

    # Validate before touching the session so a missing key cannot leave a
    # half-populated pending row behind or surface as a KeyError (500).
    missing = [key for key in ('title', 'content') if key not in data]
    if missing:
        return jsonify({"error": f"缺少必填字段: {', '.join(missing)}"}), 400

    notif_id = data.get('id')
    if notif_id:
        n = db.session.get(SystemNotification, notif_id)
        if not n:
            return jsonify({"error": "通知不存在"}), 404
        action = "修改"
    else:
        n = SystemNotification()
        db.session.add(n)
        action = "发布"

    n.title = data['title']
    n.content = data['content']
    n.is_active = data.get('is_active', True)
    db.session.commit()

    system_logger.info(f"管理员{action}通知: {n.title}")
    return jsonify({"message": "通知保存成功"})
@admin_bp.route('/notifications/delete', methods=['POST'])
@permission_required('manage_notifications')
def delete_notification():
    """Delete the notification whose id is given in the JSON body (404 if unknown)."""
    payload = request.json
    notice = SystemNotification.query.get(payload.get('id'))
    if not notice:
        return jsonify({"error": "通知不存在"}), 404

    # Capture the title before deleting so it is still available for the log line.
    removed_title = notice.title
    db.session.delete(notice)
    db.session.commit()
    system_logger.info(f"管理员删除通知: {removed_title}")
    return jsonify({"message": "通知删除成功"})
# --- 订单管理 ---
@admin_bp.route('/orders', methods=['GET'])
@permission_required('manage_system')  # super administrators only (per the original note)
def get_orders():
    """List paid orders plus pending orders created in the last 30 minutes, newest first.

    Pending orders older than 30 minutes are left out of the listing.
    """
    cutoff = get_bj_now() - timedelta(minutes=30)
    is_paid = Order.status == 'PAID'
    is_recent_pending = db.and_(Order.status == 'PENDING', Order.created_at >= cutoff)
    rows = (
        Order.query
        .filter(db.or_(is_paid, is_recent_pending))
        .order_by(Order.created_at.desc())
        .all()
    )

    timestamp_format = '%Y-%m-%d %H:%M:%S'
    serialized = []
    for order in rows:
        paid_at = order.paid_at_bj.strftime(timestamp_format) if order.paid_at else None
        serialized.append({
            "id": order.id,
            "out_trade_no": order.out_trade_no,
            "user_phone": order.user.phone if order.user else "未知",
            "amount": float(order.amount),
            "points": order.points,
            "status": order.status,
            "trade_no": order.trade_no,
            "created_at": order.created_at_bj.strftime(timestamp_format),
            "paid_at": paid_at,
        })
    return jsonify({"orders": serialized})
# --- 积分发放管理 ---
@admin_bp.route('/points/grant', methods=['POST'])
@permission_required('manage_system')
def grant_points():
    """Grant points to a single user and record the grant.

    Expects a JSON object with ``user_id``, ``points`` (positive integer) and
    an optional ``reason``. The acting admin is read from the session key
    ``user_id``.

    Returns:
        400 when the body is not a JSON object, a field is missing, or
        ``points`` is not a positive integer; 404 when the user does not
        exist; otherwise a success message.
    """
    from flask import session

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求数据格式错误"}), 400

    user_id = data.get('user_id')
    points = data.get('points')
    reason = data.get('reason', '管理员手动发放')
    if not user_id or not points:
        return jsonify({"error": "请提供用户ID和积分数"}), 400

    try:
        points = int(points)
    except (TypeError, ValueError, OverflowError):
        # int() raises TypeError for lists/dicts and OverflowError for
        # infinity, not only ValueError; all of them are client errors.
        return jsonify({"error": "积分数必须为整数"}), 400
    if points <= 0:
        return jsonify({"error": "积分数必须为正整数"}), 400

    user = db.session.get(User, user_id)
    if not user:
        return jsonify({"error": "用户不存在"}), 404

    admin_id = session.get('user_id')
    user.points += points
    # Store the resolved primary key rather than the raw request value, as
    # the batch grant endpoints do.
    grant = PointsGrant(
        user_id=user.id,
        points=points,
        reason=reason,
        admin_id=admin_id,
    )
    db.session.add(grant)
    db.session.commit()

    system_logger.info("管理员发放积分", target_user_id=user.id, points=points, reason=reason)
    return jsonify({"message": f"成功发放 {points} 积分给用户 {user.phone}"})
@admin_bp.route('/points/batch_grant', methods=['POST'])
@permission_required('manage_system')
def batch_grant_points():
    """Grant the same number of points to every user in a list of phone numbers.

    Expects a JSON object with ``phones`` (a string of phone numbers separated
    by commas and/or whitespace), ``points`` (positive integer) and an
    optional ``reason``. Tokens that do not look like a phone number are
    ignored; valid phones with no matching user are reported back.

    Returns:
        400 when the body is malformed, a field is missing, ``points`` is not
        a positive integer, or no valid phone number was found; otherwise a
        summary with ``success_count``, ``fail_count`` and ``fail_phones``.
    """
    import re
    from flask import session

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求数据格式错误"}), 400

    phones_str = data.get('phones', '')
    points = data.get('points')
    reason = data.get('reason', '管理员批量发放')
    if not phones_str or not points:
        return jsonify({"error": "请提供手机号列表和积分数"}), 400
    if not isinstance(phones_str, str):
        # A non-string value would otherwise crash on .strip() below.
        return jsonify({"error": "手机号列表必须为字符串"}), 400

    try:
        points = int(points)
    except (TypeError, ValueError, OverflowError):
        # int() raises TypeError for lists/dicts and OverflowError for
        # infinity, not only ValueError; all of them are client errors.
        return jsonify({"error": "积分数必须为整数"}), 400
    if points <= 0:
        return jsonify({"error": "积分数必须为正整数"}), 400

    # Split on commas / whitespace, then de-duplicate while keeping the input
    # order so the reported fail_phones list is deterministic.
    phone_pattern = re.compile(r'^1[3-9]\d{9}$')
    tokens = dict.fromkeys(re.split(r'[,\n\s]+', phones_str.strip()))
    phone_list = [p for p in tokens if phone_pattern.match(p)]
    if not phone_list:
        return jsonify({"error": "未识别到有效的手机号"}), 400

    admin_id = session.get('user_id')
    success_count = 0
    fail_phones = []
    for phone in phone_list:
        user = User.query.filter_by(phone=phone).first()
        if not user:
            fail_phones.append(phone)
            continue
        user.points += points
        db.session.add(PointsGrant(
            user_id=user.id,
            points=points,
            reason=reason,
            admin_id=admin_id,
        ))
        success_count += 1
    db.session.commit()

    msg = f"操作完成。成功: {success_count}"
    if fail_phones:
        msg += f",失败: {len(fail_phones)} 人 (用户不存在)"
    system_logger.info("管理员批量发放积分", count=success_count, points=points)
    return jsonify({
        "message": msg,
        "success_count": success_count,
        "fail_count": len(fail_phones),
        "fail_phones": fail_phones,
    })
@admin_bp.route('/points/batch_grant_ids', methods=['POST'])
@permission_required('manage_system')
def batch_grant_by_ids():
    """Grant the same number of points to every user in a list of user ids.

    Expects a JSON object with ``user_ids`` (a list), ``points`` (positive
    integer) and an optional ``reason``. Ids that match no user are skipped,
    and each user receives the grant at most once even if listed repeatedly.

    Returns:
        400 when the body is malformed, a field is missing, ``user_ids`` is
        not a list, or ``points`` is not a positive integer; otherwise a
        message with the number of users credited.
    """
    from flask import session

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求数据格式错误"}), 400

    user_ids = data.get('user_ids', [])
    points = data.get('points')
    reason = data.get('reason', '管理员批量发放')
    if not user_ids or not points:
        return jsonify({"error": "请选择用户并提供积分数"}), 400
    if not isinstance(user_ids, list):
        # Iterating a string or dict here would look up meaningless ids.
        return jsonify({"error": "用户ID列表格式错误"}), 400

    try:
        points = int(points)
    except (TypeError, ValueError, OverflowError):
        # int() raises TypeError for lists/dicts and OverflowError for
        # infinity, not only ValueError; all of them are client errors.
        return jsonify({"error": "积分数必须为整数"}), 400
    if points <= 0:
        return jsonify({"error": "积分数必须为正整数"}), 400

    admin_id = session.get('user_id')
    granted_ids = set()  # resolved user ids already credited in this request
    for uid in user_ids:
        # Only scalar ids can be primary-key lookups; skip anything else.
        if isinstance(uid, bool) or not isinstance(uid, (int, str)):
            continue
        user = db.session.get(User, uid)
        if user is None or user.id in granted_ids:
            # Unknown id, or a duplicate that would credit the user twice.
            continue
        granted_ids.add(user.id)
        user.points += points
        db.session.add(PointsGrant(
            user_id=user.id,
            points=points,
            reason=reason,
            admin_id=admin_id,
        ))
    db.session.commit()

    success_count = len(granted_ids)
    system_logger.info("管理员批量发放积分(按ID)", count=success_count, points=points)
    return jsonify({"message": f"成功为 {success_count} 名用户发放了积分"})
@admin_bp.route('/points/global_grant', methods=['POST'])
@permission_required('manage_system')
def global_grant_points():
    """Grant the same number of points to every user.

    Expects a JSON object with ``points`` (positive integer) and an optional
    ``reason``. One PointsGrant row is written per user.

    Returns:
        400 when the body is not a JSON object, ``points`` is missing, or it
        is not a positive integer; otherwise a message with the user count.
    """
    from flask import session

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求数据格式错误"}), 400

    points = data.get('points')
    reason = data.get('reason', '全员普惠')
    if not points:
        return jsonify({"error": "请提供积分数"}), 400

    try:
        points = int(points)
    except (TypeError, ValueError, OverflowError):
        # int() raises TypeError for lists/dicts and OverflowError for
        # infinity, not only ValueError; all of them are client errors.
        return jsonify({"error": "积分数必须为整数"}), 400
    if points <= 0:
        return jsonify({"error": "积分数必须为正整数"}), 400

    admin_id = session.get('user_id')
    users = User.query.all()
    count = len(users)
    for user in users:
        user.points += points
        # A per-user detail row is kept for reconciliation; with a very large
        # user base this could be replaced by a single summary record.
        db.session.add(PointsGrant(
            user_id=user.id,
            points=points,
            reason=reason,
            admin_id=admin_id,
        ))
    db.session.commit()

    system_logger.info("管理员执行全员发放", user_count=count, points=points)
    return jsonify({"message": f"成功为全站 {count} 名用户每人发放了 {points} 积分!"})
@admin_bp.route('/points/grants', methods=['GET'])
@permission_required('manage_system')
def get_points_grants():
    """Return one page of point-grant records, newest first.

    Query parameters: ``page`` (default 1), ``per_page`` (default 20) and
    ``q`` (substring matched against the recipient's phone number).
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 20, type=int)
    keyword = request.args.get('q', '').strip()

    grant_query = PointsGrant.query.join(User, PointsGrant.user_id == User.id)
    if keyword:
        grant_query = grant_query.filter(User.phone.like(f"%{keyword}%"))
    result = grant_query.order_by(PointsGrant.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )

    records = []
    for grant in result.items:
        records.append({
            "id": grant.id,
            "user_id": grant.user_id,
            "user_phone": grant.user.phone if grant.user else "未知",
            "points": grant.points,
            "reason": grant.reason,
            "admin_phone": grant.admin.phone if grant.admin else "系统",
            "created_at": grant.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
        })
    return jsonify({
        "grants": records,
        "total": result.total,
        "pages": result.pages,
        "current_page": result.page,
    })
@admin_bp.route('/invite/rewards', methods=['GET'])
@permission_required('manage_system')
def get_invite_rewards():
    """Return one page of invite-reward records, newest first.

    Query parameters: ``page`` (default 1), ``per_page`` (default 20) and
    ``q`` (substring matched against the inviter's or the invitee's phone).
    """
    from sqlalchemy.orm import aliased

    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 20, type=int)
    keyword = request.args.get('q', '').strip()

    # The user table is joined twice, so each side gets its own alias.
    Inviter = aliased(User)
    Invitee = aliased(User)
    reward_query = db.session.query(InviteReward)
    reward_query = reward_query.join(Inviter, InviteReward.inviter_id == Inviter.id)
    reward_query = reward_query.join(Invitee, InviteReward.invitee_id == Invitee.id)
    if keyword:
        pattern = f"%{keyword}%"
        reward_query = reward_query.filter(
            db.or_(Inviter.phone.like(pattern), Invitee.phone.like(pattern))
        )
    result = reward_query.order_by(InviteReward.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )

    records = []
    for reward in result.items:
        records.append({
            "id": reward.id,
            "inviter_id": reward.inviter_id,
            "inviter_phone": reward.inviter.phone if reward.inviter else "未知",
            "invitee_id": reward.invitee_id,
            "invitee_phone": reward.invitee.phone if reward.invitee else "未知",
            "reward_points": reward.reward_points,
            "recharge_count": reward.recharge_count,
            "created_at": reward.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
        })
    return jsonify({
        "rewards": records,
        "total": result.total,
        "pages": result.pages,
        "current_page": result.page,
    })
@admin_bp.route('/invite/stats', methods=['GET'])
@permission_required('manage_system')
def get_invite_stats():
    """Return invitation statistics: totals and the top-10 inviter leaderboard."""
    from sqlalchemy.orm import aliased

    # Number of users that were invited by someone.
    total_invited = User.query.filter(User.invited_by.isnot(None)).count()

    # Sum of all invite reward points; SUM yields NULL on an empty table.
    reward_sum = db.session.query(db.func.sum(InviteReward.reward_points)).scalar()
    total_rewards = reward_sum or 0

    # Sum of all points recorded in PointsGrant.
    grant_sum = db.session.query(db.func.sum(PointsGrant.points)).scalar()
    total_grants = grant_sum or 0

    # Leaderboard: self-join of the user table, counting invitees per inviter.
    Inviter = aliased(User)
    Invitee = aliased(User)
    invite_count = db.func.count(Invitee.id).label('invite_count')
    leaderboard = (
        db.session.query(Inviter.id, Inviter.phone, invite_count)
        .join(Invitee, Invitee.invited_by == Inviter.id)
        .group_by(Inviter.id, Inviter.phone)
        .order_by(db.desc('invite_count'))
        .limit(10)
        .all()
    )

    top_inviters = []
    for row in leaderboard:
        top_inviters.append({
            "user_id": row[0],
            "phone": row[1],
            "invite_count": row[2],
        })
    return jsonify({
        "total_invited": total_invited,
        "total_rewards": int(total_rewards),
        "total_grants": int(total_grants),
        "top_inviters": top_inviters,
    })