# ai_v/blueprints/admin.py
#
# 302 lines
# 10 KiB
# Python

from flask import Blueprint, request, jsonify
from datetime import datetime, timedelta
from extensions import db
from models import User, Role, Permission, SystemDict, SystemNotification, Order, to_bj_time
from middlewares.auth import permission_required
from services.logger import system_logger
# Blueprint that groups the admin endpoints; every route registered on it is
# served under the /api/admin URL prefix.
admin_bp = Blueprint('admin', __name__, url_prefix='/api/admin')
# --- 角色管理 ---
@admin_bp.route('/roles', methods=['GET'])
@permission_required('manage_rbac')
def get_roles():
    """Return every role, ordered by id, together with its permission names.

    Returns:
        JSON ``{"roles": [...]}`` where each entry carries ``id``, ``name``,
        ``description`` and ``permissions`` (a list of permission names).
    """
    serialized = []
    for role in Role.query.order_by(Role.id).all():
        permission_names = [perm.name for perm in role.permissions]
        serialized.append({
            "id": role.id,
            "name": role.name,
            "description": role.description,
            "permissions": permission_names,
        })
    return jsonify({"roles": serialized})
@admin_bp.route('/roles', methods=['POST'])
@permission_required('manage_rbac')
def save_role():
    """Create a role, or update an existing one when ``id`` is supplied.

    Expects a JSON object with:
        name: required, non-empty role name.
        description: optional; stored as given (``None`` when absent).
        id: optional; when truthy the matching role is updated.
        permissions: optional list of permission names that replaces the
            role's current permission set. Names that match no Permission
            row are ignored by the query.

    Returns:
        200 with a success message.
        400 when the body is not a JSON object, ``name`` is missing/empty,
        or the request would rename the super-administrator role.
        404 when ``id`` matches no role.
    """
    # silent=True turns a missing/invalid JSON body into None instead of
    # letting a later ``data.get`` blow up with an AttributeError (HTTP 500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求体必须为 JSON 对象"}), 400

    name = data.get('name')
    if not name:
        return jsonify({"error": "角色名称不能为空"}), 400

    role_id = data.get('id')
    if role_id:
        role = db.session.get(Role, role_id)
        if not role:
            return jsonify({"error": "角色不存在"}), 404
        # delete_role and toggle_ban identify the super administrator by the
        # role's name, so renaming it would silently disable those guards.
        if role.name == '超级管理员' and name != role.name:
            return jsonify({"error": "不能重命名超级管理员角色"}), 400
        role.name = name
        role.description = data.get('description')
        action = "修改"
    else:
        role = Role(name=name, description=data.get('description'))
        db.session.add(role)
        action = "创建"

    # An explicit null is treated the same as "not provided".
    permission_names = data.get('permissions')
    if permission_names is not None:
        role.permissions = Permission.query.filter(
            Permission.name.in_(permission_names)
        ).all()

    db.session.commit()
    # Log only after the commit succeeded so the audit trail never records a
    # change that was not persisted.
    system_logger.info(f"管理员{action}角色: {name}")
    return jsonify({"message": "角色保存成功"})
@admin_bp.route('/roles/delete', methods=['POST'])
@permission_required('manage_rbac')
def delete_role():
    """Delete the role identified by ``id`` in the JSON body.

    Returns:
        200 with a success message.
        400 when the body is not a JSON object or the target is the
        super-administrator role.
        404 when ``id`` is absent or matches no role.
    """
    # silent=True turns a missing/invalid JSON body into None instead of
    # letting ``data.get`` fail with an AttributeError (HTTP 500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求体必须为 JSON 对象"}), 400

    role_id = data.get('id')
    role = db.session.get(Role, role_id) if role_id is not None else None
    if not role:
        return jsonify({"error": "角色不存在"}), 404

    if role.name == '超级管理员':
        return jsonify({"error": "不能删除超级管理员角色"}), 400

    # Keep the name for the log line; the instance is gone after the delete.
    role_name = role.name
    db.session.delete(role)
    db.session.commit()
    system_logger.info(f"管理员删除角色: {role_name}")
    return jsonify({"message": "角色删除成功"})
# --- 权限管理 ---
@admin_bp.route('/permissions', methods=['GET'])
@permission_required('manage_rbac')
def get_permissions():
    """Return all permissions, ordered by id, as name/description pairs."""
    entries = []
    for permission in Permission.query.order_by(Permission.id).all():
        entries.append({
            "name": permission.name,
            "description": permission.description,
        })
    return jsonify({"permissions": entries})
# --- 用户角色分配 ---
@admin_bp.route('/users', methods=['GET'])
@permission_required('manage_users')
def get_users():
    """Return one page of users, optionally filtered by phone number.

    Query parameters:
        page: page number, default 1.
        per_page: page size, default 20, capped at 100.
        q: optional search keyword matched as a substring of the phone number.

    Returns:
        JSON with ``users`` (id, phone, role name, role id, ban flag) plus the
        pagination fields ``total``, ``pages`` and ``current_page``.
    """
    # Upper bound for the client-supplied page size; without it a single
    # request could ask for the entire user table in one response.
    max_page_size = 100

    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 20, type=int)
    search = request.args.get('q')  # search keyword (phone number)

    query = User.query
    if search:
        query = query.filter(User.phone.like(f"%{search}%"))

    pagination = query.order_by(User.id.asc()).paginate(
        page=page,
        per_page=per_page,
        max_per_page=max_page_size,
        error_out=False,
    )

    return jsonify({
        "users": [{
            "id": u.id,
            "phone": u.phone,
            "role": u.role.name if u.role else "未分配",
            "role_id": u.role.id if u.role else None,
            "is_banned": u.is_banned,
        } for u in pagination.items],
        "total": pagination.total,
        "pages": pagination.pages,
        "current_page": pagination.page,
    })
@admin_bp.route('/users/assign', methods=['POST'])
@permission_required('manage_users')
def assign_role():
    """Assign the role ``role_id`` to the user ``user_id``.

    Both ids are read from the JSON request body.

    Returns:
        200 with a success message.
        400 when the body is not a JSON object or either id is missing.
        404 when the user or the role does not exist.
    """
    # silent=True turns a missing/invalid JSON body into None instead of
    # letting the key lookups below fail with an HTTP 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求体必须为 JSON 对象"}), 400

    user_id = data.get('user_id')
    role_id = data.get('role_id')
    if user_id is None or role_id is None:
        return jsonify({"error": "缺少 user_id 或 role_id"}), 400

    user = db.session.get(User, user_id)
    role = db.session.get(Role, role_id)
    if not (user and role):
        return jsonify({"error": "用户或角色不存在"}), 404

    user.role = role
    # Read the values needed for logging before committing.
    user_phone, role_name = user.phone, role.name
    db.session.commit()
    # NOTE(review): this is the only call in the file that passes keyword
    # fields to system_logger; confirm the project logger accepts them.
    system_logger.info("管理员分配用户角色", user_phone=user_phone, role_name=role_name)
    return jsonify({"message": "角色分配成功"})
@admin_bp.route('/users/toggle_ban', methods=['POST'])
@permission_required('manage_users')
def toggle_ban():
    """Flip the ``is_banned`` flag of the user given by ``user_id``.

    Users whose role is named '超级管理员' cannot be banned.

    Returns:
        200 with a message stating whether the user was banned or unbanned.
        400 when the body is not a JSON object, ``user_id`` is missing, or
        the target is a super administrator.
        404 when the user does not exist.
    """
    # silent=True turns a missing/invalid JSON body into None instead of
    # letting the key lookup below fail with an HTTP 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求体必须为 JSON 对象"}), 400

    user_id = data.get('user_id')
    if user_id is None:
        return jsonify({"error": "缺少 user_id"}), 400

    user = db.session.get(User, user_id)
    if not user:
        return jsonify({"error": "用户不存在"}), 404

    if user.role and user.role.name == '超级管理员':
        return jsonify({"error": "不能封禁超级管理员"}), 400

    user.is_banned = not user.is_banned
    # Derive the log/response text before committing.
    status = "封禁" if user.is_banned else "解封"
    phone = user.phone
    db.session.commit()
    system_logger.warning(f"管理员{status}了用户: {phone}")
    return jsonify({"message": f"用户已{status}"})
# --- 字典管理 ---
@admin_bp.route('/dict_types', methods=['GET'])
@permission_required('manage_dicts')
def get_dict_types():
    """List every dictionary type present in the table with a display name.

    Returns:
        JSON ``{"types": [...]}``; each entry has ``type`` (the raw
        dict_type), ``name`` (display name, falling back to the raw type)
        and ``count`` (number of rows of that type).
    """
    # One grouped query: each distinct dict_type paired with its row count.
    grouped_rows = (
        db.session.query(SystemDict.dict_type, db.func.count(SystemDict.id))
        .group_by(SystemDict.dict_type)
        .all()
    )
    counts = dict(grouped_rows)

    # Built-in display names for the standard types.
    display_names = {
        'ai_model': 'AI 生成模型',
        'aspect_ratio': '画面比例配置',
        'ai_image_size': '输出尺寸设定',
        'prompt_tpl': '生图提示词模板',
        'video_model': '视频生成模型',
        'video_prompt': '视频提示词模板',
        'dict_type_alias': '字典类型别名',  # the alias configuration type itself
    }

    # Alias rows (dict_type='dict_type_alias') map value -> target type and
    # label -> display name; database aliases override the built-in names.
    alias_rows = SystemDict.query.filter_by(dict_type='dict_type_alias').all()
    for alias in alias_rows:
        display_names[alias.value] = alias.label

    # Only types that actually exist in the table are reported; a type with
    # no configured name is shown under its own key.
    type_entries = [
        {
            "type": type_key,
            "name": display_names.get(type_key, type_key),
            "count": row_count,
        }
        for type_key, row_count in counts.items()
    ]
    return jsonify({"types": type_entries})
@admin_bp.route('/dicts', methods=['GET'])
@permission_required('manage_dicts')
def get_dicts():
    """Return dictionary entries, optionally restricted to one ``type``.

    Entries are ordered by dict_type, then by sort_order descending.
    """
    wanted_type = request.args.get('type')

    base_query = SystemDict.query
    if wanted_type:
        base_query = base_query.filter_by(dict_type=wanted_type)
    ordered = base_query.order_by(
        SystemDict.dict_type,
        SystemDict.sort_order.desc(),
    )

    entries = []
    for entry in ordered.all():
        entries.append({
            "id": entry.id,
            "dict_type": entry.dict_type,
            "label": entry.label,
            "value": entry.value,
            "cost": entry.cost,
            "is_active": entry.is_active,
            "sort_order": entry.sort_order,
        })
    return jsonify({"dicts": entries})
@admin_bp.route('/dicts', methods=['POST'])
@permission_required('manage_dicts')
def save_dict():
    """Create a dictionary entry, or update one when ``id`` is supplied.

    Expects a JSON object with the required keys ``dict_type``, ``label`` and
    ``value``; optional ``cost`` (default 0), ``is_active`` (default True),
    ``sort_order`` (default 0) and ``id`` (update instead of create).

    Returns:
        200 with a success message.
        400 when the body is not a JSON object or a required key is missing.
        404 when ``id`` matches no entry.
    """
    # silent=True turns a missing/invalid JSON body into None instead of
    # letting ``data.get`` fail with an AttributeError (HTTP 500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求体必须为 JSON 对象"}), 400

    # Validate up front so a missing key is a client error rather than an
    # unhandled KeyError.
    missing = [key for key in ('dict_type', 'label', 'value') if key not in data]
    if missing:
        return jsonify({"error": f"缺少必填字段: {', '.join(missing)}"}), 400

    dict_id = data.get('id')
    if dict_id:
        entry = db.session.get(SystemDict, dict_id)
        if not entry:
            return jsonify({"error": "记录不存在"}), 404
        action = "修改"
    else:
        entry = SystemDict()
        db.session.add(entry)
        action = "创建"

    label = data['label']
    entry.dict_type = data['dict_type']
    entry.label = label
    entry.value = data['value']
    entry.cost = data.get('cost', 0)
    entry.is_active = data.get('is_active', True)
    entry.sort_order = data.get('sort_order', 0)

    db.session.commit()
    system_logger.info(f"管理员{action}系统配置: {label}")
    return jsonify({"message": "保存成功"})
@admin_bp.route('/dicts/delete', methods=['POST'])
@permission_required('manage_dicts')
def delete_dict():
    """Delete the dictionary entry identified by ``id`` in the JSON body.

    Returns:
        200 with a success message.
        400 when the body is not a JSON object.
        404 when ``id`` is absent or matches no entry.
    """
    # silent=True turns a missing/invalid JSON body into None instead of
    # letting ``data.get`` fail with an AttributeError (HTTP 500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求体必须为 JSON 对象"}), 400

    dict_id = data.get('id')
    entry = db.session.get(SystemDict, dict_id) if dict_id is not None else None
    if not entry:
        return jsonify({"error": "记录不存在"}), 404

    # Keep the label for the log line; the instance is gone after the delete.
    label = entry.label
    db.session.delete(entry)
    db.session.commit()
    system_logger.info(f"管理员删除系统配置: {label}")
    return jsonify({"message": "删除成功"})
# --- 通知管理 ---
@admin_bp.route('/notifications', methods=['GET'])
@permission_required('manage_notifications')
def get_notifications():
    """Return all system notifications, newest first.

    ``created_at`` in the response is the model's ``created_at_bj`` value
    formatted as ``YYYY-MM-DD HH:MM`` (presumably Beijing time; confirm
    against the model).
    """
    timestamp_format = '%Y-%m-%d %H:%M'
    newest_first = SystemNotification.created_at.desc()

    items = []
    for notification in SystemNotification.query.order_by(newest_first).all():
        items.append({
            "id": notification.id,
            "title": notification.title,
            "content": notification.content,
            "is_active": notification.is_active,
            "created_at": notification.created_at_bj.strftime(timestamp_format),
        })
    return jsonify({"notifications": items})
@admin_bp.route('/notifications', methods=['POST'])
@permission_required('manage_notifications')
def save_notification():
    """Publish a notification, or update one when ``id`` is supplied.

    Expects a JSON object with the required keys ``title`` and ``content``;
    optional ``is_active`` (default True) and ``id`` (update instead of
    create).

    Returns:
        200 with a success message.
        400 when the body is not a JSON object or a required key is missing.
        404 when ``id`` matches no notification.
    """
    # silent=True turns a missing/invalid JSON body into None instead of
    # letting ``data.get`` fail with an AttributeError (HTTP 500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求体必须为 JSON 对象"}), 400

    # Validate up front so a missing key is a client error rather than an
    # unhandled KeyError.
    missing = [key for key in ('title', 'content') if key not in data]
    if missing:
        return jsonify({"error": f"缺少必填字段: {', '.join(missing)}"}), 400

    notif_id = data.get('id')
    if notif_id:
        notification = db.session.get(SystemNotification, notif_id)
        if not notification:
            return jsonify({"error": "通知不存在"}), 404
        action = "修改"
    else:
        notification = SystemNotification()
        db.session.add(notification)
        action = "发布"

    title = data['title']
    notification.title = title
    notification.content = data['content']
    notification.is_active = data.get('is_active', True)

    db.session.commit()
    system_logger.info(f"管理员{action}通知: {title}")
    return jsonify({"message": "通知保存成功"})
@admin_bp.route('/notifications/delete', methods=['POST'])
@permission_required('manage_notifications')
def delete_notification():
    """Delete the notification identified by ``id`` in the JSON body.

    Returns:
        200 with a success message.
        400 when the body is not a JSON object.
        404 when ``id`` is absent or matches no notification.
    """
    # silent=True turns a missing/invalid JSON body into None instead of
    # letting ``data.get`` fail with an AttributeError (HTTP 500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求体必须为 JSON 对象"}), 400

    notif_id = data.get('id')
    notification = (
        db.session.get(SystemNotification, notif_id) if notif_id is not None else None
    )
    if not notification:
        return jsonify({"error": "通知不存在"}), 404

    # Keep the title for the log line; the instance is gone after the delete.
    title = notification.title
    db.session.delete(notification)
    db.session.commit()
    system_logger.info(f"管理员删除通知: {title}")
    return jsonify({"message": "通知删除成功"})
# --- 订单管理 ---
@admin_bp.route('/orders', methods=['GET'])
@permission_required('manage_system')  # super administrators only
def get_orders():
    """Return paid orders plus pending orders created in the last 30 minutes.

    Pending orders older than 30 minutes are left out of the listing.
    Results are ordered by creation time, newest first.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    # NOTE(review): the cutoff is a naive UTC datetime; this presumes
    # Order.created_at is stored as naive UTC. Confirm against the model.
    cutoff = datetime.utcnow() - timedelta(minutes=30)

    is_paid = Order.status == 'PAID'
    is_recent_pending = db.and_(
        Order.status == 'PENDING',
        Order.created_at >= cutoff,
    )
    visible_orders = (
        Order.query
        .filter(db.or_(is_paid, is_recent_pending))
        .order_by(Order.created_at.desc())
        .all()
    )

    rows = []
    for order in visible_orders:
        if order.paid_at:
            paid_at_text = order.paid_at_bj.strftime(timestamp_format)
        else:
            paid_at_text = None
        rows.append({
            "id": order.id,
            "out_trade_no": order.out_trade_no,
            "user_phone": order.user.phone if order.user else "未知",
            "amount": float(order.amount),
            "points": order.points,
            "status": order.status,
            "trade_no": order.trade_no,
            "created_at": order.created_at_bj.strftime(timestamp_format),
            "paid_at": paid_at_text,
        })
    return jsonify({"orders": rows})