From f548e8d6ab3b8432446d6641ce10e890c6764638 Mon Sep 17 00:00:00 2001 From: 24024 <240241002@qq.com> Date: Sat, 17 Jan 2026 23:15:58 +0800 Subject: [PATCH] feat: Implement core application structure with new services, blueprints, templates, and database migrations. --- README.md | 70 +++ app.py | 2 +- blueprints/admin.py | 76 ++- blueprints/api.py | 578 +++--------------- blueprints/auth.py | 41 +- blueprints/payment.py | 39 +- check_alipay_config.py | 163 ----- config.py | 5 + create_database.py | 51 -- db_manager.py | 67 ++ fix_db_manual.py | 22 - fix_db_manual_points.py | 30 - force_init.py | 81 --- init_db.py | 53 -- init_dicts.py | 54 -- init_notifications.py | 30 - init_rbac.py | 58 -- middlewares/auth.py | 23 +- migrate_api_key.py | 13 - migrate_db.py | 23 - migrate_rbac.py | 29 - migrations/versions/0cc7ce54ecc0_auto_sync.py | 32 + migrations/versions/a77f97f56b61_auto_sync.py | 32 + models.py | 41 +- services/__init__.py | 0 services/file_service.py | 34 ++ services/generation_service.py | 156 +++++ services/history_service.py | 55 ++ services/logger.py | 9 +- services/stats_service.py | 65 ++ services/system_service.py | 62 ++ services/task_service.py | 285 +++++++++ static/js/main.js | 132 +++- static/js/video.js | 9 +- sync_db.py | 15 - sync_history_db.py | 10 - sync_videos_manual.py | 83 --- templates/base.html | 6 +- templates/buy.html | 222 ++++--- templates/dicts.html | 10 + templates/index.html | 100 ++- templates/logs.html | 4 +- templates/rbac.html | 560 +++++++++++------ templates/recharge_history.html | 117 ++-- test_alipay_verify.py | 165 ----- 45 files changed, 1932 insertions(+), 1780 deletions(-) create mode 100644 README.md delete mode 100644 check_alipay_config.py delete mode 100644 create_database.py create mode 100644 db_manager.py delete mode 100644 fix_db_manual.py delete mode 100644 fix_db_manual_points.py delete mode 100644 force_init.py delete mode 100644 init_db.py delete mode 100644 init_dicts.py delete mode 100644 init_notifications.py delete mode 100644 init_rbac.py delete mode 100644 migrate_api_key.py delete mode 100644 migrate_db.py delete mode 100644 migrate_rbac.py create mode 100644 migrations/versions/0cc7ce54ecc0_auto_sync.py create mode 100644 migrations/versions/a77f97f56b61_auto_sync.py create mode 100644 services/__init__.py create mode 100644 services/file_service.py create mode 100644 services/generation_service.py create mode 100644 services/history_service.py create mode 100644 services/stats_service.py create mode 100644 services/system_service.py create mode 100644 services/task_service.py delete mode 100644 sync_db.py delete mode 100644 sync_history_db.py delete mode 100644 sync_videos_manual.py delete mode 100644 test_alipay_verify.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..52f32f1 --- /dev/null +++ b/README.md @@ -0,0 +1,70 @@ +# AI 视界 (AI Vision) 项目指南 + +## 🚀 快速开始 + +### 1. 环境准备 +确保已安装 Python 3.8+ 和 PostgreSQL / Redis。 + +**激活虚拟环境** (推荐) +```bash +# Windows (PowerShell) +.\.venv\Scripts\Activate.ps1 + +# Linux / Mac +source .venv/bin/activate +``` + +**安装依赖** +```bash +pip install -r requirements.txt +``` + +### 2. 启动服务 +```bash +python app.py +``` +服务默认运行在 `http://127.0.0.1:5000`。 + +--- + +## 🛠️ 常用维护命令 + +### 数据库管理 (推荐) +本项目内置了自动化数据库管理工具 `db_manager.py`,用于处理模型变更和迁移。 + +**一键自动同步 (最常用)** +当您修改了 `models.py` 中的表结构后,运行此命令自动完成迁移: +```bash +python db_manager.py sync +``` + +**分步操作** +如果您需要更精细的控制: + +* **初始化环境** (仅首次): `python db_manager.py init` +* **生成迁移脚本**: `python db_manager.py make "修改说明"` +* **执行数据库变更**: `python db_manager.py up` + +### 系统配置更新 +AI 模型、提示词模板等配置已移至数据库的 `system_dicts` 表中。 +* 请登录 **Web 后台管理界面** (`/rbac`) 进行可视化的添加和修改。 +* 或直接操作数据库更新 `system_dicts` 表。 + +--- + +## 📂 目录结构说明 + +* `app.py`: 应用入口 +* `config.py`: 配置文件 +* `models.py`: 数据库模型定义 +* `blueprints/`: 路由蓝图 (API 接口) + * `api.py`: 核心业务接口 (Controller) + * `admin.py`: 后台管理接口 + * `auth.py`: 认证接口 +* `services/`: 业务逻辑层 (Service) + * `task_service.py`: 异步任务处理 (生图/视频) + * `generation_service.py`: 生成请求验证与计费 + * `system_service.py`: 系统配置与通知 + * `history_service.py`: 历史记录查询 +* `templates/`: 前端 HTML 模板 +* `static/`: 静态资源 (JS/CSS) diff --git a/app.py b/app.py index 62cfb99..7dbf858 100644 --- a/app.py +++ b/app.py @@ -63,7 +63,7 @@ def create_app(): 'path': log.path, 'method': log.method, 'user_agent': log.user_agent, - 'created_at': log.created_at.strftime('%Y-%m-%d %H:%M:%S') + 'created_at': log.created_at_bj.strftime('%Y-%m-%d %H:%M:%S') } for log in logs]) @app.route('/') diff --git a/blueprints/admin.py b/blueprints/admin.py index cfdad57..cdddad8 100644 --- a/blueprints/admin.py +++ b/blueprints/admin.py @@ -1,6 +1,7 @@ from flask import Blueprint, request, jsonify +from datetime import datetime, timedelta from extensions import db -from models import User, Role, Permission, SystemDict, SystemNotification, Order +from models import User, Role, Permission, SystemDict, SystemNotification, Order, to_bj_time from middlewares.auth import permission_required from services.logger import system_logger @@ -10,7 +11,7 @@ admin_bp = Blueprint('admin', __name__, url_prefix='/api/admin') @admin_bp.route('/roles', methods=['GET']) @permission_required('manage_rbac') def get_roles(): - roles = Role.query.all() + roles = Role.query.order_by(Role.id).all() return jsonify({ "roles": [{ "id": r.id, @@ -63,7 +64,7 @@ def delete_role(): @admin_bp.route('/permissions', methods=['GET']) @permission_required('manage_rbac') def get_permissions(): - perms = Permission.query.all() + perms = Permission.query.order_by(Permission.id).all() return jsonify({ "permissions": [{"name": p.name, "description": p.description} for p in perms] }) @@ -72,13 +73,29 @@ def get_permissions(): @admin_bp.route('/users', methods=['GET']) @permission_required('manage_users') def get_users(): - users = User.query.all() + page = request.args.get('page', 1, type=int) + per_page = request.args.get('per_page', 20, type=int) + search = request.args.get('q') # 搜索关键字 (手机号) + + query = User.query + if search: + query = query.filter(User.phone.like(f"%{search}%")) + + pagination = query.order_by(User.id.asc()).paginate( + page=page, per_page=per_page, error_out=False + ) + return jsonify({ "users": [{ "id": u.id, "phone": u.phone, - "role": u.role.name if u.role else "未分配" - } for u in users] + "role": u.role.name if u.role else "未分配", + "role_id": u.role.id if u.role else None, + "is_banned": u.is_banned + } for u in pagination.items], + "total": pagination.total, + "pages": pagination.pages, + "current_page": pagination.page }) @admin_bp.route('/users/assign', methods=['POST']) @@ -94,6 +111,21 @@ def assign_role(): return jsonify({"message": "角色分配成功"}) return jsonify({"error": "用户或角色不存在"}), 404 +@admin_bp.route('/users/toggle_ban', methods=['POST']) +@permission_required('manage_users') +def toggle_ban(): + data = request.json + user = db.session.get(User, data['user_id']) + if user: + if user.role and user.role.name == '超级管理员': + return jsonify({"error": "不能封禁超级管理员"}), 400 + user.is_banned = not user.is_banned + db.session.commit() + status = "封禁" if user.is_banned else "解封" + system_logger.warning(f"管理员{status}了用户: {user.phone}") + return jsonify({"message": f"用户已{status}"}) + return jsonify({"error": "用户不存在"}), 404 + # --- 字典管理 --- @admin_bp.route('/dict_types', methods=['GET']) @permission_required('manage_dicts') @@ -102,7 +134,11 @@ def get_dict_types(): counts = dict(db.session.query(SystemDict.dict_type, db.func.count(SystemDict.id))\ .group_by(SystemDict.dict_type).all()) - # 定义类型的友好名称 (标准类型) + # 获取类型别名配置 (dict_type='dict_type_alias', value='目标类型', label='中文名称') + aliases = SystemDict.query.filter_by(dict_type='dict_type_alias').all() + alias_map = {a.value: a.label for a in aliases} + + # 定义类型的友好名称 (标准类型 + 别名覆盖) standard_types = { 'ai_model': 'AI 生成模型', 'aspect_ratio': '画面比例配置', @@ -110,13 +146,15 @@ def get_dict_types(): 'prompt_tpl': '生图提示词模板', 'video_model': '视频生成模型', 'video_prompt': '视频提示词模板', + 'dict_type_alias': '字典类型别名', # 自身配置 } + # 优先使用数据库配置的别名 + standard_types.update(alias_map) # 合并数据库中存在的其他类型 - all_types = {**standard_types} + all_types = {} for t in counts.keys(): - if t not in all_types: - all_types[t] = t # 未知类型直接使用 Key 作为名称 + all_types[t] = standard_types.get(t, t) # 默认为 Key return jsonify({ "types": [{ @@ -195,7 +233,7 @@ def get_notifications(): "title": n.title, "content": n.content, "is_active": n.is_active, - "created_at": n.created_at.strftime('%Y-%m-%d %H:%M') + "created_at": n.created_at_bj.strftime('%Y-%m-%d %H:%M') } for n in notifs] }) @@ -239,8 +277,16 @@ def delete_notification(): @admin_bp.route('/orders', methods=['GET']) @permission_required('manage_system') # 仅限超级管理员 def get_orders(): - orders = Order.query.order_by(Order.created_at.desc()).all() - from datetime import timedelta + thirty_min_ago = datetime.utcnow() - timedelta(minutes=30) + + # 过滤掉超过 30 分钟未支付的订单 + orders = Order.query.filter( + db.or_( + Order.status == 'PAID', + db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago) + ) + ).order_by(Order.created_at.desc()).all() + return jsonify({ "orders": [{ "id": o.id, @@ -250,7 +296,7 @@ def get_orders(): "points": o.points, "status": o.status, "trade_no": o.trade_no, - "created_at": (o.created_at + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S'), - "paid_at": (o.paid_at + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S') if o.paid_at else None + "created_at": o.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'), + "paid_at": o.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if o.paid_at else None } for o in orders] }) \ No newline at end of file diff --git a/blueprints/api.py b/blueprints/api.py index 250ec96..b556c52 100644 --- a/blueprints/api.py +++ b/blueprints/api.py @@ -1,289 +1,21 @@ -import os -import uuid -import json -import requests -import io -import threading -import time -import base64 from flask import Blueprint, request, jsonify, session, current_app -from urllib.parse import quote -from config import Config -from extensions import s3_client, redis_client, db -from models import GenerationRecord, User, SystemDict, SystemNotification +from extensions import db, redis_client +from models import User from middlewares.auth import login_required from services.logger import system_logger +import json + +# Import Services +from services.system_service import get_system_config_data, get_user_latest_notification, mark_notification_as_read +from services.history_service import get_user_history_data +from services.file_service import handle_file_uploads, get_remote_file_stream +from services.generation_service import ( + validate_generation_request, deduct_points, handle_chat_generation_sync, + start_async_image_task, validate_video_request, start_async_video_task +) api_bp = Blueprint('api', __name__) -def sync_images_background(app, record_id, raw_urls): - """后台同步图片至 MinIO,并生成缩略图,带重试机制""" - with app.app_context(): - processed_data = [] - for raw_url in raw_urls: - success = False - for attempt in range(3): # 3 次重试机制 - try: - img_resp = requests.get(raw_url, timeout=30) - if img_resp.status_code == 200: - content = img_resp.content - ext = ".png" - base_filename = f"gen-{uuid.uuid4().hex}" - full_filename = f"{base_filename}{ext}" - thumb_filename = f"{base_filename}-thumb{ext}" - - # 1. 上传原图 - s3_client.upload_fileobj( - io.BytesIO(content), - Config.MINIO["bucket"], - full_filename, - ExtraArgs={"ContentType": "image/png"} - ) - - full_url = f"{Config.MINIO['public_url']}{quote(full_filename)}" - thumb_url = full_url # 默认使用原图 - - # 2. 生成并上传缩略图 (400px 宽度) - try: - from PIL import Image - img = Image.open(io.BytesIO(content)) - # 转换为 RGB 如果是 RGBA (避免某些格式保存问题) - if img.mode in ("RGBA", "P"): - img = img.convert("RGB") - - # 缩放至宽度 400, 高度等比 - w, h = img.size - if w > 400: - ratio = 400 / float(w) - img.thumbnail((400, int(h * ratio)), Image.Resampling.LANCZOS) - - thumb_io = io.BytesIO() - # 缩略图保存为 JPEG 以获得更小的体积 - img.save(thumb_io, format='JPEG', quality=80, optimize=True) - thumb_io.seek(0) - - s3_client.upload_fileobj( - thumb_io, - Config.MINIO["bucket"], - thumb_filename.replace('.png', '.jpg'), - ExtraArgs={"ContentType": "image/jpeg"} - ) - thumb_url = f"{Config.MINIO['public_url']}{quote(thumb_filename.replace('.png', '.jpg'))}" - except Exception as thumb_e: - print(f"⚠️ 缩略图生成失败: {thumb_e}") - - processed_data.append({"url": full_url, "thumb": thumb_url}) - success = True - break - except Exception as e: - print(f"⚠️ 第 {attempt+1} 次同步失败: {e}") - time.sleep(2 ** attempt) # 指数退避 - - if not success: - # 如果最终失败,保留原始 URL - processed_data.append({"url": raw_url, "thumb": raw_url}) - - # 更新数据库记录为持久化数据结构 - try: - record = db.session.get(GenerationRecord, record_id) - if record: - record.image_urls = json.dumps(processed_data) - db.session.commit() - print(f"✅ 记录 {record_id} 图片及缩略图已完成同步") - except Exception as e: - print(f"❌ 更新记录失败: {e}") - -def process_image_generation(app, user_id, task_id, payload, api_key, target_api, cost): - """异步执行图片生成并存入 Redis""" - with app.app_context(): - try: - headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} - # 使用较长的超时时间 (10分钟),确保长耗时任务不被中断 - resp = requests.post(target_api, json=payload, headers=headers, timeout=1000) - - if resp.status_code != 200: - user = db.session.get(User, user_id) - if user and "sk-" in api_key: - user.points += cost - db.session.commit() - - system_logger.error(f"生图任务失败: {resp.text}", user_id=user_id, task_id=task_id) - redis_client.setex(f"task:{task_id}", 3600, json.dumps({"status": "error", "message": resp.text})) - return - - api_result = resp.json() - raw_urls = [item['url'] for item in api_result.get('data', [])] - - # 持久化记录 - new_record = GenerationRecord( - user_id=user_id, - prompt=payload.get('prompt'), - model=payload.get('model'), - image_urls=json.dumps(raw_urls) - ) - db.session.add(new_record) - db.session.commit() - - # 后台线程处理:下载 AI 原始图片并同步到私有 MinIO - threading.Thread( - target=sync_images_background, - args=(app, new_record.id, raw_urls) - ).start() - - # 存入 Redis 标记完成 - system_logger.info(f"生图任务完成", user_id=user_id, task_id=task_id, model=payload.get('model')) - redis_client.setex(f"task:{task_id}", 3600, json.dumps({"status": "complete", "urls": raw_urls})) - - except Exception as e: - # 异常处理:退还积分 - user = db.session.get(User, user_id) - if user and "sk-" in api_key: - user.points += cost - db.session.commit() - - system_logger.error(f"生图任务异常: {str(e)}", user_id=user_id, task_id=task_id) - redis_client.setex(f"task:{task_id}", 3600, json.dumps({"status": "error", "message": str(e)})) - -def sync_video_background(app, record_id, raw_url, internal_task_id=None): - """后台同步视频至 MinIO,带重试机制""" - with app.app_context(): - success = False - final_url = raw_url - for attempt in range(3): - try: - # 增加了流式下载,处理大视频文件 - with requests.get(raw_url, stream=True, timeout=120) as r: - r.raise_for_status() - content_type = r.headers.get('content-type', 'video/mp4') - ext = ".mp4" - if "text/html" in content_type: # 有些 API 返回的是跳转页面 - continue - - base_filename = f"video-{uuid.uuid4().hex}" - full_filename = f"{base_filename}{ext}" - - video_io = io.BytesIO() - for chunk in r.iter_content(chunk_size=8192): - video_io.write(chunk) - video_io.seek(0) - - # 上传至 MinIO - s3_client.upload_fileobj( - video_io, - Config.MINIO["bucket"], - full_filename, - ExtraArgs={"ContentType": content_type} - ) - - final_url = f"{Config.MINIO['public_url']}{quote(full_filename)}" - success = True - break - except Exception as e: - system_logger.error(f"同步视频失败 (第{attempt+1}次): {str(e)}") - time.sleep(5) - - if success: - try: - record = db.session.get(GenerationRecord, record_id) - if record: - # 更新记录为 MinIO 的 URL - record.image_urls = json.dumps([{"url": final_url, "type": "video"}]) - db.session.commit() - - # 关键修复:同步更新 Redis 中的缓存,这样前端轮询也能拿到最新的 MinIO 地址 - if internal_task_id: - cached_data = redis_client.get(f"task:{internal_task_id}") - if cached_data: - if isinstance(cached_data, bytes): - cached_data = cached_data.decode('utf-8') - task_info = json.loads(cached_data) - task_info['video_url'] = final_url - redis_client.setex(f"task:{internal_task_id}", 3600, json.dumps(task_info)) - - system_logger.info(f"视频同步 MinIO 成功", video_url=final_url) - except Exception as dbe: - system_logger.error(f"更新视频记录失败: {str(dbe)}") - -def process_video_generation(app, user_id, internal_task_id, payload, api_key, cost): - """异步提交并查询视频任务状态""" - with app.app_context(): - try: - headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} - # 1. 提交任务 - submit_resp = requests.post(Config.VIDEO_GEN_API, json=payload, headers=headers, timeout=60) - if submit_resp.status_code != 200: - raise Exception(f"视频任务提交失败: {submit_resp.text}") - - submit_result = submit_resp.json() - remote_task_id = submit_result.get('task_id') - if not remote_task_id: - raise Exception(f"未获取到远程任务 ID: {submit_result}") - - # 2. 轮询状态 - max_retries = 90 # 提升到 15 分钟 - video_url = None - for i in range(max_retries): - time.sleep(10) - poll_url = Config.VIDEO_POLL_API.format(task_id=remote_task_id) - poll_resp = requests.get(poll_url, headers=headers, timeout=30) - if poll_resp.status_code != 200: - continue - - poll_result = poll_resp.json() - status = poll_result.get('status', '').upper() - - if status == 'SUCCESS': - # 提取视频输出地址 - if 'data' in poll_result and isinstance(poll_result['data'], dict): - video_url = poll_result['data'].get('output') - if not video_url: - if 'data' in poll_result and isinstance(poll_result['data'], list) and poll_result['data']: - video_url = poll_result['data'][0].get('url') - elif 'video' in poll_result: - video_url = poll_result['video'].get('url') if isinstance(poll_result['video'], dict) else poll_result['video'] - elif 'url' in poll_result: - video_url = poll_result['url'] - break - elif status in ['FAILED', 'ERROR']: - raise Exception(f"视频生成失败: {poll_result.get('fail_reason') or poll_result.get('message') or '未知错误'}") - - if not video_url: - raise Exception("超时未获取到视频地址") - - # 3. 持久化记录 - new_record = GenerationRecord( - user_id=user_id, - prompt=payload.get('prompt'), - model=payload.get('model'), - image_urls=json.dumps([{"url": video_url, "type": "video"}]) - ) - db.session.add(new_record) - db.session.commit() - - # 后台线程异步同步到 MinIO - threading.Thread( - target=sync_video_background, - args=(app, new_record.id, video_url, internal_task_id) - ).start() - - # 4. 存入 Redis - redis_client.setex(f"task:{internal_task_id}", 3600, json.dumps({"status": "complete", "video_url": video_url, "record_id": new_record.id})) - system_logger.info(f"视频生成任务完成", user_id=user_id, task_id=internal_task_id) - - except Exception as e: - system_logger.error(f"视频生成执行异常: {str(e)}", user_id=user_id, task_id=internal_task_id) - # 尝试退费 - try: - user = db.session.get(User, user_id) - if user: - user.points += cost - db.session.commit() - except Exception as re: - system_logger.error(f"退费失败: {str(re)}") - - # 确保 Redis 状态一定被更新,防止前端死循环 - redis_client.setex(f"task:{internal_task_id}", 3600, json.dumps({"status": "error", "message": str(e)})) - @api_bp.route('/api/task_status/') @login_required def get_task_status(task_id): @@ -293,7 +25,6 @@ def get_task_status(task_id): if not data: return jsonify({"status": "pending"}) - # 兼容处理 bytes 和 str if isinstance(data, bytes): data = data.decode('utf-8') @@ -306,35 +37,7 @@ def get_task_status(task_id): def get_config(): """从本地数据库字典获取配置""" try: - dicts = SystemDict.query.filter_by(is_active=True).order_by(SystemDict.sort_order.desc()).all() - - config = { - "models": [], - "ratios": [], - "prompts": [], - "sizes": [], - "video_models": [], - "video_prompts": [] - } - - for d in dicts: - item = {"label": d.label, "value": d.value} - if d.dict_type == 'ai_model': - item["cost"] = d.cost - config["models"].append(item) - elif d.dict_type == 'aspect_ratio': - config["ratios"].append(item) - elif d.dict_type == 'prompt_tpl': - config["prompts"].append(item) - elif d.dict_type == 'ai_image_size': - config["sizes"].append(item) - elif d.dict_type == 'video_model': - item["cost"] = d.cost - config["video_models"].append(item) - elif d.dict_type == 'video_prompt': - config["video_prompts"].append(item) - - return jsonify(config) + return jsonify(get_system_config_data()) except Exception as e: return jsonify({"error": str(e)}), 500 @@ -343,16 +46,7 @@ def get_config(): def upload(): try: files = request.files.getlist('images') - img_urls = [] - for f in files: - ext = os.path.splitext(f.filename)[1] - filename = f"{uuid.uuid4().hex}{ext}" - s3_client.upload_fileobj( - f, Config.MINIO["bucket"], filename, - ExtraArgs={"ContentType": f.content_type} - ) - img_urls.append(f"{Config.MINIO['public_url']}{quote(filename)}") - + img_urls = handle_file_uploads(files) system_logger.info(f"用户上传文件: {len(files)} 个", user_id=session.get('user_id')) return jsonify({"urls": img_urls}) except Exception as e: @@ -366,107 +60,42 @@ def generate(): user = db.session.get(User, user_id) data = request.json if request.is_json else request.form - mode = data.get('mode', 'trial') - is_premium = data.get('is_premium', False) - input_key = data.get('apiKey') - target_api = Config.AI_API - api_key = None - use_trial = False + # 1. 验证请求与权限 + api_key, target_api, cost, use_trial, error = validate_generation_request(user, data) + if error: + return jsonify({"error": error}), 400 - if mode == 'key': - api_key = input_key or user.api_key - if not api_key: - return jsonify({"error": "请先输入您的 API 密钥"}), 400 - else: - if user.points > 0: - api_key = Config.PREMIUM_KEY if is_premium else Config.TRIAL_KEY - target_api = Config.TRIAL_API - use_trial = True - else: - return jsonify({"error": "可用积分已耗尽,请充值或切换至自定义 Key 模式"}), 400 - - if mode == 'key' and input_key and input_key != user.api_key: - user.api_key = input_key - db.session.commit() + # 2. 扣除积分 (如果是试用模式) + if use_trial: + deduct_points(user, cost) model_value = data.get('model') - is_chat_model = "gemini" in model_value.lower() or "gpt" in model_value.lower() - - model_dict = SystemDict.query.filter_by(dict_type='ai_model', value=model_value).first() - cost = model_dict.cost if model_dict else 1 - - if use_trial and is_premium: - cost *= 2 - - if use_trial: - if user.points < cost: - return jsonify({"error": f"可用积分不足"}), 400 - user.points -= cost - user.has_used_points = True # 标记已使用过积分 - db.session.commit() - prompt = data.get('prompt') - ratio = data.get('ratio') - size = data.get('size') - image_data = data.get('image_data', []) - + is_chat_model = "gemini" in model_value.lower() or "gpt" in model_value.lower() + + # 3. 处理聊天模型 (同步) + if is_chat_model: + result, status_code = handle_chat_generation_sync(user_id, api_key, model_value, prompt, use_trial, cost) + return jsonify(result), status_code + + # 4. 构造生图 Payload payload = { "prompt": prompt, "model": model_value, "response_format": "url", - "aspect_ratio": ratio + "aspect_ratio": data.get('ratio') } + image_data = data.get('image_data', []) if image_data: payload["image"] = [img.split(',', 1)[1] if ',' in img else img for img in image_data] - if model_value == "nano-banana-2" and size: - payload["image_size"] = size + if model_value == "nano-banana-2" and data.get('size'): + payload["image_size"] = data.get('size') - # 如果是聊天模型,直接同步处理 - if is_chat_model: - headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} - chat_payload = { - "model": model_value, - "messages": [{"role": "user", "content": prompt}] - } - resp = requests.post(Config.CHAT_API, json=chat_payload, headers=headers, timeout=120) - if resp.status_code != 200: - if use_trial: - user.points += cost - db.session.commit() - return jsonify({"error": resp.text}), resp.status_code - - api_result = resp.json() - content = api_result['choices'][0]['message']['content'] - - # 记录聊天历史 - if prompt != "解读验光单": - new_record = GenerationRecord( - user_id=user_id, - prompt=prompt, - model=model_value, - image_urls=json.dumps([{"type": "text", "content": content}]) - ) - db.session.add(new_record) - db.session.commit() - - return jsonify({ - "data": [{"content": content, "type": "text"}], - "message": "生成成功!" - }) - - # --- 异步处理生图任务 --- - task_id = str(uuid.uuid4()) + # 5. 启动异步生图任务 app = current_app._get_current_object() - - log_msg = "用户发起验光单解读" if prompt == "解读验光单" else "用户发起生图任务" - system_logger.info(log_msg, model=model_value, mode=mode) - - threading.Thread( - target=process_image_generation, - args=(app, user_id, task_id, payload, api_key, target_api, cost) - ).start() + task_id = start_async_image_task(app, user_id, payload, api_key, target_api, cost, data.get('mode'), model_value) return jsonify({ "task_id": task_id, @@ -483,25 +112,16 @@ def video_generate(): user = db.session.get(User, user_id) data = request.json - # 视频生成统一使用积分模式,隐藏 Key 模式 - if user.points <= 0: - return jsonify({"error": "可用积分不足,请先充值"}), 400 - model_value = data.get('model', 'veo3.1') - - # 确定积分消耗 (优先从字典获取) - model_dict = SystemDict.query.filter_by(dict_type='video_model', value=model_value).first() - cost = model_dict.cost if model_dict else (15 if "pro" in model_value.lower() or "3.1" in model_value else 10) - - if user.points < cost: - return jsonify({"error": f"积分不足,生成该视频需要 {cost} 积分"}), 400 + # 1. 验证请求 + model_value, cost, error = validate_video_request(user, data) + if error: + return jsonify({"error": error}), 400 - # 扣除积分 - user.points -= cost - user.has_used_points = True - db.session.commit() + # 2. 扣除积分 + deduct_points(user, cost) - # 构建符合 API 文档的 Payload + # 3. 构造 Payload payload = { "model": model_value, "prompt": data.get('prompt'), @@ -510,18 +130,9 @@ def video_generate(): "aspect_ratio": data.get('aspect_ratio', '9:16') } - # 使用系统内置的 Key - api_key = Config.TRIAL_KEY # 默认使用试用/中转 Key - - task_id = str(uuid.uuid4()) + # 4. 启动异步视频任务 app = current_app._get_current_object() - - system_logger.info("用户发起视频生成任务 (积分模式)", model=model_value, cost=cost) - - threading.Thread( - target=process_video_generation, - args=(app, user_id, task_id, payload, api_key, cost) - ).start() + task_id = start_async_video_task(app, user_id, payload, cost, model_value) return jsonify({ "task_id": task_id, @@ -533,27 +144,16 @@ def video_generate(): @api_bp.route('/api/notifications/latest', methods=['GET']) @login_required def get_latest_notification(): - """获取用户最近一条未读的激活通知""" try: user_id = session.get('user_id') - latest = SystemNotification.query.filter_by(is_active=True)\ - .filter(~SystemNotification.read_by_users.any(id=user_id))\ - .order_by(SystemNotification.created_at.desc()).first() - - if latest: - return jsonify({ - "id": latest.id, - "title": latest.title, - "content": latest.content - }) - return jsonify({"id": None}) + data = get_user_latest_notification(user_id) + return jsonify(data) except Exception as e: return jsonify({"error": str(e)}), 500 @api_bp.route('/api/notifications/read', methods=['POST']) @login_required def mark_notif_read(): - """将通知标记为已读""" try: user_id = session.get('user_id') data = request.json @@ -561,13 +161,7 @@ def mark_notif_read(): if not notif_id: return jsonify({"error": "缺少通知 ID"}), 400 - notif = db.session.get(SystemNotification, notif_id) - user = db.session.get(User, user_id) - - if notif and user: - if user not in notif.read_by_users: - notif.read_by_users.append(user) - db.session.commit() + mark_notification_as_read(user_id, notif_id) return jsonify({"status": "ok"}) except Exception as e: return jsonify({"error": str(e)}), 500 @@ -575,75 +169,49 @@ def mark_notif_read(): @api_bp.route('/api/history', methods=['GET']) @login_required def get_history(): - """获取用户的历史生成记录 (支持分页,限 90 天内)""" try: - from datetime import datetime, timedelta user_id = session.get('user_id') page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) + filter_type = request.args.get('filter_type', 'all') - # 计算 90 天前的时间 - ninety_days_ago = datetime.utcnow() - timedelta(days=90) - - pagination = GenerationRecord.query.filter( - GenerationRecord.user_id == user_id, - GenerationRecord.created_at >= ninety_days_ago, - GenerationRecord.prompt != "解读验光单" # 过滤掉验光单助手的操作记录 - ).order_by(GenerationRecord.created_at.desc())\ - .paginate(page=page, per_page=per_page, error_out=False) - - # 格式化 URL,兼容新旧数据格式 - history_list = [] - for r in pagination.items: - raw_urls = json.loads(r.image_urls) - formatted_urls = [] - for u in raw_urls: - if isinstance(u, str): - # 旧数据:直接返回原图作为缩略图 - formatted_urls.append({"url": u, "thumb": u}) - else: - # 如果是视频类型,提供默认预览图 (此处使用一个公共视频占位图或空) - if u.get('type') == 'video' and not u.get('thumb'): - u['thumb'] = "https://img.icons8.com/flat-round/64/000000/play--v1.png" - formatted_urls.append(u) - - history_list.append({ - "id": r.id, - "prompt": r.prompt, - "model": r.model, - "urls": formatted_urls, - "created_at": (r.created_at + timedelta(hours=8)).strftime('%b %d, %H:%M') - }) - - return jsonify({ - "history": history_list, - "has_next": pagination.has_next, - "total": pagination.total - }) + data = get_user_history_data(user_id, page, per_page, filter_type) + return jsonify(data) except Exception as e: return jsonify({"error": str(e)}), 500 +@api_bp.route('/api/stats/points', methods=['GET']) +@login_required +def get_user_point_stats(): + """获取积分统计图表数据""" + from services.stats_service import get_point_stats + user_id = session.get('user_id') + days = request.args.get('days', 7, type=int) + return jsonify(get_point_stats(user_id, days)) + +@api_bp.route('/api/stats/details', methods=['GET']) +@login_required +def get_user_point_details(): + """获取积分消耗明细""" + from services.stats_service import get_point_details + user_id = session.get('user_id') + page = request.args.get('page', 1, type=int) + return jsonify(get_point_details(user_id, page)) + @api_bp.route('/api/download_proxy', methods=['GET']) @login_required def download_proxy(): - """代理下载远程文件,强制浏览器弹出下载""" + import time url = request.args.get('url') - filename = request.args.get('filename', f"video-{int(time.time())}.mp4") + # 默认文件名逻辑 + default_name = f"video-{int(time.time())}.mp4" + filename = request.args.get('filename') or default_name if not url: return jsonify({"error": "缺少 URL 参数"}), 400 try: - # 流式获取远程文件 - req = requests.get(url, stream=True, timeout=60) - req.raise_for_status() - - headers = {} - if req.headers.get('Content-Type'): - headers['Content-Type'] = req.headers['Content-Type'] - else: - headers['Content-Type'] = 'application/octet-stream' - + req, headers = get_remote_file_stream(url) headers['Content-Disposition'] = f'attachment; filename="{filename}"' def generate(): diff --git a/blueprints/auth.py b/blueprints/auth.py index 74d68c6..fdbe117 100644 --- a/blueprints/auth.py +++ b/blueprints/auth.py @@ -1,5 +1,6 @@ from flask import Blueprint, request, jsonify, session, render_template, redirect, url_for import json +from datetime import datetime, timedelta from extensions import db from models import User from services.sms_service import SMSService @@ -56,14 +57,27 @@ def buy_page(): user_id = session['user_id'] user = db.session.get(User, user_id) - # 获取用户个人充值记录 - personal_orders = Order.query.filter_by(user_id=user_id).order_by(Order.created_at.desc()).limit(10).all() + thirty_min_ago = datetime.utcnow() - timedelta(minutes=30) + + # 获取用户个人充值记录 (过滤掉超过 30 分钟未支付的订单) + personal_orders = Order.query.filter( + Order.user_id == user_id, + db.or_( + Order.status == 'PAID', + db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago) + ) + ).order_by(Order.created_at.desc()).limit(10).all() # 如果是管理员,获取全员记录 is_admin = user.has_permission('manage_system') admin_orders = [] if is_admin: - admin_orders = Order.query.order_by(Order.created_at.desc()).limit(10).all() + admin_orders = Order.query.filter( + db.or_( + Order.status == 'PAID', + db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago) + ) + ).order_by(Order.created_at.desc()).limit(10).all() # 处理支付成功提示 success = request.args.get('success') == 'true' @@ -72,12 +86,12 @@ def buy_page(): if out_trade_no: order = Order.query.filter_by(out_trade_no=out_trade_no).first() - import datetime + import datetime as dt_module return render_template('buy.html', personal_orders=personal_orders, admin_orders=admin_orders, is_admin=is_admin, - modules={'datetime': datetime}, + modules={'datetime': dt_module}, success=success, order=order) @@ -106,6 +120,11 @@ def send_code(): if not phone: return jsonify({"error": "请输入手机号"}), 400 + + import re + if not re.match(r'^1[3-9]\d{9}$', phone): + return jsonify({"error": "手机号格式不正确"}), 400 + if not captcha: return jsonify({"error": "请输入图形验证码", "show_captcha": True}), 403 @@ -160,6 +179,10 @@ def register(): code = data.get('code') password = data.get('password') + import re + if not phone or not re.match(r'^1[3-9]\d{9}$', phone): + return jsonify({"error": "手机号格式不正确"}), 400 + system_logger.info(f"用户注册请求", phone=phone) if not SMSService.verify_code(phone, code): @@ -217,6 +240,10 @@ def login(): user = User.query.filter_by(phone=phone).first() if user and user.check_password(password): + if user.is_banned: + system_logger.warning(f"被封禁用户尝试登录", phone=phone) + return jsonify({"error": "您的账号已被封禁,请联系管理员"}), 403 + # 登录成功,清除失败计数 redis_client.delete(fail_key) @@ -239,6 +266,10 @@ def reset_password(): code = data.get('code') new_password = data.get('password') + import re + if not phone or not re.match(r'^1[3-9]\d{9}$', phone): + return jsonify({"error": "手机号格式不正确"}), 400 + if not phone or not code or not new_password: return jsonify({"error": "请填写完整信息"}), 400 diff --git a/blueprints/payment.py b/blueprints/payment.py index 606aff2..3e8018d 100644 --- a/blueprints/payment.py +++ b/blueprints/payment.py @@ -1,10 +1,10 @@ from flask import Blueprint, request, redirect, url_for, session, jsonify, render_template from extensions import db -from models import Order, User +from models import Order, User, to_bj_time from services.alipay_service import AlipayService from services.logger import system_logger import uuid -from datetime import datetime +from datetime import datetime, timedelta payment_bp = Blueprint('payment', __name__, url_prefix='/payment') @@ -91,11 +91,19 @@ def payment_history(): if 'user_id' not in session: return redirect(url_for('auth.login')) - user_id = session['user_id'] - orders = Order.query.filter_by(user_id=user_id).order_by(Order.created_at.desc()).all() + thirty_min_ago = datetime.utcnow() - timedelta(minutes=30) - import datetime - return render_template('recharge_history.html', orders=orders, modules={'datetime': datetime}) + user_id = session['user_id'] + orders = Order.query.filter( + Order.user_id == user_id, + db.or_( + Order.status == 'PAID', + db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago) + ) + ).order_by(Order.created_at.desc()).all() + + import datetime as dt_module + return render_template('recharge_history.html', orders=orders, modules={'datetime': dt_module}) @payment_bp.route('/api/history', methods=['GET']) def api_payment_history(): @@ -103,10 +111,17 @@ def api_payment_history(): if 'user_id' not in session: return jsonify({'code': 401, 'msg': '请先登录'}), 401 - user_id = session['user_id'] - orders = Order.query.filter_by(user_id=user_id).order_by(Order.created_at.desc()).all() + thirty_min_ago = datetime.utcnow() - timedelta(minutes=30) + + user_id = session['user_id'] + orders = Order.query.filter( + Order.user_id == user_id, + db.or_( + Order.status == 'PAID', + db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago) + ) + ).order_by(Order.created_at.desc()).all() - from datetime import timedelta return jsonify({ "orders": [{ "id": o.id, @@ -115,8 +130,8 @@ def api_payment_history(): "points": o.points, "status": o.status, "trade_no": o.trade_no, - "created_at": (o.created_at + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S'), - "paid_at": (o.paid_at + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S') if o.paid_at else None + "created_at": o.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'), + "paid_at": o.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if o.paid_at else None } for o in orders] }) @payment_bp.route('/notify', methods=['POST']) @@ -140,7 +155,7 @@ def payment_notify(): if order and order.status == 'PENDING': order.status = 'PAID' order.trade_no = trade_no - order.paid_at = datetime.utcnow() + order.paid_at = datetime.now() user = db.session.get(User, order.user_id) if user: diff --git a/check_alipay_config.py b/check_alipay_config.py deleted file mode 100644 index 0b4a778..0000000 --- a/check_alipay_config.py +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -支付宝配置检查脚本 -用于检查支付宝配置是否正确,不依赖虚拟环境 -""" - -import sys -import os -sys.path.append(os.path.dirname(os.path.abspath(__file__))) - -def check_alipay_config(): - """检查支付宝配置""" - print("="*60) - print("支付宝配置检查") - print("="*60) - - # 读取配置文件内容 - config_path = os.path.join(os.path.dirname(__file__), 'config.py') - - if not os.path.exists(config_path): - print("错误: config.py 文件不存在") - return False - - with open(config_path, 'r', encoding='utf-8') as f: - config_content = f.read() - - # 检查支付宝相关配置 - required_configs = [ - 'ALIPAY_APP_ID', - 'ALIPAY_APP_PRIVATE_KEY', - 'ALIPAY_PUBLIC_KEY', - 'ALIPAY_RETURN_URL', - 'ALIPAY_NOTIFY_URL', - 'ALIPAY_DEBUG' - ] - - print("检查必需的配置项:") - for config in required_configs: - if config in config_content: - print(f" ✓ {config}: 存在") - else: - print(f" ✗ {config}: 缺失") - - # 提取并显示配置值(部分显示以保护安全) - import re - - # 检查密钥格式 - private_key_match = re.search(r'ALIPAY_APP_PRIVATE_KEY\s*=\s*"""(.*?)"""', config_content, re.DOTALL) - public_key_match = re.search(r'ALIPAY_PUBLIC_KEY\s*=\s*"""(.*?)"""', config_content, re.DOTALL) - - if private_key_match: - private_key = private_key_match.group(1) - if "BEGIN RSA PRIVATE KEY" in private_key and "END RSA PRIVATE KEY" in private_key: - print(" ✓ 应用私钥格式正确") - else: - print(" ✗ 应用私钥格式错误") - else: - print(" ✗ 未找到应用私钥") - - if public_key_match: - public_key = public_key_match.group(1) - if "BEGIN PUBLIC KEY" in public_key and "END PUBLIC KEY" in public_key: - print(" ✓ 支付宝公钥格式正确") - else: - print(" ✗ 支付宝公钥格式错误") - else: - print(" ✗ 未找到支付宝公钥") - - # 检查调试模式 - debug_match = re.search(r'ALIPAY_DEBUG\s*=\s*(True|False)', config_content) - if debug_match: - debug_mode = debug_match.group(1) - print(f" ✓ 调试模式: {debug_mode}") - if debug_mode == "False": - print(" 注意: 当前为正式环境模式,确保使用正式环境的密钥") - else: - print(" 提示: 当前为沙箱环境模式,适用于测试") - - print("\n" + "="*60) - print("常见问题排查") - print("="*60) - print(""" -1. 密钥配置问题: - - 确保应用私钥和支付宝公钥格式正确 - - 检查BEGIN/END标签是否完整 - - 确认沙箱/正式环境配置一致 - -2. 回调地址问题: - - 确保ALIPAY_RETURN_URL和ALIPAY_NOTIFY_URL可以公网访问 - - 检查URL是否拼写正确 - -3. 签名算法: - - SDK默认使用RSA2算法 - - 确保支付宝开放平台应用设置中也是RSA2 - -4. 对于同步回调400错误: - - 主要是签名验证失败 - - 需要正确处理sign和sign_type参数 - - 确保使用正确的公钥验证 - -5. 环境问题: - - 如果使用沙箱环境,确保使用沙箱的AppID和密钥 - - 确保服务器时间准确 - """) - - return True - -def analyze_callback_request(): - """分析提供的回调请求""" - print("\n" + "="*60) - print("回调请求分析") - print("="*60) - - sample_request = "GET /payment/return?charset=utf-8&out_trade_no=20260114192935cfcf96&method=alipay.trade.page.pay.return&total_amount=5.00&sign=ksH8Nov1SA9U4fgovUXv%2BXxmZccCDaqVhPmm%2BAPlGL8QgMYWDN7NSqDQTDoVshe2agHT11rNuVEXuApE3lVOnBPPbUvUlyMdaWpx/0GlFBRS0tezfdUcCQsShOTj4YdKwa2K0bfoqQeStupG0LFVipsWiga9WIryFU5JWDK3lDOuaVLiw2gLFMemsz/Xg14UPQMWcmlyXVGYzeLYvNmVRbQQjnJL8m%2BFOq5tqMgopEtZmAC4wstIwm7n1kOrV%2Bs/HBxMeQqWOTtFEbDkzbU8o%2BhS5%2BavQm5BUvFTmjbsVs6Npo5qmmTkI8dRvqRO1HzqSv6ymL8%2BpiguKEBmaFaBeg%3D%3D&trade_no=2026011422001420011429097835&auth_app_id=2021006126656681&version=1.0&app_id=2021006126656681&sign_type=RSA2&seller_id=2088802857784696×tamp=2026-01-14+19:30:21" - - print("从您提供的请求中,我们可以看到:") - print("- 订单号: 20260114192935cfcf96") - print("- 支付金额: 5.00") - print("- 支付宝交易号: 2026011422001420011429097835") - print("- 签名类型: RSA2") - print("- 时间戳: 2026-01-14 19:30:21") - print("\n这些参数应该被正确传递给验证函数") - - print("\n" + "="*60) - print("解决方案") - print("="*60) - print(""" -1. 确保签名验证时: - - 从参数中移除 'sign' 和 'sign_type' 字段 - - 使用剩余参数进行签名验证 - - 使用正确的支付宝公钥 - -2. 检查配置: - - 确认使用的AppID与请求中的app_id一致 - - 确认密钥对正确配对 - -3. 日志查看: - - 运行应用后,进行一次支付测试 - - 查看 logs/system.log 中的详细错误信息 - - 检查具体的验证失败原因 - -4. 沙箱测试: - - 如果使用沙箱环境,请确保配置正确 - - 使用沙箱提供的测试账号进行支付 - """) - -if __name__ == "__main__": - print("支付宝配置检查工具\n") - - check_alipay_config() - analyze_callback_request() - - print("\n" + "="*60) - print("下一步操作建议") - print("="*60) - print(""" -1. 运行应用: python app.py -2. 进行支付测试 -3. 检查日志文件 logs/system.log -4. 查看具体的错误信息 -5. 根据错误信息进行相应修复 - """) \ No newline at end of file diff --git a/config.py b/config.py index 46f1c42..9f95722 100644 --- a/config.py +++ b/config.py @@ -7,6 +7,11 @@ class Config: # PostgreSQL 配置 SQLALCHEMY_DATABASE_URI = "postgresql://user_xREpkJ:password_DZz8DQ@331002.xyz:2022/ai_vision" SQLALCHEMY_TRACK_MODIFICATIONS = False + SQLALCHEMY_ENGINE_OPTIONS = { + "pool_pre_ping": True, + "pool_recycle": 1800, + "pool_timeout": 30 + } # Redis 配置 REDIS_URL = "redis://:redis_WWjNyb@331002.xyz:2020/0" diff --git a/create_database.py b/create_database.py deleted file mode 100644 index 1b3aab5..0000000 --- a/create_database.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -数据库创建脚本 -用于在 PostgreSQL 服务器上创建 ai_vision 数据库 -""" - -from sqlalchemy import create_engine, text -from sqlalchemy.engine import url - -# 数据库连接信息 (从 config 或直接指定) -DB_HOST = "331002.xyz" -DB_PORT = 2022 -DB_USER = "user_xREpkJ" -DB_PASSWORD = "password_DZz8DQ" -DB_NAME = "ai_vision" - -def create_database(): - """创建数据库""" - try: - # 连接到默认的 postgres 数据库 - print(f"🔗 正在连接到 PostgreSQL 服务器 {DB_HOST}:{DB_PORT}...") - - # 构造连接 URL (连接到 postgres 数据库以执行 CREATE DATABASE) - postgres_url = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/postgres" - engine = create_engine(postgres_url, isolation_level="AUTOCOMMIT") - - with engine.connect() as conn: - # 检查数据库是否存在 - result = conn.execute(text(f"SELECT 1 FROM pg_database WHERE datname = '{DB_NAME}'")) - exists = result.fetchone() - - if exists: - print(f"✅ 数据库 {DB_NAME} 已经存在") - else: - # 创建数据库 - print(f"🔧 正在创建数据库 {DB_NAME}...") - conn.execute(text(f'CREATE DATABASE {DB_NAME}')) - print(f"✅ 数据库 {DB_NAME} 创建成功!") - - print(f"\n📊 数据库信息:") - print(f" 主机: {DB_HOST}:{DB_PORT}") - print(f" 数据库名: {DB_NAME}") - print(f" 用户: {DB_USER}") - print(f"\n💡 下一步:运行 python init_db.py 创建数据表") - - except Exception as e: - print(f"❌ 发生错误: {e}") - -if __name__ == '__main__': - create_database() diff --git a/db_manager.py b/db_manager.py new file mode 100644 index 0000000..38c97c7 --- /dev/null +++ b/db_manager.py @@ -0,0 +1,67 @@ +import sys +import os +from app import create_app, db +from flask_migrate import upgrade, migrate, init, stamp + +def init_migrations(): + """初始化迁移环境 (如果 migrations 文件夹不存在)""" + if not os.path.exists('migrations'): + print("📂 初始化 migrations 文件夹...") + with app.app_context(): + init() + else: + print("✅ migrations 文件夹已存在,跳过初始化。") + +def make_migrations(message="Auto update"): + """生成迁移脚本 (检测 models.py 的变动)""" + print(f"📝 正在生成迁移脚本 (备注: {message})...") + with app.app_context(): + try: + migrate(message=message) + print("✅ 迁移脚本生成成功!") + except Exception as e: + print(f"⚠️ 生成失败 (可能是没有变动): {e}") + +def apply_migrations(): + """应用迁移到数据库 (执行 SQL)""" + print("🚀 正在将变动应用到数据库...") + with app.app_context(): + upgrade() + print("✅ 数据库结构已同步至最新版!") + +def force_sync(): + """强制同步 (不仅是 Upgrade,还包括 create_all)""" + print("🔧 正在进行全量同步检查...") + with app.app_context(): + db.create_all() + # 标记当前状态为最新 + stamp() + print("✅ 数据库表结构已确保存在。") + +if __name__ == '__main__': + app = create_app() + + if len(sys.argv) < 2: + print("\n🔧 数据库管理脚本使用说明:") + print(" python db_manager.py init -> 初始化迁移环境") + print(" python db_manager.py make -> 生成迁移文件 (检测 models.py 变动)") + print(" python db_manager.py up -> 执行更新 (修改数据库表结构)") + print(" python db_manager.py sync -> 一键同步模式 (生成 + 执行)") + sys.exit(0) + + cmd = sys.argv[1] + + if cmd == 'init': + init_migrations() + elif cmd == 'make': + msg = sys.argv[2] if len(sys.argv) > 2 else "schema_update" + make_migrations(msg) + elif cmd == 'up': + apply_migrations() + elif cmd == 'sync': + # 一键傻瓜模式 + init_migrations() + make_migrations("auto_sync") + apply_migrations() + else: + print("❌ 未知命令") diff --git a/fix_db_manual.py b/fix_db_manual.py deleted file mode 100644 index 458ebd8..0000000 --- a/fix_db_manual.py +++ /dev/null @@ -1,22 +0,0 @@ -from sqlalchemy import create_engine, text -from config import Config - -def migrate(): - # 从 URI 解析连接参数 - uri = Config.SQLALCHEMY_DATABASE_URI - print(f"正在手动连接数据库进行迁移 (SQLAlchemy)... ") - - engine = create_engine(uri) - - try: - with engine.connect() as conn: - # 添加 api_key 字段 - print("🔧 正在检查并添加 users.api_key 字段...") - conn.execute(text("ALTER TABLE users ADD COLUMN IF NOT EXISTS api_key VARCHAR(255);")) - conn.commit() - print("✅ 数据库字段 users.api_key 处理成功") - except Exception as e: - print(f"❌ 迁移失败: {e}") - -if __name__ == "__main__": - migrate() diff --git a/fix_db_manual_points.py b/fix_db_manual_points.py deleted file mode 100644 index 34bd0b6..0000000 --- a/fix_db_manual_points.py +++ /dev/null @@ -1,30 +0,0 @@ -from sqlalchemy import create_engine, text -from config import Config - -def fix_db(): - # 从 SQLALCHEMY_DATABASE_URI 提取连接信息 - uri = Config.SQLALCHEMY_DATABASE_URI - print(f"🔗 正在尝试连接数据库 (SQLAlchemy)... ") - - engine = create_engine(uri) - - try: - with engine.connect() as conn: - # 检查并添加 points 字段 - print("🔧 正在检查并添加 users.points 字段...") - conn.execute(text(""" - DO $$ - BEGIN - IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name='users' AND column_name='points') THEN - ALTER TABLE users ADD COLUMN points INTEGER DEFAULT 2; - END IF; - END $$; - """)) - conn.commit() - print("✅ 数据库字段 points 处理完成 (默认值 2)") - - except Exception as e: - print(f"❌ 数据库修复失败: {e}") - -if __name__ == "__main__": - fix_db() diff --git a/force_init.py b/force_init.py deleted file mode 100644 index fdfd66f..0000000 --- a/force_init.py +++ /dev/null @@ -1,81 +0,0 @@ -from app import app -from extensions import db -from models import User, Role, Permission -from sqlalchemy import text -import sys - -def force_migrate(): - with app.app_context(): - print("🛠️ 开始强制迁移...") - - try: - # 1. 尝试清除该表的其他活动连接 (仅限 PostgreSQL) - print("🧹 正在清理数据库死锁...") - db.session.execute(text(""" - SELECT pg_terminate_backend(pid) - FROM pg_stat_activity - WHERE datname = current_database() - AND pid <> pg_backend_pid(); - """)) - db.session.commit() - except Exception as e: - print(f"⚠️ 清理连接跳过 (可能是权限问题): {e}") - - try: - # 2. 创建所有新表 - print("📦 正在同步表结构...") - db.create_all() - - # 3. 增加字段 - print("📝 正在调整 users 表结构...") - db.session.execute(text('ALTER TABLE users ADD COLUMN IF NOT EXISTS role_id INTEGER REFERENCES roles(id)')) - db.session.commit() - - # 4. 初始化角色和权限 - print("🚀 正在初始化 RBAC 权限数据...") - perms = { - 'view_logs': '查看系统日志', - 'manage_rbac': '管理角色与权限', - 'manage_users': '管理用户信息', - 'manage_system': '系统最高权限' - } - perm_objs = {} - for code, desc in perms.items(): - p = Permission.query.filter_by(name=code).first() - if not p: - p = Permission(name=code, description=desc) - db.session.add(p) - db.session.flush() - perm_objs[code] = p - - # 创建管理员角色 - admin_role = Role.query.filter_by(name='超级管理员').first() - if not admin_role: - admin_role = Role(name='超级管理员', description='系统最高权限持有者') - admin_role.permissions = list(perm_objs.values()) - db.session.add(admin_role) - - # 创建用户角色 - user_role = Role.query.filter_by(name='普通用户').first() - if not user_role: - user_role = Role(name='普通用户', description='常规功能使用者') - db.session.add(user_role) - - db.session.flush() - - # 5. 修复旧数据:把所有现有用户设为超级管理员(方便你第一时间进入后台) - print("👤 正在升级现有用户为管理员...") - all_users = User.query.all() - for u in all_users: - u.role = admin_role - - db.session.commit() - print("✨ 迁移与初始化全部完成!") - - except Exception as e: - print(f"❌ 运行出错: {e}") - db.session.rollback() - raise e - -if __name__ == '__main__': - force_migrate() \ No newline at end of file diff --git a/init_db.py b/init_db.py deleted file mode 100644 index fb3184e..0000000 --- a/init_db.py +++ /dev/null @@ -1,53 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -数据库初始化脚本 -用于手动创建或重置数据库表 -""" - -from app import app -from extensions import db -import models - -def init_database(): - """初始化数据库表""" - with app.app_context(): - print("🔧 开始初始化数据库...") - - # 创建所有表 - db.create_all() - - # 检查表是否创建成功 - from sqlalchemy import inspect - inspector = inspect(db.engine) - tables = inspector.get_table_names() - - print(f"\n✅ 数据库表已创建,共 {len(tables)} 张表:") - for table in tables: - print(f" - {table}") - - print("\n📊 表结构详情:") - for table_name in tables: - columns = inspector.get_columns(table_name) - print(f"\n{table_name}:") - for col in columns: - print(f" {col['name']} ({col['type']})") - -def drop_all_tables(): - """删除所有表(慎用)""" - with app.app_context(): - print("⚠️ 警告:即将删除所有数据库表!") - confirm = input("确认删除?输入 yes 继续: ") - if confirm.lower() == 'yes': - db.drop_all() - print("✅ 所有表已删除") - else: - print("❌ 操作已取消") - -if __name__ == '__main__': - import sys - - if len(sys.argv) > 1 and sys.argv[1] == '--drop': - drop_all_tables() - else: - init_database() diff --git a/init_dicts.py b/init_dicts.py deleted file mode 100644 index 6be6143..0000000 --- a/init_dicts.py +++ /dev/null @@ -1,54 +0,0 @@ -import requests -from app import app -from extensions import db -from models import SystemDict -from config import Config - -def fetch_and_init(): - with app.app_context(): - # 定义需要抓取的字典代码及对应的本地类型 - target_mappings = { - "nano_model": "ai_model", - "aspect_ratio": "aspect_ratio", - "ai_prompt": "prompt_tpl", - "ai_image_size": "ai_image_size" - } - - print("🚀 开始从远程接口获取字典数据...") - - for remote_code, local_type in target_mappings.items(): - try: - url = f"{Config.DICT_URL}?platform={Config.PLATFORM}&code={remote_code}" - response = requests.get(url, verify=False, timeout=15) - - if response.status_code == 200: - data = response.json().get("data", []) - print(f"📦 抓取到 {remote_code} ({len(data)} 条数据)") - - for item in data: - label = item.get("label") - value = item.get("value") - - # 检查本地是否已存在 - exists = SystemDict.query.filter_by(dict_type=local_type, value=value).first() - if not exists: - new_dict = SystemDict( - dict_type=local_type, - label=label, - value=value, - cost=1 if local_type == 'ai_model' else 0, # 模型默认 1 积分,其余 0 - is_active=True - ) - db.session.add(new_dict) - else: - print(f"❌ 抓取 {remote_code} 失败: HTTP {response.status_code}") - - except Exception as e: - print(f"⚠️ 抓取 {remote_code} 发生异常: {e}") - - db.session.commit() - print("\n✅ 字典数据本地化初始化成功!") - print("💡 您现在可以直接在数据库 system_dicts 表中修改模型的 cost (积分消耗) 字段。") - -if __name__ == "__main__": - fetch_and_init() diff --git a/init_notifications.py b/init_notifications.py deleted file mode 100644 index edf034b..0000000 --- a/init_notifications.py +++ /dev/null @@ -1,30 +0,0 @@ -from app import app -from extensions import db -from models import SystemNotification - -def init_notifications(): - with app.app_context(): - # 检查是否已存在通知 - if SystemNotification.query.first(): - print("📅 通知系统已初始化,跳过。") - return - - # 创建欢迎通知 - welcome_notif = SystemNotification( - title="✨ 欢迎使用 AI 视界 2.0", - content="""感谢您体验我们的 AI 创作平台! - -1. 我们已上线“优质渲染模式”,支持更精细的画面细节。 -2. 积分充值功能正在最后联调中,敬请期待。 -3. 提示词区域现已支持自动折叠,界面更简洁。 - -如果您有任何建议,欢迎通过系统审计日志联系管理员。""", - is_active=True - ) - - db.session.add(welcome_notif) - db.session.commit() - print("✅ 系统欢迎通知已发布!") - -if __name__ == "__main__": - init_notifications() \ No newline at end of file diff --git a/init_rbac.py b/init_rbac.py deleted file mode 100644 index 0a5f992..0000000 --- a/init_rbac.py +++ /dev/null @@ -1,58 +0,0 @@ -from app import app -from extensions import db -from models import User, Role, Permission - -def init_rbac(): - with app.app_context(): - print("🚀 正在初始化 RBAC 系统...") - - # 1. 创建基础权限 - perms = { - 'view_logs': '查看系统日志', - 'manage_rbac': '管理角色与权限', - 'manage_users': '管理用户信息', - 'manage_dicts': '管理系统字典', - 'manage_notifications': '管理系统通知', - 'manage_system': '系统最高权限' - } - perm_objs = {} - for code, desc in perms.items(): - p = Permission.query.filter_by(name=code).first() - if not p: - p = Permission(name=code, description=desc) - db.session.add(p) - perm_objs[code] = p - - db.session.commit() - - # 2. 创建基础角色 - # 超级管理员角色 - admin_role = Role.query.filter_by(name='超级管理员').first() - if not admin_role: - admin_role = Role(name='超级管理员', description='系统最高权限持有者') - admin_role.permissions = list(perm_objs.values()) - db.session.add(admin_role) - else: - # 确保现有超级管理员拥有所有新权限 - admin_role.permissions = list(perm_objs.values()) - - # 普通用户角色 - user_role = Role.query.filter_by(name='普通用户').first() - if not user_role: - user_role = Role(name='普通用户', description='常规功能使用者') - db.session.add(user_role) - - db.session.commit() - - # 3. 为现有用户分配超级管理员角色(作为测试) - # 请根据实际情况修改 - first_user = User.query.first() - if first_user: - first_user.role = admin_role - db.session.commit() - print(f"✅ 已将用户 {first_user.phone} 设为超级管理员") - - print("✨ RBAC 初始化完成") - -if __name__ == '__main__': - init_rbac() \ No newline at end of file diff --git a/middlewares/auth.py b/middlewares/auth.py index 05a9e24..816f1ab 100644 --- a/middlewares/auth.py +++ b/middlewares/auth.py @@ -14,6 +14,17 @@ def login_required(f): return jsonify({"error": "请先登录", "code": 401}), 401 # 记录当前路径以便登录后跳转 return redirect(url_for('auth.login_page', next=request.path)) + + # 增加封禁检查 + from extensions import db + user = db.session.get(User, user_id) + if user and user.is_banned: + session.pop('user_id', None) # 强制踢下线 + system_logger.error(f"遭封禁用户尝试访问: {request.path}", user_id=user_id) + if request.path.startswith('/api/'): + return jsonify({"error": "您的账号已被封禁", "code": 403}), 403 + return redirect(url_for('auth.login_page', error="您的账号已被封禁")) + return f(*args, **kwargs) return decorated_function @@ -30,7 +41,17 @@ def permission_required(perm_name): return redirect(url_for('auth.login_page', next=request.path)) user = User.query.get(user_id) - if not user or not user.has_permission(perm_name): + if not user or user.is_banned: + if user and user.is_banned: + session.pop('user_id', None) + system_logger.error(f"遭封禁用户通过权限检查尝试: {request.path}", user_id=user_id) + + if request.path.startswith('/api/'): + msg = "您的账号已被封禁" if user and user.is_banned else f"需要权限: {perm_name}" + return jsonify({"error": msg, "code": 403}), 403 + return redirect(url_for('index', error="权限不足或账号已封禁")) + + if not user.has_permission(perm_name): system_logger.warning(f"未授权访问尝试 (权限不足: {perm_name}): {request.path}", user_id=user_id, ip=request.remote_addr) if request.path.startswith('/api/'): return jsonify({"error": f"需要权限: {perm_name}", "code": 403}), 403 diff --git a/migrate_api_key.py b/migrate_api_key.py deleted file mode 100644 index cff6307..0000000 --- a/migrate_api_key.py +++ /dev/null @@ -1,13 +0,0 @@ -from app import app -from extensions import db -from sqlalchemy import text - -with app.app_context(): - try: - # 尝试添加 api_key 字段到 users 表 - db.session.execute(text('ALTER TABLE users ADD COLUMN IF NOT EXISTS api_key VARCHAR(255)')) - db.session.commit() - print("✅ 数据库字段 users.api_key 同步成功") - except Exception as e: - db.session.rollback() - print(f"❌ 同步失败: {e}") diff --git a/migrate_db.py b/migrate_db.py deleted file mode 100644 index 4cd1a5c..0000000 --- a/migrate_db.py +++ /dev/null @@ -1,23 +0,0 @@ -from app import app -from extensions import db -from sqlalchemy import text - -def migrate(): - with app.app_context(): - print("🔧 正在为 users 表增加 role 字段...") - try: - # 使用原生 SQL 增加字段 - db.session.execute(text('ALTER TABLE users ADD COLUMN IF NOT EXISTS role VARCHAR(20) DEFAULT \'user\'')) - db.session.commit() - print("✅ 字段增加成功") - - # 设置管理员(可选,方便您测试) - # db.session.execute(text("UPDATE users SET role = 'admin' WHERE phone = '您的手机号'")) - # db.session.commit() - - except Exception as e: - print(f"❌ 迁移失败: {e}") - db.session.rollback() - -if __name__ == '__main__': - migrate() \ No newline at end of file diff --git a/migrate_rbac.py b/migrate_rbac.py deleted file mode 100644 index e47b6bc..0000000 --- a/migrate_rbac.py +++ /dev/null @@ -1,29 +0,0 @@ -from app import app -from extensions import db -from sqlalchemy import text - -def migrate(): - with app.app_context(): - print("🔧 正在平滑迁移至 RBAC 体系...") - try: - # 1. 创建新表 - db.create_all() - - # 2. 修改 users 表结构 - # 增加 role_id - db.session.execute(text('ALTER TABLE users ADD COLUMN IF NOT EXISTS role_id INTEGER REFERENCES roles(id)')) - - # 3. 尝试迁移旧数据:如果旧的 role 字段存在且值为 'admin',则关联超级管理员角色 - # 我们先执行初始化脚本创建角色 - from init_rbac import init_rbac - init_rbac() - - db.session.commit() - print("✅ 数据库结构迁移成功") - - except Exception as e: - print(f"❌ 迁移失败: {e}") - db.session.rollback() - -if __name__ == '__main__': - migrate() \ No newline at end of file diff --git a/migrations/versions/0cc7ce54ecc0_auto_sync.py b/migrations/versions/0cc7ce54ecc0_auto_sync.py new file mode 100644 index 0000000..b2456ef --- /dev/null +++ b/migrations/versions/0cc7ce54ecc0_auto_sync.py @@ -0,0 +1,32 @@ +"""auto_sync + +Revision ID: 0cc7ce54ecc0 +Revises: a77f97f56b61 +Create Date: 2026-01-17 22:41:07.264297 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '0cc7ce54ecc0' +down_revision = 'a77f97f56b61' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('generation_records', schema=None) as batch_op: + batch_op.add_column(sa.Column('cost', sa.Integer(), nullable=True)) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('generation_records', schema=None) as batch_op: + batch_op.drop_column('cost') + + # ### end Alembic commands ### diff --git a/migrations/versions/a77f97f56b61_auto_sync.py b/migrations/versions/a77f97f56b61_auto_sync.py new file mode 100644 index 0000000..ead3534 --- /dev/null +++ b/migrations/versions/a77f97f56b61_auto_sync.py @@ -0,0 +1,32 @@ +"""auto_sync + +Revision ID: a77f97f56b61 +Revises: 9024b393e1ef +Create Date: 2026-01-17 22:32:23.254493 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'a77f97f56b61' +down_revision = '9024b393e1ef' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('users', schema=None) as batch_op: + batch_op.add_column(sa.Column('is_banned', sa.Boolean(), nullable=True)) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('users', schema=None) as batch_op: + batch_op.drop_column('is_banned') + + # ### end Alembic commands ### diff --git a/models.py b/models.py index c04f1d8..de73123 100644 --- a/models.py +++ b/models.py @@ -1,7 +1,14 @@ from extensions import db -from datetime import datetime +from datetime import datetime, timedelta from werkzeug.security import generate_password_hash, check_password_hash +def to_bj_time(dt): + """将 UTC 时间转换为北京时间 (UTC+8)""" + if not dt: + return None + return dt + timedelta(hours=8) + + # 角色与权限的多对多关联表 role_permissions = db.Table('role_permissions', db.Column('role_id', db.Integer, db.ForeignKey('roles.id'), primary_key=True), @@ -20,7 +27,9 @@ class Role(db.Model): name = db.Column(db.String(50), unique=True, nullable=False) # 如: '超级管理员', '普通用户' description = db.Column(db.String(100)) # 角色拥有的权限 - permissions = db.relationship('Permission', secondary=role_permissions, backref=db.backref('roles', lazy='dynamic')) + permissions = db.relationship('Permission', secondary=role_permissions, + backref=db.backref('roles', lazy='dynamic'), + order_by='Permission.id') class User(db.Model): __tablename__ = 'users' @@ -31,9 +40,14 @@ class User(db.Model): api_key = db.Column(db.String(255)) # 存储用户的 API Key points = db.Column(db.Integer, default=2) # 账户积分,默认赠送2次试用 has_used_points = db.Column(db.Boolean, default=False) # 是否使用过积分 + is_banned = db.Column(db.Boolean, default=False) # 账号是否被封禁 # 关联角色 ID role_id = db.Column(db.Integer, db.ForeignKey('roles.id')) created_at = db.Column(db.DateTime, default=datetime.utcnow) + + @property + def created_at_bj(self): + return to_bj_time(self.created_at) # 关系映射 role = db.relationship('Role', backref=db.backref('users', lazy='dynamic')) @@ -78,9 +92,14 @@ class GenerationRecord(db.Model): user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False) prompt = db.Column(db.Text) model = db.Column(db.String(100)) + cost = db.Column(db.Integer, default=0) # 消耗积分 # 存储生成的图片 URL 列表 (JSON 字符串) image_urls = db.Column(db.Text) created_at = db.Column(db.DateTime, default=datetime.utcnow) + + @property + def created_at_bj(self): + return to_bj_time(self.created_at) user = db.relationship('User', backref=db.backref('records', lazy='dynamic', order_by='GenerationRecord.created_at.desc()')) @@ -99,6 +118,10 @@ class SystemNotification(db.Model): content = db.Column(db.Text, nullable=False) is_active = db.Column(db.Boolean, default=True) created_at = db.Column(db.DateTime, default=datetime.utcnow) + + @property + def created_at_bj(self): + return to_bj_time(self.created_at) # 哪些用户已读 read_by_users = db.relationship('User', secondary=notification_reads, backref=db.backref('read_notifications', lazy='dynamic')) @@ -116,6 +139,14 @@ class Order(db.Model): trade_no = db.Column(db.String(64)) # 支付宝交易号 created_at = db.Column(db.DateTime, default=datetime.utcnow) paid_at = db.Column(db.DateTime) + + @property + def created_at_bj(self): + return to_bj_time(self.created_at) + + @property + def paid_at_bj(self): + return to_bj_time(self.paid_at) user = db.relationship('User', backref=db.backref('orders', lazy='dynamic', order_by='Order.created_at.desc()')) @@ -136,6 +167,10 @@ class SystemLog(db.Model): method = db.Column(db.String(10)) user_agent = db.Column(db.String(255)) - created_at = db.Column(db.DateTime, default=datetime.now) + created_at = db.Column(db.DateTime, default=datetime.utcnow) + + @property + def created_at_bj(self): + return to_bj_time(self.created_at) user = db.relationship('User', backref=db.backref('logs', lazy='dynamic', order_by='SystemLog.created_at.desc()')) diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/file_service.py b/services/file_service.py new file mode 100644 index 0000000..4dcc91a --- /dev/null +++ b/services/file_service.py @@ -0,0 +1,34 @@ +import os +import uuid +import time +import requests +from urllib.parse import quote +from werkzeug.utils import secure_filename +from config import Config +from extensions import s3_client + +def handle_file_uploads(files): + """处理文件上传到 MinIO""" + img_urls = [] + for f in files: + ext = os.path.splitext(f.filename)[1] + filename = f"{uuid.uuid4().hex}{ext}" + s3_client.upload_fileobj( + f, Config.MINIO["bucket"], filename, + ExtraArgs={"ContentType": f.content_type} + ) + img_urls.append(f"{Config.MINIO['public_url']}{quote(filename)}") + return img_urls + +def get_remote_file_stream(url): + """获取远程文件的流""" + req = requests.get(url, stream=True, timeout=60) + req.raise_for_status() + + headers = {} + if req.headers.get('Content-Type'): + headers['Content-Type'] = req.headers['Content-Type'] + else: + headers['Content-Type'] = 'application/octet-stream' + + return req, headers diff --git a/services/generation_service.py b/services/generation_service.py new file mode 100644 index 0000000..1276b69 --- /dev/null +++ b/services/generation_service.py @@ -0,0 +1,156 @@ +from config import Config +from models import SystemDict, GenerationRecord, User, db +from services.logger import system_logger +from services.task_service import process_image_generation, process_video_generation +import requests +import json +import uuid +import threading +from flask import current_app + +def get_model_cost(model_value, is_video=False): + """获取模型消耗积分""" + dict_type = 'video_model' if is_video else 'ai_model' + model_dict = SystemDict.query.filter_by(dict_type=dict_type, value=model_value).first() + + if model_dict: + return model_dict.cost + + # Default costs + if is_video: + return 15 if "pro" in model_value.lower() or "3.1" in model_value else 10 + else: + return 1 + +def validate_generation_request(user, data): + """验证生图请求并返回配置 (api_key, target_api, cost, use_trial)""" + mode = data.get('mode', 'trial') + is_premium = data.get('is_premium', False) + input_key = data.get('apiKey') + model_value = data.get('model') + + target_api = Config.AI_API + api_key = None + use_trial = False + + if mode == 'key': + api_key = input_key or user.api_key + if not api_key: + return None, None, 0, False, "请先输入您的 API 密钥" + + # Update user key if changed + if input_key and input_key != user.api_key: + user.api_key = input_key + db.session.commit() + else: + if user.points > 0: + api_key = Config.PREMIUM_KEY if is_premium else Config.TRIAL_KEY + target_api = Config.TRIAL_API + use_trial = True + else: + return None, None, 0, False, "可用积分已耗尽,请充值或切换至自定义 Key 模式" + + cost = get_model_cost(model_value, is_video=False) + if use_trial and is_premium: + cost *= 2 + + if use_trial: + if user.points < cost: + return None, None, cost, True, "可用积分不足" + + return api_key, target_api, cost, use_trial, None + +def deduct_points(user, cost): + """扣除积分""" + user.points -= cost + user.has_used_points = True + db.session.commit() + +def refund_points(user_id, cost): + """退还积分""" + try: + user = db.session.get(User, user_id) + if user: + user.points += cost + db.session.commit() + except: + pass + +def handle_chat_generation_sync(user_id, api_key, model_value, prompt, use_trial, cost): + """同步处理对话类模型""" + headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} + chat_payload = { + "model": model_value, + "messages": [{"role": "user", "content": prompt}] + } + try: + resp = requests.post(Config.CHAT_API, json=chat_payload, headers=headers, timeout=120) + if resp.status_code != 200: + if use_trial: + refund_points(user_id, cost) + return {"error": resp.text}, resp.status_code + + api_result = resp.json() + content = api_result['choices'][0]['message']['content'] + + # 记录聊天历史 + if prompt != "解读验光单": + new_record = GenerationRecord( + user_id=user_id, + prompt=prompt, + model=model_value, + cost=cost, + image_urls=json.dumps([{"type": "text", "content": content}]) + ) + db.session.add(new_record) + db.session.commit() + + return { + "data": [{"content": content, "type": "text"}], + "message": "生成成功!" + }, 200 + except Exception as e: + if use_trial: + refund_points(user_id, cost) + return {"error": str(e)}, 500 + +def start_async_image_task(app, user_id, payload, api_key, target_api, cost, mode, model_value): + """启动异步生图任务""" + task_id = str(uuid.uuid4()) + + log_msg = "用户发起验光单解读" if payload.get('prompt') == "解读验光单" else "用户发起生图任务" + system_logger.info(log_msg, model=model_value, mode=mode) + + threading.Thread( + target=process_image_generation, + args=(app, user_id, task_id, payload, api_key, target_api, cost) + ).start() + + return task_id + +def validate_video_request(user, data): + """验证视频生成请求""" + if user.points <= 0: + return None, 0, "可用积分不足,请先充值" + + model_value = data.get('model', 'veo3.1') + cost = get_model_cost(model_value, is_video=True) + + if user.points < cost: + return None, cost, f"积分不足,生成该视频需要 {cost} 积分" + + return model_value, cost, None + +def start_async_video_task(app, user_id, payload, cost, model_value): + """启动异步视频任务""" + api_key = Config.TRIAL_KEY + task_id = str(uuid.uuid4()) + + system_logger.info("用户发起视频生成任务 (积分模式)", model=model_value, cost=cost) + + threading.Thread( + target=process_video_generation, + args=(app, user_id, task_id, payload, api_key, cost) + ).start() + + return task_id diff --git a/services/history_service.py b/services/history_service.py new file mode 100644 index 0000000..23b15df --- /dev/null +++ b/services/history_service.py @@ -0,0 +1,55 @@ +from models import GenerationRecord, to_bj_time +from flask import request +import json +from datetime import datetime, timedelta + +def get_user_history_data(user_id, page=1, per_page=10, filter_type='all'): + """获取用户历史记录业务逻辑""" + + # 计算 90 天前的时间 + ninety_days_ago = datetime.now() - timedelta(days=90) + + query = GenerationRecord.query.filter( + GenerationRecord.user_id == user_id, + GenerationRecord.created_at >= ninety_days_ago, + GenerationRecord.prompt != "解读验光单" + ) + + if filter_type == 'video': + # 只看视频 + query = query.filter(GenerationRecord.image_urls.like('%"type": "video"%')) + elif filter_type == 'image': + # 只看图片 (排除视频) + query = query.filter(~GenerationRecord.image_urls.like('%"type": "video"%')) + + pagination = query.order_by(GenerationRecord.created_at.desc())\ + .paginate(page=page, per_page=per_page, error_out=False) + + # 格式化 URL,兼容新旧数据格式 + history_list = [] + for r in pagination.items: + raw_urls = json.loads(r.image_urls) + formatted_urls = [] + for u in raw_urls: + if isinstance(u, str): + # 旧数据:直接返回原图作为缩略图 + formatted_urls.append({"url": u, "thumb": u}) + else: + # 如果是视频类型,提供默认预览图 (此处使用一个公共视频占位图或空) + if u.get('type') == 'video' and not u.get('thumb'): + u['thumb'] = "https://img.icons8.com/flat-round/64/000000/play--v1.png" + formatted_urls.append(u) + + history_list.append({ + "id": r.id, + "prompt": r.prompt, + "model": r.model, + "urls": formatted_urls, + "created_at": r.created_at_bj.strftime('%m-%d %H:%M') + }) + + return { + "history": history_list, + "has_next": pagination.has_next, + "total": pagination.total + } diff --git a/services/logger.py b/services/logger.py index c4b5d3e..062e311 100644 --- a/services/logger.py +++ b/services/logger.py @@ -1,6 +1,6 @@ import logging import os -from datetime import datetime +from datetime import datetime, timedelta from logging.handlers import RotatingFileHandler from extensions import redis_client, db import json @@ -42,13 +42,14 @@ class SystemLogger: def _push_to_redis(self, level, message, extra=None): """推送到 Redis 并保留 30 天数据""" try: - now = datetime.now() + now = datetime.utcnow() + bj_now = now + timedelta(hours=8) user_id = None if has_request_context(): user_id = g.get('user_id') or (getattr(g, 'user', None).id if hasattr(g, 'user') and g.user else None) log_entry = { - "time": now.strftime('%Y-%m-%d %H:%M:%S'), + "time": bj_now.strftime('%Y-%m-%d %H:%M:%S'), "level": level, "message": message, "user_id": user_id, @@ -75,7 +76,7 @@ class SystemLogger: 'module': module, 'user_id': extra.get('user_id') if extra else None, 'extra': json.dumps(extra, ensure_ascii=False) if extra else None, - 'created_at': datetime.now() + 'created_at': datetime.utcnow() } # 捕获请求上下文信息 diff --git a/services/stats_service.py b/services/stats_service.py new file mode 100644 index 0000000..a56d26c --- /dev/null +++ b/services/stats_service.py @@ -0,0 +1,65 @@ +from models import GenerationRecord, Order, db, to_bj_time +from sqlalchemy import func +from datetime import datetime, timedelta + +def get_point_stats(user_id, days=7): + """获取用户积分消耗统计数据 (用于图表)""" + end_date = datetime.utcnow() + start_date = end_date - timedelta(days=days-1) + + # 1. 获取消耗统计 (从 GenerationRecord) + # 按天分组汇总结算 + deductions = db.session.query( + func.date(GenerationRecord.created_at).label('date'), + func.sum(GenerationRecord.cost).label('total_cost') + ).filter( + GenerationRecord.user_id == user_id, + GenerationRecord.created_at >= start_date.date() + ).group_by(func.date(GenerationRecord.created_at)).all() + + # 2. 获取充值统计 (从 Order) + incomes = db.session.query( + func.date(Order.created_at).label('date'), + func.sum(Order.points).label('total_points') + ).filter( + Order.user_id == user_id, + Order.status == 'PAID', + Order.created_at >= start_date.date() + ).group_by(func.date(Order.created_at)).all() + + # 3. 补齐日期,生成连续数据 + date_list = [(start_date + timedelta(days=i)).strftime('%m-%d') for i in range(days)] + deduction_map = {d.date.strftime('%m-%d'): int(d.total_cost or 0) for d in deductions} + income_map = {i.date.strftime('%m-%d'): int(i.total_points or 0) for i in incomes} + + return { + "labels": date_list, + "deductions": [deduction_map.get(d, 0) for d in date_list], + "incomes": [income_map.get(d, 0) for d in date_list] + } + +def get_point_details(user_id, page=1, per_page=20): + """获取积分变动明细列表""" + pagination = GenerationRecord.query.filter( + GenerationRecord.user_id == user_id, + GenerationRecord.cost > 0 + ).order_by(GenerationRecord.created_at.desc()).paginate( + page=page, per_page=per_page, error_out=False + ) + + details = [] + for r in pagination.items: + details.append({ + "type": "deduction", + "desc": r.prompt[:30] + "..." if r.prompt else "AI 生成", + "model": r.model, + "change": f"-{r.cost}", + "time": r.created_at_bj.strftime('%Y-%m-%d %H:%M') + }) + + return { + "items": details, + "total": pagination.total, + "pages": pagination.pages, + "current_page": pagination.page + } diff --git a/services/system_service.py b/services/system_service.py new file mode 100644 index 0000000..f2d4a61 --- /dev/null +++ b/services/system_service.py @@ -0,0 +1,62 @@ +from models import SystemDict, SystemNotification, db +from flask import jsonify + +def get_system_config_data(): + """获取系统配置数据的业务逻辑""" + dicts = SystemDict.query.filter_by(is_active=True).order_by(SystemDict.sort_order.desc()).all() + + config = { + "models": [], + "ratios": [], + "prompts": [], + "sizes": [], + "video_models": [], + "video_prompts": [] + } + + for d in dicts: + item = {"label": d.label, "value": d.value} + if d.dict_type == 'ai_model': + item["cost"] = d.cost + config["models"].append(item) + elif d.dict_type == 'aspect_ratio': + config["ratios"].append(item) + elif d.dict_type == 'prompt_tpl': + config["prompts"].append(item) + elif d.dict_type == 'ai_image_size': + config["sizes"].append(item) + elif d.dict_type == 'video_model': + item["cost"] = d.cost + config["video_models"].append(item) + elif d.dict_type == 'video_prompt': + config["video_prompts"].append(item) + + return config + +def get_user_latest_notification(user_id): + """获取用户最近一条未读通知""" + latest = SystemNotification.query.filter_by(is_active=True)\ + .filter(~SystemNotification.read_by_users.any(id=user_id))\ + .order_by(SystemNotification.created_at.desc()).first() + + if latest: + return { + "id": latest.id, + "title": latest.title, + "content": latest.content, + "time": latest.created_at_bj.strftime('%Y-%m-%d %H:%M') + } + return {"id": None} + +def mark_notification_as_read(user_id, notif_id): + """标记通知已读""" + from models import User + + notif = db.session.get(SystemNotification, notif_id) + user = db.session.get(User, user_id) + + if notif and user: + if user not in notif.read_by_users: + notif.read_by_users.append(user) + db.session.commit() + return True diff --git a/services/task_service.py b/services/task_service.py new file mode 100644 index 0000000..e873356 --- /dev/null +++ b/services/task_service.py @@ -0,0 +1,285 @@ +import os +import uuid +import json +import requests +import io +import time +import base64 +import threading +from urllib.parse import quote +from PIL import Image + +from extensions import s3_client, redis_client, db +from models import GenerationRecord, User +from config import Config +from services.logger import system_logger + +def sync_images_background(app, record_id, raw_urls): + """后台同步图片至 MinIO,并生成缩略图,带重试机制""" + with app.app_context(): + processed_data = [] + for raw_url in raw_urls: + success = False + for attempt in range(3): # 3 次重试机制 + try: + img_resp = requests.get(raw_url, timeout=30) + if img_resp.status_code == 200: + content = img_resp.content + ext = ".png" + base_filename = f"gen-{uuid.uuid4().hex}" + full_filename = f"{base_filename}{ext}" + thumb_filename = f"{base_filename}-thumb{ext}" + + # 1. 上传原图 + s3_client.upload_fileobj( + io.BytesIO(content), + Config.MINIO["bucket"], + full_filename, + ExtraArgs={"ContentType": "image/png"} + ) + + full_url = f"{Config.MINIO['public_url']}{quote(full_filename)}" + thumb_url = full_url # 默认使用原图 + + # 2. 生成并上传缩略图 (400px 宽度) + try: + img = Image.open(io.BytesIO(content)) + # 转换为 RGB 如果是 RGBA (避免某些格式保存问题) + if img.mode in ("RGBA", "P"): + img = img.convert("RGB") + + # 缩放至宽度 400, 高度等比 + w, h = img.size + if w > 400: + ratio = 400 / float(w) + img.thumbnail((400, int(h * ratio)), Image.Resampling.LANCZOS) + + thumb_io = io.BytesIO() + # 缩略图保存为 JPEG 以获得更小的体积 + img.save(thumb_io, format='JPEG', quality=80, optimize=True) + thumb_io.seek(0) + + s3_client.upload_fileobj( + thumb_io, + Config.MINIO["bucket"], + thumb_filename.replace('.png', '.jpg'), + ExtraArgs={"ContentType": "image/jpeg"} + ) + thumb_url = f"{Config.MINIO['public_url']}{quote(thumb_filename.replace('.png', '.jpg'))}" + except Exception as thumb_e: + print(f"⚠️ 缩略图生成失败: {thumb_e}") + + processed_data.append({"url": full_url, "thumb": thumb_url}) + success = True + break + except Exception as e: + print(f"⚠️ 第 {attempt+1} 次同步失败: {e}") + time.sleep(2 ** attempt) # 指数退避 + + if not success: + # 如果最终失败,保留原始 URL + processed_data.append({"url": raw_url, "thumb": raw_url}) + + # 更新数据库记录为持久化数据结构 + try: + record = db.session.get(GenerationRecord, record_id) + if record: + record.image_urls = json.dumps(processed_data) + db.session.commit() + print(f"✅ 记录 {record_id} 图片及缩略图已完成同步") + except Exception as e: + print(f"❌ 更新记录失败: {e}") + +def process_image_generation(app, user_id, task_id, payload, api_key, target_api, cost): + """异步执行图片生成并存入 Redis""" + with app.app_context(): + try: + headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} + # 使用较长的超时时间 (10分钟),确保长耗时任务不被中断 + resp = requests.post(target_api, json=payload, headers=headers, timeout=1000) + + if resp.status_code != 200: + user = db.session.get(User, user_id) + if user and "sk-" in api_key: + user.points += cost + db.session.commit() + + # 记录详细的失败上下文 + system_logger.error(f"生图任务失败: {resp.text}", user_id=user_id, task_id=task_id, prompt=payload.get('prompt'), model=payload.get('model')) + redis_client.setex(f"task:{task_id}", 3600, json.dumps({"status": "error", "message": resp.text})) + return + + api_result = resp.json() + raw_urls = [item['url'] for item in api_result.get('data', [])] + + # 持久化记录 + new_record = GenerationRecord( + user_id=user_id, + prompt=payload.get('prompt'), + model=payload.get('model'), + cost=cost, + image_urls=json.dumps(raw_urls) + ) + db.session.add(new_record) + db.session.commit() + + # 后台线程处理:下载 AI 原始图片并同步到私有 MinIO + threading.Thread( + target=sync_images_background, + args=(app, new_record.id, raw_urls) + ).start() + + # 存入 Redis 标记完成 + system_logger.info(f"生图任务完成", user_id=user_id, task_id=task_id, model=payload.get('model')) + redis_client.setex(f"task:{task_id}", 3600, json.dumps({"status": "complete", "urls": raw_urls})) + + except Exception as e: + # 异常处理:退还积分 + user = db.session.get(User, user_id) + if user and "sk-" in api_key: + user.points += cost + db.session.commit() + + system_logger.error(f"生图任务异常: {str(e)}", user_id=user_id, task_id=task_id, prompt=payload.get('prompt'), model=payload.get('model')) + redis_client.setex(f"task:{task_id}", 3600, json.dumps({"status": "error", "message": str(e)})) + +def sync_video_background(app, record_id, raw_url, internal_task_id=None): + """后台同步视频至 MinIO,带重试机制""" + with app.app_context(): + success = False + final_url = raw_url + for attempt in range(3): + try: + # 增加了流式下载,处理大视频文件 + with requests.get(raw_url, stream=True, timeout=120) as r: + r.raise_for_status() + content_type = r.headers.get('content-type', 'video/mp4') + ext = ".mp4" + if "text/html" in content_type: # 有些 API 返回的是跳转页面 + continue + + base_filename = f"video-{uuid.uuid4().hex}" + full_filename = f"{base_filename}{ext}" + + video_io = io.BytesIO() + for chunk in r.iter_content(chunk_size=8192): + video_io.write(chunk) + video_io.seek(0) + + # 上传至 MinIO + s3_client.upload_fileobj( + video_io, + Config.MINIO["bucket"], + full_filename, + ExtraArgs={"ContentType": content_type} + ) + + final_url = f"{Config.MINIO['public_url']}{quote(full_filename)}" + success = True + break + except Exception as e: + system_logger.error(f"同步视频失败 (第{attempt+1}次): {str(e)}") + time.sleep(5) + + if success: + try: + record = db.session.get(GenerationRecord, record_id) + if record: + # 更新记录为 MinIO 的 URL + record.image_urls = json.dumps([{"url": final_url, "type": "video"}]) + db.session.commit() + + # 同步更新 Redis 中的缓存 + if internal_task_id: + cached_data = redis_client.get(f"task:{internal_task_id}") + if cached_data: + if isinstance(cached_data, bytes): + cached_data = cached_data.decode('utf-8') + task_info = json.loads(cached_data) + task_info['video_url'] = final_url + redis_client.setex(f"task:{internal_task_id}", 3600, json.dumps(task_info)) + + system_logger.info(f"视频同步 MinIO 成功", video_url=final_url) + except Exception as dbe: + system_logger.error(f"更新视频记录失败: {str(dbe)}") + +def process_video_generation(app, user_id, internal_task_id, payload, api_key, cost): + """异步提交并查询视频任务状态""" + with app.app_context(): + try: + headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} + # 1. 提交任务 + submit_resp = requests.post(Config.VIDEO_GEN_API, json=payload, headers=headers, timeout=60) + if submit_resp.status_code != 200: + raise Exception(f"视频任务提交失败: {submit_resp.text}") + + submit_result = submit_resp.json() + remote_task_id = submit_result.get('task_id') + if not remote_task_id: + raise Exception(f"未获取到远程任务 ID: {submit_result}") + + # 2. 轮询状态 + max_retries = 90 # 提升到 15 分钟 + video_url = None + for i in range(max_retries): + time.sleep(10) + poll_url = Config.VIDEO_POLL_API.format(task_id=remote_task_id) + poll_resp = requests.get(poll_url, headers=headers, timeout=30) + if poll_resp.status_code != 200: + continue + + poll_result = poll_resp.json() + status = poll_result.get('status', '').upper() + + if status == 'SUCCESS': + # 提取视频输出地址 + if 'data' in poll_result and isinstance(poll_result['data'], dict): + video_url = poll_result['data'].get('output') + if not video_url: + if 'data' in poll_result and isinstance(poll_result['data'], list) and poll_result['data']: + video_url = poll_result['data'][0].get('url') + elif 'video' in poll_result: + video_url = poll_result['video'].get('url') if isinstance(poll_result['video'], dict) else poll_result['video'] + elif 'url' in poll_result: + video_url = poll_result['url'] + break + elif status in ['FAILED', 'ERROR']: + raise Exception(f"视频生成失败: {poll_result.get('fail_reason') or poll_result.get('message') or '未知错误'}") + + if not video_url: + raise Exception("超时未获取到视频地址") + + # 3. 持久化记录 + new_record = GenerationRecord( + user_id=user_id, + prompt=payload.get('prompt'), + model=payload.get('model'), + cost=cost, + image_urls=json.dumps([{"url": video_url, "type": "video"}]) + ) + db.session.add(new_record) + db.session.commit() + + # 后台线程异步同步到 MinIO + threading.Thread( + target=sync_video_background, + args=(app, new_record.id, video_url, internal_task_id) + ).start() + + # 4. 存入 Redis + redis_client.setex(f"task:{internal_task_id}", 3600, json.dumps({"status": "complete", "video_url": video_url, "record_id": new_record.id})) + system_logger.info(f"视频生成任务完成", user_id=user_id, task_id=internal_task_id) + + except Exception as e: + system_logger.error(f"视频生成执行异常: {str(e)}", user_id=user_id, task_id=internal_task_id, prompt=payload.get('prompt')) + # 尝试退费 + try: + user = db.session.get(User, user_id) + if user: + user.points += cost + db.session.commit() + except Exception as re: + system_logger.error(f"退费失败: {str(re)}") + + # 确保 Redis 状态一定被更新,防止前端死循环 + redis_client.setex(f"task:{internal_task_id}", 3600, json.dumps({"status": "error", "message": str(e)})) diff --git a/static/js/main.js b/static/js/main.js index be58315..2093565 100644 --- a/static/js/main.js +++ b/static/js/main.js @@ -137,7 +137,7 @@ async function loadHistory(isLoadMore = false) { footer.classList.remove('hidden'); try { - const r = await fetch(`/api/history?page=${currentHistoryPage}&per_page=10`); + const r = await fetch(`/api/history?page=${currentHistoryPage}&per_page=10&filter_type=image`); const d = await r.json(); const list = document.getElementById('historyList'); @@ -146,7 +146,7 @@ async function loadHistory(isLoadMore = false) { const html = d.history.map(item => `
- ${item.time} + ${item.created_at} ${item.model}
@@ -735,3 +735,131 @@ window.addEventListener('message', (e) => { closeVisualizerModal(); } }); + +// --- 积分与钱包中心逻辑 --- +let pointsChart = null; + +async function openPointsModal() { + const modal = document.getElementById('pointsModal'); + if (!modal) return; + modal.classList.remove('hidden'); + setTimeout(() => { + modal.classList.add('opacity-100'); + modal.querySelector('div').classList.remove('scale-95'); + }, 10); + + // 加载数据 + loadPointStats(); + loadPointDetails(); + + // 更新当前余额 + const r = await fetch('/api/auth/me'); + const d = await r.json(); + if (d.logged_in) { + document.getElementById('modalPointsDisplay').innerText = d.points; + } +} + +function closePointsModal() { + const modal = document.getElementById('pointsModal'); + modal.classList.remove('opacity-100'); + modal.querySelector('div').classList.add('scale-95'); + setTimeout(() => modal.classList.add('hidden'), 300); +} + +async function loadPointStats() { + const r = await fetch('/api/stats/points?days=7'); + const d = await r.json(); + + const canvas = document.getElementById('pointsChart'); + if (!canvas) return; + const ctx = canvas.getContext('2d'); + + if (pointsChart) pointsChart.destroy(); + + if (typeof Chart === 'undefined') { + console.error('Chart.js not loaded'); + return; + } + + pointsChart = new Chart(ctx, { + type: 'line', + data: { + labels: d.labels, + datasets: [ + { + label: '消耗积分', + data: d.deductions, + borderColor: '#6366f1', + backgroundColor: 'rgba(99, 102, 241, 0.1)', + borderWidth: 3, + fill: true, + tension: 0.4, + pointRadius: 4, + pointBackgroundColor: '#6366f1' + }, + { + label: '充值积分', + data: d.incomes, + borderColor: '#10b981', + backgroundColor: 'rgba(16, 185, 129, 0.1)', + borderWidth: 3, + fill: true, + tension: 0.4, + pointRadius: 4, + pointBackgroundColor: '#10b981' + } + ] + }, + options: { + responsive: true, + maintainAspectRatio: false, + plugins: { + legend: { display: false } + }, + scales: { + y: { + beginAtZero: true, + grid: { color: 'rgba(241, 245, 249, 1)' }, + ticks: { font: { weight: 'bold' } } + }, + x: { + grid: { display: false }, + ticks: { font: { weight: 'bold' } } + } + } + } + }); +} + +async function loadPointDetails() { + const body = document.getElementById('pointDetailsBody'); + if (!body) return; + body.innerHTML = '正在加载明细...'; + + try { + const r = await fetch('/api/stats/details?page=1'); + const d = await r.json(); + + if (d.items.length === 0) { + body.innerHTML = '暂无积分变动记录'; + return; + } + + body.innerHTML = d.items.map(item => ` + + +
+
+ ${item.desc} +
+ + ${item.model} + ${item.change} + ${item.time} + + `).join(''); + } catch (e) { + body.innerHTML = '加载失败'; + } +} diff --git a/static/js/video.js b/static/js/video.js index 6b42487..d6faf24 100644 --- a/static/js/video.js +++ b/static/js/video.js @@ -54,14 +54,11 @@ document.addEventListener('DOMContentLoaded', () => { if (isLoadingHistory) return; isLoadingHistory = true; try { - const r = await fetch(`/api/history?page=${page}&per_page=10`); + const r = await fetch(`/api/history?page=${page}&per_page=10&filter_type=video`); const d = await r.json(); - // 过滤出有视频的记录 - const videoRecords = d.history.filter(item => { - const urls = item.urls || []; - return urls.some(u => u.type === 'video' || (typeof u === 'string' && u.endsWith('.mp4'))); - }); + // 服务端已完成过滤 + const videoRecords = d.history; if (videoRecords.length > 0) { const html = videoRecords.map(item => { diff --git a/sync_db.py b/sync_db.py deleted file mode 100644 index c5a66a2..0000000 --- a/sync_db.py +++ /dev/null @@ -1,15 +0,0 @@ -from app import app -from extensions import db -import models - -def init(): - with app.app_context(): - print("🔧 正在同步数据库架构...") - try: - db.create_all() - print("✅ 数据库表已成功创建或已存在") - except Exception as e: - print(f"❌ 同步失败: {e}") - -if __name__ == '__main__': - init() diff --git a/sync_history_db.py b/sync_history_db.py deleted file mode 100644 index dc588c9..0000000 --- a/sync_history_db.py +++ /dev/null @@ -1,10 +0,0 @@ -from app import app -from extensions import db -from models import GenerationRecord - -with app.app_context(): - try: - db.create_all() - print("✅ 数据库表同步成功 (包括 GenerationRecord)") - except Exception as e: - print(f"❌ 同步失败: {e}") diff --git a/sync_videos_manual.py b/sync_videos_manual.py deleted file mode 100644 index 9fa4d9a..0000000 --- a/sync_videos_manual.py +++ /dev/null @@ -1,83 +0,0 @@ -import json -import io -import requests -import uuid -import time -from urllib.parse import quote -from app import create_app -from extensions import db, s3_client -from config import Config -from models import GenerationRecord - -app = create_app() - -def sync_old_videos(): - with app.app_context(): - print("🔍 开始扫描未同步的视频记录...") - - # 获取所有包含 'video' 字样的记录 (简单过滤) - records = GenerationRecord.query.filter(GenerationRecord.image_urls.like('%video%')).all() - - count = 0 - success_count = 0 - - for r in records: - try: - data = json.loads(r.image_urls) - updated = False - new_data = [] - - for item in data: - # 检查是否是视频且 URL 不是 MinIO 的地址 - if isinstance(item, dict) and item.get('type') == 'video': - url = item.get('url') - if url and Config.MINIO['public_url'] not in url: - print(f"⏳ 正在同步记录 {r.id}: {url[:50]}...") - - # 尝试下载并转存 - try: - with requests.get(url, stream=True, timeout=60) as req: - if req.status_code == 200: - content_type = req.headers.get('content-type', 'video/mp4') - ext = ".mp4" - - base_filename = f"video-{uuid.uuid4().hex}" - full_filename = f"{base_filename}{ext}" - - video_io = io.BytesIO() - for chunk in req.iter_content(chunk_size=8192): - video_io.write(chunk) - video_io.seek(0) - - # 上传至 MinIO - s3_client.upload_fileobj( - video_io, - Config.MINIO["bucket"], - full_filename, - ExtraArgs={"ContentType": content_type} - ) - - final_url = f"{Config.MINIO['public_url']}{quote(full_filename)}" - item['url'] = final_url - updated = True - print(f"✅ 同步成功: {final_url}") - else: - print(f"❌ 下载失败 (Status {req.status_code}),可能链接已过期") - except Exception as e: - print(f"❌ 同步异常: {e}") - - new_data.append(item) - - if updated: - r.image_urls = json.dumps(new_data) - db.session.commit() - success_count += 1 - count += 1 - - except Exception as e: - print(f"处理记录 {r.id} 出错: {e}") - - print(f"\n🎉 扫描完成! 成功同步了 {success_count} 个视频。") - -if __name__ == "__main__": - sync_old_videos() diff --git a/templates/base.html b/templates/base.html index 28ae2c9..124b4ae 100644 --- a/templates/base.html +++ b/templates/base.html @@ -12,6 +12,7 @@ rel="stylesheet"> +