From fec3426bad3800b36e25880f4ed67ca0325baa1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E5=8F=B8git?= <240241002@qq.com> Date: Wed, 22 Apr 2026 11:53:00 +0800 Subject: [PATCH] =?UTF-8?q?```=20feat(config):=20=E6=B7=BB=E5=8A=A0API?= =?UTF-8?q?=E5=A4=87=E7=94=A8=E5=9C=B0=E5=9D=80=E5=92=8C=E5=AF=86=E9=92=A5?= =?UTF-8?q?=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 AI_BASE_URL 和 AI_BACKUP_BASE_URL 配置项 - 为试用、高级、视频和Gemini Flash等API添加备用密钥 - 使用f-string格式化API端点URL以支持动态基地址 - 统一API端点构建方式,提高配置灵活性 feat(services): 实现API故障自动切换机制 - 在聊天生成服务中集成多线路候选和故障转移逻辑 - 重构图像和视频生成服务以支持备用API线路 - 实现智能路由,根据状态码自动切换到备用线路 - 增强错误处理和日志记录功能 feat(utils): 新增API线路管理和切换工具函数 - 实现 get_backup_api_url() 函数用于URL备用地址转换 - 创建 get_backup_api_key() 函数管理备用密钥映射 - 开发 get_api_candidates() 函数生成主备线路候选列表 - 添加 should_switch_to_backup() 函数判断是否需要切换线路 refactor(services): 优化视频生成任务的API密钥配置 - 将视频生成任务独立使用VIDEO_KEY而非通用密钥 - 确保视频服务使用专门的API密钥进行身份验证 ``` --- config.py | 15 +++- services/generation_service.py | 49 +++++++++-- services/task_service.py | 149 ++++++++++++++++++++++++--------- utils.py | 54 +++++++++++- 4 files changed, 215 insertions(+), 52 deletions(-) diff --git a/config.py b/config.py index bb2f48e..225d6d9 100644 --- a/config.py +++ b/config.py @@ -32,15 +32,22 @@ class Config: # AI API 配置 AI_API = "https://ai.t8star.cn/v1/images/generations" - CHAT_API = "https://ai.comfly.chat/v1/chat/completions" - VIDEO_GEN_API = "https://ai.comfly.chat/v2/videos/generations" - VIDEO_POLL_API = "https://ai.comfly.chat/v2/videos/generations/{task_id}" + AI_BASE_URL = "https://ai.comfly.chat" + AI_BACKUP_BASE_URL = "https://api.bltcy.ai" + CHAT_API = f"{AI_BASE_URL}/v1/chat/completions" + VIDEO_GEN_API = f"{AI_BASE_URL}/v2/videos/generations" + VIDEO_POLL_API = f"{AI_BASE_URL}/v2/videos/generations/{{task_id}}" # 试用模式配置 - TRIAL_API = "https://ai.comfly.chat/v1/images/generations" + TRIAL_API = f"{AI_BASE_URL}/v1/images/generations" TRIAL_KEY = "sk-Rr8L5noW8Qga7K4jmey3yYZYL1a4SlhlNlo5iZrwqJRK1Pa1" + TRIAL_BACKUP_KEY = "sk-pxowRoSbyavisIbaVLdedS7g5UePzhIxsTfjlJFOuqTRYQzT" PREMIUM_KEY = "sk-168trRxnemem6nTpQn1rbmJ4SFKLwTMsZ0G6uk5OipP7FKAY" + PREMIUM_BACKUP_KEY = "sk-iMfeWAESLFIrHvxuBe2VbM4rC7ScZNvcaouqhVAd0J4KomYv" + VIDEO_KEY = "sk-KB1MIu8gUKn1QqvbG89gRjyRZoy04t17NkOj1uCbtJnYOXyS" + VIDEO_BACKUP_KEY = "sk-pxowRoSbyavisIbaVLdedS7g5UePzhIxsTfjlJFOuqTRYQzT" GEMINI_FLASH_IMAGE_PREMIUM_KEY = "sk-OEbEnJORrKx4YEnLPEbwQL3eS5sp0eeSbtUepUrsIqjaLc1X" + GEMINI_FLASH_IMAGE_PREMIUM_BACKUP_KEY = "sk-0vTMf4vJ0RMvLK4J5wtkRDu7pNEDD69piFcDyf31NCdEYTC1" DICT_URL = "https://nas.4x4g.com:10011/api/common/sys/dict" PLATFORM = "lingmao" diff --git a/services/generation_service.py b/services/generation_service.py index 40e181f..cf73317 100644 --- a/services/generation_service.py +++ b/services/generation_service.py @@ -8,7 +8,7 @@ import json import uuid import threading from flask import current_app -from utils import get_proxied_url +from utils import get_api_candidates, get_proxied_url, should_switch_to_backup def get_model_cost(model_value, is_video=False): """获取模型消耗积分""" @@ -92,17 +92,54 @@ def refund_points(user_id, cost): def handle_chat_generation_sync(user_id, api_key, model_value, prompt, use_trial, cost): """同步处理对话类模型""" - headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} chat_payload = { "model": model_value, "messages": [{"role": "user", "content": prompt}] } try: - resp = requests.post(get_proxied_url(Config.CHAT_API), json=chat_payload, headers=headers, timeout=Config.PROXY_TIMEOUT_LONG) - if resp.status_code != 200: + resp = None + last_error = None + candidates = get_api_candidates(Config.CHAT_API, api_key) + + for idx, candidate in enumerate(candidates): + headers = {"Authorization": f"Bearer {candidate['api_key']}", "Content-Type": "application/json"} + try: + resp = requests.post( + get_proxied_url(candidate['url']), + json=chat_payload, + headers=headers, + timeout=Config.PROXY_TIMEOUT_LONG + ) + if resp.status_code == 200: + break + + last_error = resp.text + has_backup = idx < len(candidates) - 1 + if has_backup and should_switch_to_backup(resp.status_code): + system_logger.warning( + "聊天主线路失败,切换备用线路", + user_id=user_id, + model=model_value, + status_code=resp.status_code + ) + continue + break + except requests.RequestException as e: + last_error = str(e) + if idx < len(candidates) - 1: + system_logger.warning( + "聊天请求异常,切换备用线路", + user_id=user_id, + model=model_value, + error=last_error + ) + continue + raise + + if not resp or resp.status_code != 200: if use_trial: refund_points(user_id, cost) - return {"error": resp.text}, resp.status_code + return {"error": last_error or "聊天请求失败"}, resp.status_code if resp else 500 api_result = resp.json() content = api_result['choices'][0]['message']['content'] @@ -159,7 +196,7 @@ def validate_video_request(user, data): def start_async_video_task(app, user_id, payload, cost, model_value): """启动异步视频任务""" - api_key = Config.TRIAL_KEY + api_key = Config.VIDEO_KEY task_id = str(uuid.uuid4()) system_logger.info("用户发起视频生成任务 (积分模式)", model=model_value, cost=cost) diff --git a/services/task_service.py b/services/task_service.py index 351de56..930ec6e 100644 --- a/services/task_service.py +++ b/services/task_service.py @@ -14,7 +14,7 @@ from extensions import s3_client, redis_client, db from models import GenerationRecord, User from config import Config from services.logger import system_logger -from utils import get_proxied_url +from utils import get_api_candidates, get_backup_api_url, get_proxied_url, should_switch_to_backup def _extract_error_detail(data, default="未知错误"): @@ -128,44 +128,67 @@ def process_image_generation(app, user_id, task_id, payload, api_key, target_api # 更新状态为处理中 redis_client.setex(f"task:{task_id}", 3600, json.dumps({"status": "processing", "message": "任务已提交,正在排队处理..."})) try: - headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} - # 1. 提交异步请求 (带重试机制) submit_resp = None last_error = None + active_api = target_api + active_key = api_key + stop_fallback = False + candidates = get_api_candidates(target_api, api_key) - for attempt in range(3): - try: - # 添加 async=true 参数启用异步模式 - submit_resp = requests.post( - get_proxied_url(target_api), - params={"async": "true"}, - json=payload, - headers=headers, - timeout=Config.PROXY_TIMEOUT_DEFAULT - ) + for idx, candidate in enumerate(candidates): + headers = {"Authorization": f"Bearer {candidate['api_key']}", "Content-Type": "application/json"} + + for attempt in range(3): + try: + # 添加 async=true 参数启用异步模式 + submit_resp = requests.post( + get_proxied_url(candidate['url']), + params={"async": "true"}, + json=payload, + headers=headers, + timeout=Config.PROXY_TIMEOUT_DEFAULT + ) - if submit_resp.status_code == 200: - break # 成功 - else: - system_logger.warning(f"任务提交失败(第{attempt+1}次): {submit_resp.status_code} - {submit_resp.text[:100]}", user_id=user_id, task_id=task_id) + if submit_resp.status_code == 200: + active_api = candidate['url'] + active_key = candidate['api_key'] + break + + system_logger.warning( + f"任务提交失败({candidate['label']} 第{attempt+1}次): {submit_resp.status_code} - {submit_resp.text[:100]}", + user_id=user_id, + task_id=task_id + ) last_error = f"HTTP {submit_resp.status_code}: {submit_resp.text}" - except (ConnectTimeout, ConnectionError) as e: - # 连接阶段的错误(DNS失败、连接拒绝、连接超时),请求大概率未发出,可安全重试 - err_str = str(e) - if 'RemoteDisconnected' in err_str or 'ReadTimeout' in err_str: - # 连接建立后断开 = 请求可能已发送到上游,不要重试以免重复任务 - system_logger.warning(f"任务提交后响应丢失(第{attempt+1}次),不再重试: {err_str}", user_id=user_id, task_id=task_id) + + if should_switch_to_backup(submit_resp.status_code): + break + except (ConnectTimeout, ConnectionError) as e: + # 连接阶段的错误(DNS失败、连接拒绝、连接超时),请求大概率未发出,可安全重试 + err_str = str(e) + if 'RemoteDisconnected' in err_str or 'ReadTimeout' in err_str: + # 连接建立后断开 = 请求可能已发送到上游,不要重试以免重复任务 + system_logger.warning(f"任务提交后响应丢失(第{attempt+1}次),不再重试: {err_str}", user_id=user_id, task_id=task_id) + last_error = err_str + stop_fallback = True + break + system_logger.warning(f"任务提交连接异常({candidate['label']} 第{attempt+1}次): {err_str}", user_id=user_id, task_id=task_id) last_error = err_str + time.sleep(1) + except Exception as e: + # 其他未知异常,保守起见不重试 + system_logger.warning(f"任务提交异常({candidate['label']} 第{attempt+1}次): {str(e)}", user_id=user_id, task_id=task_id) + last_error = str(e) + stop_fallback = True break - system_logger.warning(f"任务提交连接异常(第{attempt+1}次): {err_str}", user_id=user_id, task_id=task_id) - last_error = err_str - time.sleep(1) - except Exception as e: - # 其他未知异常,保守起见不重试 - system_logger.warning(f"任务提交异常(第{attempt+1}次): {str(e)}", user_id=user_id, task_id=task_id) - last_error = str(e) + + if submit_resp and submit_resp.status_code == 200: break + if stop_fallback: + break + if idx < len(candidates) - 1: + system_logger.warning("主线路提交失败,切换备用线路继续尝试", user_id=user_id, task_id=task_id) if not submit_resp or submit_resp.status_code != 200: raise Exception(f"任务提交失败(重试3次后): {last_error}") @@ -179,10 +202,10 @@ def process_image_generation(app, user_id, task_id, payload, api_key, target_api system_logger.info(f"外部异步任务已提交: {remote_task_id}", user_id=user_id, task_id=task_id) # 构造查询 URL: .../images/generations -> .../images/tasks/{task_id} - poll_url = target_api.replace('/generations', f'/tasks/{remote_task_id}') - if poll_url == target_api: # Fallback if replace failed + poll_url = active_api.replace('/generations', f'/tasks/{remote_task_id}') + if poll_url == active_api: # Fallback if replace failed import posixpath - base_url = posixpath.dirname(target_api) + base_url = posixpath.dirname(active_api) poll_url = f"{base_url}/tasks/{remote_task_id}" system_logger.info(f"开始轮询异步任务: {poll_url}", user_id=user_id, task_id=task_id) @@ -206,7 +229,8 @@ def process_image_generation(app, user_id, task_id, payload, api_key, target_api })) try: - poll_resp = requests.get(get_proxied_url(poll_url), headers=headers, timeout=Config.PROXY_TIMEOUT_SHORT) + poll_headers = {"Authorization": f"Bearer {active_key}", "Content-Type": "application/json"} + poll_resp = requests.get(get_proxied_url(poll_url), headers=poll_headers, timeout=Config.PROXY_TIMEOUT_SHORT) if poll_resp.status_code != 200: system_logger.warning(f"轮询非 200: {poll_resp.status_code}", user_id=user_id, task_id=task_id) @@ -357,11 +381,51 @@ def process_video_generation(app, user_id, internal_task_id, payload, api_key, c """异步提交并查询视频任务状态""" with app.app_context(): try: - headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} # 1. 提交任务 - submit_resp = requests.post(get_proxied_url(Config.VIDEO_GEN_API), json=payload, headers=headers, timeout=Config.PROXY_TIMEOUT_DEFAULT) - if submit_resp.status_code != 200: - raise Exception(f"视频任务提交失败: {submit_resp.text}") + submit_resp = None + last_error = None + active_api = Config.VIDEO_GEN_API + active_key = api_key + candidates = get_api_candidates(Config.VIDEO_GEN_API, api_key) + + for idx, candidate in enumerate(candidates): + headers = {"Authorization": f"Bearer {candidate['api_key']}", "Content-Type": "application/json"} + try: + submit_resp = requests.post( + get_proxied_url(candidate['url']), + json=payload, + headers=headers, + timeout=Config.PROXY_TIMEOUT_DEFAULT + ) + if submit_resp.status_code == 200: + active_api = candidate['url'] + active_key = candidate['api_key'] + break + + last_error = submit_resp.text + if idx < len(candidates) - 1 and should_switch_to_backup(submit_resp.status_code): + system_logger.warning( + "视频主线路提交失败,切换备用线路", + user_id=user_id, + task_id=internal_task_id, + status_code=submit_resp.status_code + ) + continue + break + except requests.RequestException as e: + last_error = str(e) + if idx < len(candidates) - 1: + system_logger.warning( + "视频提交异常,切换备用线路", + user_id=user_id, + task_id=internal_task_id, + error=last_error + ) + continue + raise + + if not submit_resp or submit_resp.status_code != 200: + raise Exception(f"视频任务提交失败: {last_error or (submit_resp.text if submit_resp else '未知错误')}") submit_result = submit_resp.json() remote_task_id = submit_result.get('task_id') @@ -373,17 +437,20 @@ def process_video_generation(app, user_id, internal_task_id, payload, api_key, c max_retries = 90 # 提升到 15 分钟 video_url = None + status = "" for i in range(max_retries): # 更新进度 (伪进度或保持活跃) if i % 2 == 0: # 每20秒更新一次心跳,防止被认为是死任务 redis_client.setex(f"task:{internal_task_id}", 3600, json.dumps({ "status": "processing", "message": f"视频生成中 (已耗时 {i * 10} 秒)..." - })) + })) time.sleep(10) - poll_url = Config.VIDEO_POLL_API.format(task_id=remote_task_id) - poll_resp = requests.get(get_proxied_url(poll_url), headers=headers, timeout=Config.PROXY_TIMEOUT_SHORT) + poll_template = Config.VIDEO_POLL_API if active_api == Config.VIDEO_GEN_API else get_backup_api_url(Config.VIDEO_POLL_API) + poll_url = poll_template.format(task_id=remote_task_id) + poll_headers = {"Authorization": f"Bearer {active_key}", "Content-Type": "application/json"} + poll_resp = requests.get(get_proxied_url(poll_url), headers=poll_headers, timeout=Config.PROXY_TIMEOUT_SHORT) if poll_resp.status_code != 200: continue diff --git a/utils.py b/utils.py index cf7c2ee..9881a52 100644 --- a/utils.py +++ b/utils.py @@ -1,4 +1,4 @@ -from urllib.parse import quote +from urllib.parse import quote, urlparse, urlunparse from config import Config def get_proxied_url(url): @@ -9,3 +9,55 @@ def get_proxied_url(url): if Config.USE_PROXY and url: return f"{Config.PROXY_URL}{quote(url)}" return url + +def get_backup_api_url(url): + """ + 将已有接口地址切换到备用 AI Base URL,并保留原始路径。 + """ + if not url or not Config.AI_BACKUP_BASE_URL: + return url + + source = urlparse(url) + backup = urlparse(Config.AI_BACKUP_BASE_URL) + if not backup.scheme or not backup.netloc: + return url + + return urlunparse(( + backup.scheme, + backup.netloc, + source.path, + source.params, + source.query, + source.fragment, + )) + +def get_backup_api_key(api_key): + """ + 返回内置主 Key 对应的备用 Key;自定义 Key 默认沿用原值。 + """ + fallback_map = { + Config.TRIAL_KEY: Config.TRIAL_BACKUP_KEY, + Config.PREMIUM_KEY: Config.PREMIUM_BACKUP_KEY, + Config.VIDEO_KEY: Config.VIDEO_BACKUP_KEY, + Config.GEMINI_FLASH_IMAGE_PREMIUM_KEY: Config.GEMINI_FLASH_IMAGE_PREMIUM_BACKUP_KEY, + } + return fallback_map.get(api_key, api_key) + +def get_api_candidates(url, api_key): + """ + 生成主线路与备用线路候选。 + """ + candidates = [{"url": url, "api_key": api_key, "label": "primary"}] + backup_url = get_backup_api_url(url) + backup_key = get_backup_api_key(api_key) + + if backup_url and backup_url != url: + candidates.append({"url": backup_url, "api_key": backup_key, "label": "backup"}) + + return candidates + +def should_switch_to_backup(status_code): + """ + 遇到鉴权、限流或服务端异常时,尝试切换备用线路。 + """ + return status_code in (401, 403, 408, 429) or status_code >= 500