feat(task): 增加 Redis 状态缓存支持异步任务进度跟踪

- 在图像生成异步任务开始时设置 Redis 状态为队列中,提示任务已提交等待处理
- 在视频生成异步任务开始时设置 Redis 状态为队列中,提示视频任务已提交准备开始导演
- 图片生成处理函数开始时更新 Redis 状态为处理中,并显示绘制进度提示
- 视频生成轮询过程中定期更新 Redis 状态及进度提示,避免任务超时认为失败
- 增加心跳机制,保持任务活跃状态信息,提升用户体验
This commit is contained in:
公司git 2026-01-23 18:01:54 +08:00
parent 05eba467b4
commit a2f357a8f6
2 changed files with 16 additions and 0 deletions

View File

@ -1,5 +1,6 @@
from config import Config from config import Config
from models import SystemDict, GenerationRecord, User, db from models import SystemDict, GenerationRecord, User, db
from extensions import redis_client
from services.logger import system_logger from services.logger import system_logger
from services.task_service import process_image_generation, process_video_generation from services.task_service import process_image_generation, process_video_generation
import requests import requests
@ -124,6 +125,8 @@ def start_async_image_task(app, user_id, payload, api_key, target_api, cost, mod
log_msg = "用户发起验光单解读" if payload.get('prompt') == "解读验光单" else "用户发起生图任务" log_msg = "用户发起验光单解读" if payload.get('prompt') == "解读验光单" else "用户发起生图任务"
system_logger.info(log_msg, model=model_value, mode=mode) system_logger.info(log_msg, model=model_value, mode=mode)
redis_client.setex(f"task:{task_id}", 3600, json.dumps({"status": "queued", "message": "任务已提交,等待处理..."}))
threading.Thread( threading.Thread(
target=process_image_generation, target=process_image_generation,
args=(app, user_id, task_id, payload, api_key, target_api, cost, use_trial) args=(app, user_id, task_id, payload, api_key, target_api, cost, use_trial)
@ -151,6 +154,8 @@ def start_async_video_task(app, user_id, payload, cost, model_value):
system_logger.info("用户发起视频生成任务 (积分模式)", model=model_value, cost=cost) system_logger.info("用户发起视频生成任务 (积分模式)", model=model_value, cost=cost)
redis_client.setex(f"task:{task_id}", 3600, json.dumps({"status": "queued", "message": "视频任务已提交,准备开始导演..."}))
threading.Thread( threading.Thread(
target=process_video_generation, target=process_video_generation,
args=(app, user_id, task_id, payload, api_key, cost, True) # 视频目前默认为积分模式 args=(app, user_id, task_id, payload, api_key, cost, True) # 视频目前默认为积分模式

View File

@ -93,6 +93,8 @@ def sync_images_background(app, record_id, raw_urls):
def process_image_generation(app, user_id, task_id, payload, api_key, target_api, cost, use_trial=False): def process_image_generation(app, user_id, task_id, payload, api_key, target_api, cost, use_trial=False):
"""异步执行图片生成并存入 Redis""" """异步执行图片生成并存入 Redis"""
with app.app_context(): with app.app_context():
# 更新状态为处理中
redis_client.setex(f"task:{task_id}", 3600, json.dumps({"status": "processing", "message": "正如火如荼地绘制中..."}))
try: try:
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
# 使用较长的超时时间 (10分钟),确保长耗时任务不被中断 # 使用较长的超时时间 (10分钟),确保长耗时任务不被中断
@ -217,9 +219,18 @@ def process_video_generation(app, user_id, internal_task_id, payload, api_key, c
raise Exception(f"未获取到远程任务 ID: {submit_result}") raise Exception(f"未获取到远程任务 ID: {submit_result}")
# 2. 轮询状态 # 2. 轮询状态
redis_client.setex(f"task:{internal_task_id}", 3600, json.dumps({"status": "processing", "message": "视频生成中,请耐心等待..."}))
max_retries = 90 # 提升到 15 分钟 max_retries = 90 # 提升到 15 分钟
video_url = None video_url = None
for i in range(max_retries): for i in range(max_retries):
# 更新进度 (伪进度或保持活跃)
if i % 2 == 0: # 每20秒更新一次心跳防止被认为是死任务
redis_client.setex(f"task:{internal_task_id}", 3600, json.dumps({
"status": "processing",
"message": f"视频生成中 (已耗时 {i * 10} 秒)..."
}))
time.sleep(10) time.sleep(10)
poll_url = Config.VIDEO_POLL_API.format(task_id=remote_task_id) poll_url = Config.VIDEO_POLL_API.format(task_id=remote_task_id)
poll_resp = requests.get(poll_url, headers=headers, timeout=30) poll_resp = requests.get(poll_url, headers=headers, timeout=30)