feat(task_service): 增强任务服务的错误处理和重试机制

- 添加了 _extract_error_detail 函数,用于从 API 响应中智能提取详细的错误信息,
  支持多种常见的错误字段格式,提高错误诊断准确性

- 集成 requests 异常处理,区分连接超时、连接错误和读取超时等不同类型的网络异常,
  实现更精确的重试策略,避免因响应丢失导致的任务重复提交

- 在图像和视频生成流程中统一使用新的错误提取函数替代原有的简单错误字段获取,
  提升失败任务的错误信息详细程度

- 优化异常处理逻辑,对不同类型的异常采用相应的处理策略,包括安全重试和终止重试
```
This commit is contained in:
24024 2026-03-12 23:25:46 +08:00
parent 5793a659a9
commit 158ba123b1

View File

@ -7,6 +7,7 @@ import time
import base64 import base64
import threading import threading
from urllib.parse import quote from urllib.parse import quote
from requests.exceptions import ConnectionError, ConnectTimeout
from PIL import Image from PIL import Image
from extensions import s3_client, redis_client, db from extensions import s3_client, redis_client, db
@ -15,6 +16,36 @@ from config import Config
from services.logger import system_logger from services.logger import system_logger
from utils import get_proxied_url from utils import get_proxied_url
def _extract_error_detail(data, default="未知错误"):
"""从API响应中提取详细的错误信息"""
if not isinstance(data, dict):
return default
# 按优先级尝试多个常见的错误字段
candidates = [
data.get('fail_reason'),
data.get('error', {}).get('message') if isinstance(data.get('error'), dict) else data.get('error'),
data.get('message'),
data.get('detail'),
data.get('data', {}).get('fail_reason') if isinstance(data.get('data'), dict) else None,
data.get('data', {}).get('message') if isinstance(data.get('data'), dict) else None,
data.get('data', {}).get('error') if isinstance(data.get('data'), dict) else None,
]
for c in candidates:
if c and isinstance(c, str) and c.strip():
return c.strip()
# 如果所有字段都没有,返回响应摘要帮助排查
# 过滤掉大字段,只保留状态类信息
summary_keys = {'status', 'code', 'error_code', 'fail_code', 'type'}
summary = {k: v for k, v in data.items() if k in summary_keys and v}
if summary:
return f"{default} ({json.dumps(summary, ensure_ascii=False)})"
return default
def sync_images_background(app, record_id, raw_urls): def sync_images_background(app, record_id, raw_urls):
"""后台同步图片至 MinIO并生成缩略图带重试机制""" """后台同步图片至 MinIO并生成缩略图带重试机制"""
with app.app_context(): with app.app_context():
@ -107,22 +138,34 @@ def process_image_generation(app, user_id, task_id, payload, api_key, target_api
try: try:
# 添加 async=true 参数启用异步模式 # 添加 async=true 参数启用异步模式
submit_resp = requests.post( submit_resp = requests.post(
get_proxied_url(target_api), get_proxied_url(target_api),
params={"async": "true"}, params={"async": "true"},
json=payload, json=payload,
headers=headers, headers=headers,
timeout=Config.PROXY_TIMEOUT_DEFAULT timeout=Config.PROXY_TIMEOUT_DEFAULT
) )
if submit_resp.status_code == 200: if submit_resp.status_code == 200:
break # 成功 break # 成功
else: else:
system_logger.warning(f"任务提交失败(第{attempt+1}次): {submit_resp.status_code} - {submit_resp.text[:100]}", user_id=user_id, task_id=task_id) system_logger.warning(f"任务提交失败(第{attempt+1}次): {submit_resp.status_code} - {submit_resp.text[:100]}", user_id=user_id, task_id=task_id)
last_error = f"HTTP {submit_resp.status_code}: {submit_resp.text}" last_error = f"HTTP {submit_resp.status_code}: {submit_resp.text}"
except (ConnectTimeout, ConnectionError) as e:
# 连接阶段的错误(DNS失败、连接拒绝、连接超时),请求大概率未发出,可安全重试
err_str = str(e)
if 'RemoteDisconnected' in err_str or 'ReadTimeout' in err_str:
# 连接建立后断开 = 请求可能已发送到上游,不要重试以免重复任务
system_logger.warning(f"任务提交后响应丢失(第{attempt+1}次),不再重试: {err_str}", user_id=user_id, task_id=task_id)
last_error = err_str
break
system_logger.warning(f"任务提交连接异常(第{attempt+1}次): {err_str}", user_id=user_id, task_id=task_id)
last_error = err_str
time.sleep(1)
except Exception as e: except Exception as e:
# 其他未知异常,保守起见不重试
system_logger.warning(f"任务提交异常(第{attempt+1}次): {str(e)}", user_id=user_id, task_id=task_id) system_logger.warning(f"任务提交异常(第{attempt+1}次): {str(e)}", user_id=user_id, task_id=task_id)
last_error = str(e) last_error = str(e)
time.sleep(1) # 短暂等待重试 break
if not submit_resp or submit_resp.status_code != 200: if not submit_resp or submit_resp.status_code != 200:
raise Exception(f"任务提交失败(重试3次后): {last_error}") raise Exception(f"任务提交失败(重试3次后): {last_error}")
@ -205,7 +248,7 @@ def process_image_generation(app, user_id, task_id, payload, api_key, target_api
generation_success = True generation_success = True
break break
elif remote_status == 'FAILURE': elif remote_status == 'FAILURE':
raise Exception(f"生成任务失败: {poll_data.get('fail_reason', '未知错误')}") raise Exception(f"生成任务失败: {_extract_error_detail(poll_data)}")
except requests.RequestException: except requests.RequestException:
continue # 网络波动重试 continue # 网络波动重试
@ -360,13 +403,11 @@ def process_video_generation(app, user_id, internal_task_id, payload, api_key, c
video_url = poll_result['url'] video_url = poll_result['url']
break break
elif status in ['FAILURE', 'FAILED', 'ERROR']: elif status in ['FAILURE', 'FAILED', 'ERROR']:
reason = poll_result.get('fail_reason') or poll_result.get('message') or '未知错误' raise Exception(f"视频生成失败: {_extract_error_detail(poll_result)}")
raise Exception(f"视频生成失败: {reason}")
if not video_url: if not video_url:
if status in ['FAILURE', 'FAILED', 'ERROR']: # 防止循环结束时正好是失败状态但未抛出的极端情况 if status in ['FAILURE', 'FAILED', 'ERROR']:
reason = poll_result.get('fail_reason') or poll_result.get('message') or '未知错误' raise Exception(f"视频生成失败: {_extract_error_detail(poll_result)}")
raise Exception(f"视频生成失败: {reason}")
raise Exception("超时未获取到视频地址") raise Exception("超时未获取到视频地址")
# 3. 持久化记录 # 3. 持久化记录