feat(config): 添加API备用地址和密钥配置

- 新增 AI_BASE_URL 和 AI_BACKUP_BASE_URL 配置项
- 为试用、高级、视频和Gemini Flash等API添加备用密钥
- 使用f-string格式化API端点URL以支持动态基地址
- 统一API端点构建方式,提高配置灵活性

feat(services): 实现API故障自动切换机制

- 在聊天生成服务中集成多线路候选和故障转移逻辑
- 重构图像和视频生成服务以支持备用API线路
- 实现智能路由,根据状态码自动切换到备用线路
- 增强错误处理和日志记录功能

feat(utils): 新增API线路管理和切换工具函数

- 实现 get_backup_api_url() 函数用于URL备用地址转换
- 创建 get_backup_api_key() 函数管理备用密钥映射
- 开发 get_api_candidates() 函数生成主备线路候选列表
- 添加 should_switch_to_backup() 函数判断是否需要切换线路

refactor(services): 优化视频生成任务的API密钥配置

- 将视频生成任务独立使用VIDEO_KEY而非通用密钥
- 确保视频服务使用专门的API密钥进行身份验证
```
This commit is contained in:
公司git 2026-04-22 11:53:00 +08:00
parent 99583289ba
commit fec3426bad
4 changed files with 215 additions and 52 deletions

View File

@ -32,15 +32,22 @@ class Config:
# AI API 配置 # AI API 配置
AI_API = "https://ai.t8star.cn/v1/images/generations" AI_API = "https://ai.t8star.cn/v1/images/generations"
CHAT_API = "https://ai.comfly.chat/v1/chat/completions" AI_BASE_URL = "https://ai.comfly.chat"
VIDEO_GEN_API = "https://ai.comfly.chat/v2/videos/generations" AI_BACKUP_BASE_URL = "https://api.bltcy.ai"
VIDEO_POLL_API = "https://ai.comfly.chat/v2/videos/generations/{task_id}" CHAT_API = f"{AI_BASE_URL}/v1/chat/completions"
VIDEO_GEN_API = f"{AI_BASE_URL}/v2/videos/generations"
VIDEO_POLL_API = f"{AI_BASE_URL}/v2/videos/generations/{{task_id}}"
# 试用模式配置 # 试用模式配置
TRIAL_API = "https://ai.comfly.chat/v1/images/generations" TRIAL_API = f"{AI_BASE_URL}/v1/images/generations"
TRIAL_KEY = "sk-Rr8L5noW8Qga7K4jmey3yYZYL1a4SlhlNlo5iZrwqJRK1Pa1" TRIAL_KEY = "sk-Rr8L5noW8Qga7K4jmey3yYZYL1a4SlhlNlo5iZrwqJRK1Pa1"
TRIAL_BACKUP_KEY = "sk-pxowRoSbyavisIbaVLdedS7g5UePzhIxsTfjlJFOuqTRYQzT"
PREMIUM_KEY = "sk-168trRxnemem6nTpQn1rbmJ4SFKLwTMsZ0G6uk5OipP7FKAY" PREMIUM_KEY = "sk-168trRxnemem6nTpQn1rbmJ4SFKLwTMsZ0G6uk5OipP7FKAY"
PREMIUM_BACKUP_KEY = "sk-iMfeWAESLFIrHvxuBe2VbM4rC7ScZNvcaouqhVAd0J4KomYv"
VIDEO_KEY = "sk-KB1MIu8gUKn1QqvbG89gRjyRZoy04t17NkOj1uCbtJnYOXyS"
VIDEO_BACKUP_KEY = "sk-pxowRoSbyavisIbaVLdedS7g5UePzhIxsTfjlJFOuqTRYQzT"
GEMINI_FLASH_IMAGE_PREMIUM_KEY = "sk-OEbEnJORrKx4YEnLPEbwQL3eS5sp0eeSbtUepUrsIqjaLc1X" GEMINI_FLASH_IMAGE_PREMIUM_KEY = "sk-OEbEnJORrKx4YEnLPEbwQL3eS5sp0eeSbtUepUrsIqjaLc1X"
GEMINI_FLASH_IMAGE_PREMIUM_BACKUP_KEY = "sk-0vTMf4vJ0RMvLK4J5wtkRDu7pNEDD69piFcDyf31NCdEYTC1"
DICT_URL = "https://nas.4x4g.com:10011/api/common/sys/dict" DICT_URL = "https://nas.4x4g.com:10011/api/common/sys/dict"
PLATFORM = "lingmao" PLATFORM = "lingmao"

View File

@ -8,7 +8,7 @@ import json
import uuid import uuid
import threading import threading
from flask import current_app from flask import current_app
from utils import get_proxied_url from utils import get_api_candidates, get_proxied_url, should_switch_to_backup
def get_model_cost(model_value, is_video=False): def get_model_cost(model_value, is_video=False):
"""获取模型消耗积分""" """获取模型消耗积分"""
@ -92,17 +92,54 @@ def refund_points(user_id, cost):
def handle_chat_generation_sync(user_id, api_key, model_value, prompt, use_trial, cost): def handle_chat_generation_sync(user_id, api_key, model_value, prompt, use_trial, cost):
"""同步处理对话类模型""" """同步处理对话类模型"""
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
chat_payload = { chat_payload = {
"model": model_value, "model": model_value,
"messages": [{"role": "user", "content": prompt}] "messages": [{"role": "user", "content": prompt}]
} }
try: try:
resp = requests.post(get_proxied_url(Config.CHAT_API), json=chat_payload, headers=headers, timeout=Config.PROXY_TIMEOUT_LONG) resp = None
if resp.status_code != 200: last_error = None
candidates = get_api_candidates(Config.CHAT_API, api_key)
for idx, candidate in enumerate(candidates):
headers = {"Authorization": f"Bearer {candidate['api_key']}", "Content-Type": "application/json"}
try:
resp = requests.post(
get_proxied_url(candidate['url']),
json=chat_payload,
headers=headers,
timeout=Config.PROXY_TIMEOUT_LONG
)
if resp.status_code == 200:
break
last_error = resp.text
has_backup = idx < len(candidates) - 1
if has_backup and should_switch_to_backup(resp.status_code):
system_logger.warning(
"聊天主线路失败,切换备用线路",
user_id=user_id,
model=model_value,
status_code=resp.status_code
)
continue
break
except requests.RequestException as e:
last_error = str(e)
if idx < len(candidates) - 1:
system_logger.warning(
"聊天请求异常,切换备用线路",
user_id=user_id,
model=model_value,
error=last_error
)
continue
raise
if not resp or resp.status_code != 200:
if use_trial: if use_trial:
refund_points(user_id, cost) refund_points(user_id, cost)
return {"error": resp.text}, resp.status_code return {"error": last_error or "聊天请求失败"}, resp.status_code if resp else 500
api_result = resp.json() api_result = resp.json()
content = api_result['choices'][0]['message']['content'] content = api_result['choices'][0]['message']['content']
@ -159,7 +196,7 @@ def validate_video_request(user, data):
def start_async_video_task(app, user_id, payload, cost, model_value): def start_async_video_task(app, user_id, payload, cost, model_value):
"""启动异步视频任务""" """启动异步视频任务"""
api_key = Config.TRIAL_KEY api_key = Config.VIDEO_KEY
task_id = str(uuid.uuid4()) task_id = str(uuid.uuid4())
system_logger.info("用户发起视频生成任务 (积分模式)", model=model_value, cost=cost) system_logger.info("用户发起视频生成任务 (积分模式)", model=model_value, cost=cost)

View File

@ -14,7 +14,7 @@ from extensions import s3_client, redis_client, db
from models import GenerationRecord, User from models import GenerationRecord, User
from config import Config from config import Config
from services.logger import system_logger from services.logger import system_logger
from utils import get_proxied_url from utils import get_api_candidates, get_backup_api_url, get_proxied_url, should_switch_to_backup
def _extract_error_detail(data, default="未知错误"): def _extract_error_detail(data, default="未知错误"):
@ -128,44 +128,67 @@ def process_image_generation(app, user_id, task_id, payload, api_key, target_api
# 更新状态为处理中 # 更新状态为处理中
redis_client.setex(f"task:{task_id}", 3600, json.dumps({"status": "processing", "message": "任务已提交,正在排队处理..."})) redis_client.setex(f"task:{task_id}", 3600, json.dumps({"status": "processing", "message": "任务已提交,正在排队处理..."}))
try: try:
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
# 1. 提交异步请求 (带重试机制) # 1. 提交异步请求 (带重试机制)
submit_resp = None submit_resp = None
last_error = None last_error = None
active_api = target_api
active_key = api_key
stop_fallback = False
candidates = get_api_candidates(target_api, api_key)
for attempt in range(3): for idx, candidate in enumerate(candidates):
try: headers = {"Authorization": f"Bearer {candidate['api_key']}", "Content-Type": "application/json"}
# 添加 async=true 参数启用异步模式
submit_resp = requests.post( for attempt in range(3):
get_proxied_url(target_api), try:
params={"async": "true"}, # 添加 async=true 参数启用异步模式
json=payload, submit_resp = requests.post(
headers=headers, get_proxied_url(candidate['url']),
timeout=Config.PROXY_TIMEOUT_DEFAULT params={"async": "true"},
) json=payload,
headers=headers,
timeout=Config.PROXY_TIMEOUT_DEFAULT
)
if submit_resp.status_code == 200: if submit_resp.status_code == 200:
break # 成功 active_api = candidate['url']
else: active_key = candidate['api_key']
system_logger.warning(f"任务提交失败(第{attempt+1}次): {submit_resp.status_code} - {submit_resp.text[:100]}", user_id=user_id, task_id=task_id) break
system_logger.warning(
f"任务提交失败({candidate['label']}{attempt+1}次): {submit_resp.status_code} - {submit_resp.text[:100]}",
user_id=user_id,
task_id=task_id
)
last_error = f"HTTP {submit_resp.status_code}: {submit_resp.text}" last_error = f"HTTP {submit_resp.status_code}: {submit_resp.text}"
except (ConnectTimeout, ConnectionError) as e:
# 连接阶段的错误(DNS失败、连接拒绝、连接超时),请求大概率未发出,可安全重试 if should_switch_to_backup(submit_resp.status_code):
err_str = str(e) break
if 'RemoteDisconnected' in err_str or 'ReadTimeout' in err_str: except (ConnectTimeout, ConnectionError) as e:
# 连接建立后断开 = 请求可能已发送到上游,不要重试以免重复任务 # 连接阶段的错误(DNS失败、连接拒绝、连接超时),请求大概率未发出,可安全重试
system_logger.warning(f"任务提交后响应丢失(第{attempt+1}次),不再重试: {err_str}", user_id=user_id, task_id=task_id) err_str = str(e)
if 'RemoteDisconnected' in err_str or 'ReadTimeout' in err_str:
# 连接建立后断开 = 请求可能已发送到上游,不要重试以免重复任务
system_logger.warning(f"任务提交后响应丢失(第{attempt+1}次),不再重试: {err_str}", user_id=user_id, task_id=task_id)
last_error = err_str
stop_fallback = True
break
system_logger.warning(f"任务提交连接异常({candidate['label']}{attempt+1}次): {err_str}", user_id=user_id, task_id=task_id)
last_error = err_str last_error = err_str
time.sleep(1)
except Exception as e:
# 其他未知异常,保守起见不重试
system_logger.warning(f"任务提交异常({candidate['label']}{attempt+1}次): {str(e)}", user_id=user_id, task_id=task_id)
last_error = str(e)
stop_fallback = True
break break
system_logger.warning(f"任务提交连接异常(第{attempt+1}次): {err_str}", user_id=user_id, task_id=task_id)
last_error = err_str if submit_resp and submit_resp.status_code == 200:
time.sleep(1)
except Exception as e:
# 其他未知异常,保守起见不重试
system_logger.warning(f"任务提交异常(第{attempt+1}次): {str(e)}", user_id=user_id, task_id=task_id)
last_error = str(e)
break break
if stop_fallback:
break
if idx < len(candidates) - 1:
system_logger.warning("主线路提交失败,切换备用线路继续尝试", user_id=user_id, task_id=task_id)
if not submit_resp or submit_resp.status_code != 200: if not submit_resp or submit_resp.status_code != 200:
raise Exception(f"任务提交失败(重试3次后): {last_error}") raise Exception(f"任务提交失败(重试3次后): {last_error}")
@ -179,10 +202,10 @@ def process_image_generation(app, user_id, task_id, payload, api_key, target_api
system_logger.info(f"外部异步任务已提交: {remote_task_id}", user_id=user_id, task_id=task_id) system_logger.info(f"外部异步任务已提交: {remote_task_id}", user_id=user_id, task_id=task_id)
# 构造查询 URL: .../images/generations -> .../images/tasks/{task_id} # 构造查询 URL: .../images/generations -> .../images/tasks/{task_id}
poll_url = target_api.replace('/generations', f'/tasks/{remote_task_id}') poll_url = active_api.replace('/generations', f'/tasks/{remote_task_id}')
if poll_url == target_api: # Fallback if replace failed if poll_url == active_api: # Fallback if replace failed
import posixpath import posixpath
base_url = posixpath.dirname(target_api) base_url = posixpath.dirname(active_api)
poll_url = f"{base_url}/tasks/{remote_task_id}" poll_url = f"{base_url}/tasks/{remote_task_id}"
system_logger.info(f"开始轮询异步任务: {poll_url}", user_id=user_id, task_id=task_id) system_logger.info(f"开始轮询异步任务: {poll_url}", user_id=user_id, task_id=task_id)
@ -206,7 +229,8 @@ def process_image_generation(app, user_id, task_id, payload, api_key, target_api
})) }))
try: try:
poll_resp = requests.get(get_proxied_url(poll_url), headers=headers, timeout=Config.PROXY_TIMEOUT_SHORT) poll_headers = {"Authorization": f"Bearer {active_key}", "Content-Type": "application/json"}
poll_resp = requests.get(get_proxied_url(poll_url), headers=poll_headers, timeout=Config.PROXY_TIMEOUT_SHORT)
if poll_resp.status_code != 200: if poll_resp.status_code != 200:
system_logger.warning(f"轮询非 200: {poll_resp.status_code}", user_id=user_id, task_id=task_id) system_logger.warning(f"轮询非 200: {poll_resp.status_code}", user_id=user_id, task_id=task_id)
@ -357,11 +381,51 @@ def process_video_generation(app, user_id, internal_task_id, payload, api_key, c
"""异步提交并查询视频任务状态""" """异步提交并查询视频任务状态"""
with app.app_context(): with app.app_context():
try: try:
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
# 1. 提交任务 # 1. 提交任务
submit_resp = requests.post(get_proxied_url(Config.VIDEO_GEN_API), json=payload, headers=headers, timeout=Config.PROXY_TIMEOUT_DEFAULT) submit_resp = None
if submit_resp.status_code != 200: last_error = None
raise Exception(f"视频任务提交失败: {submit_resp.text}") active_api = Config.VIDEO_GEN_API
active_key = api_key
candidates = get_api_candidates(Config.VIDEO_GEN_API, api_key)
for idx, candidate in enumerate(candidates):
headers = {"Authorization": f"Bearer {candidate['api_key']}", "Content-Type": "application/json"}
try:
submit_resp = requests.post(
get_proxied_url(candidate['url']),
json=payload,
headers=headers,
timeout=Config.PROXY_TIMEOUT_DEFAULT
)
if submit_resp.status_code == 200:
active_api = candidate['url']
active_key = candidate['api_key']
break
last_error = submit_resp.text
if idx < len(candidates) - 1 and should_switch_to_backup(submit_resp.status_code):
system_logger.warning(
"视频主线路提交失败,切换备用线路",
user_id=user_id,
task_id=internal_task_id,
status_code=submit_resp.status_code
)
continue
break
except requests.RequestException as e:
last_error = str(e)
if idx < len(candidates) - 1:
system_logger.warning(
"视频提交异常,切换备用线路",
user_id=user_id,
task_id=internal_task_id,
error=last_error
)
continue
raise
if not submit_resp or submit_resp.status_code != 200:
raise Exception(f"视频任务提交失败: {last_error or (submit_resp.text if submit_resp else '未知错误')}")
submit_result = submit_resp.json() submit_result = submit_resp.json()
remote_task_id = submit_result.get('task_id') remote_task_id = submit_result.get('task_id')
@ -373,17 +437,20 @@ def process_video_generation(app, user_id, internal_task_id, payload, api_key, c
max_retries = 90 # 提升到 15 分钟 max_retries = 90 # 提升到 15 分钟
video_url = None video_url = None
status = ""
for i in range(max_retries): for i in range(max_retries):
# 更新进度 (伪进度或保持活跃) # 更新进度 (伪进度或保持活跃)
if i % 2 == 0: # 每20秒更新一次心跳防止被认为是死任务 if i % 2 == 0: # 每20秒更新一次心跳防止被认为是死任务
redis_client.setex(f"task:{internal_task_id}", 3600, json.dumps({ redis_client.setex(f"task:{internal_task_id}", 3600, json.dumps({
"status": "processing", "status": "processing",
"message": f"视频生成中 (已耗时 {i * 10} 秒)..." "message": f"视频生成中 (已耗时 {i * 10} 秒)..."
})) }))
time.sleep(10) time.sleep(10)
poll_url = Config.VIDEO_POLL_API.format(task_id=remote_task_id) poll_template = Config.VIDEO_POLL_API if active_api == Config.VIDEO_GEN_API else get_backup_api_url(Config.VIDEO_POLL_API)
poll_resp = requests.get(get_proxied_url(poll_url), headers=headers, timeout=Config.PROXY_TIMEOUT_SHORT) poll_url = poll_template.format(task_id=remote_task_id)
poll_headers = {"Authorization": f"Bearer {active_key}", "Content-Type": "application/json"}
poll_resp = requests.get(get_proxied_url(poll_url), headers=poll_headers, timeout=Config.PROXY_TIMEOUT_SHORT)
if poll_resp.status_code != 200: if poll_resp.status_code != 200:
continue continue

View File

@ -1,4 +1,4 @@
from urllib.parse import quote from urllib.parse import quote, urlparse, urlunparse
from config import Config from config import Config
def get_proxied_url(url): def get_proxied_url(url):
@ -9,3 +9,55 @@ def get_proxied_url(url):
if Config.USE_PROXY and url: if Config.USE_PROXY and url:
return f"{Config.PROXY_URL}{quote(url)}" return f"{Config.PROXY_URL}{quote(url)}"
return url return url
def get_backup_api_url(url):
"""
将已有接口地址切换到备用 AI Base URL并保留原始路径
"""
if not url or not Config.AI_BACKUP_BASE_URL:
return url
source = urlparse(url)
backup = urlparse(Config.AI_BACKUP_BASE_URL)
if not backup.scheme or not backup.netloc:
return url
return urlunparse((
backup.scheme,
backup.netloc,
source.path,
source.params,
source.query,
source.fragment,
))
def get_backup_api_key(api_key):
"""
返回内置主 Key 对应的备用 Key自定义 Key 默认沿用原值
"""
fallback_map = {
Config.TRIAL_KEY: Config.TRIAL_BACKUP_KEY,
Config.PREMIUM_KEY: Config.PREMIUM_BACKUP_KEY,
Config.VIDEO_KEY: Config.VIDEO_BACKUP_KEY,
Config.GEMINI_FLASH_IMAGE_PREMIUM_KEY: Config.GEMINI_FLASH_IMAGE_PREMIUM_BACKUP_KEY,
}
return fallback_map.get(api_key, api_key)
def get_api_candidates(url, api_key):
"""
生成主线路与备用线路候选
"""
candidates = [{"url": url, "api_key": api_key, "label": "primary"}]
backup_url = get_backup_api_url(url)
backup_key = get_backup_api_key(api_key)
if backup_url and backup_url != url:
candidates.append({"url": backup_url, "api_key": backup_key, "label": "backup"})
return candidates
def should_switch_to_backup(status_code):
"""
遇到鉴权限流或服务端异常时尝试切换备用线路
"""
return status_code in (401, 403, 408, 429) or status_code >= 500