ai_v/app.py
公司git 93d5c503b2 fix(app): 修复定时任务同步待支付订单的上下文问题
- 将 sync_pending_orders 增加 app 参数以使用 Flask 应用上下文
- 在函数内部使用 app.app_context() 包裹数据库操作,避免上下文错误
- 保持原有逻辑查询并更新30分钟内的待支付订单状态
- 增加日志记录和异常捕获,确保任务稳定运行
- 调度器调用时传入 app 作为参数以支持上下文执行
2026-01-23 17:46:22 +08:00

202 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, jsonify, Response, stream_with_context
from config import Config
from extensions import db, redis_client, migrate, s3_client
from blueprints.auth import auth_bp
from blueprints.api import api_bp
from blueprints.admin import admin_bp
from blueprints.payment import payment_bp
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import threading
import time
import logging
# 导入模律(必需在 db.create_all() 之前导入)
import models
# 定时任务函数
def sync_pending_orders(app):
"""定时任务: 检查并同步30分钟内的待支付订单"""
with app.app_context():
from models import Order, User, get_bj_now
from services.alipay_service import AlipayService
from services.logger import system_logger
from datetime import timedelta
try:
# 查询最还30分钟内的待支付订单
thirty_min_ago = get_bj_now() - timedelta(minutes=30)
pending_orders = Order.query.filter(
Order.status == 'PENDING',
Order.created_at >= thirty_min_ago
).all()
if not pending_orders:
return
alipay_service = AlipayService()
updated_count = 0
for order in pending_orders:
try:
# 查询订单状态
alipay_result = alipay_service.query_order_status(order.out_trade_no)
if alipay_result and alipay_result.get('trade_status') in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
# 使用行级锁重新查询,防止并发问题
order_locked = Order.query.filter_by(out_trade_no=order.out_trade_no).with_for_update().first()
# 二次校验状态,防止异步回调已经处理
if order_locked and order_locked.status == 'PENDING':
# 更新订单
order_locked.status = 'PAID'
order_locked.trade_no = alipay_result.get('trade_no')
if not order_locked.paid_at:
order_locked.paid_at = get_bj_now()
# 增加用户积分
user = db.session.get(User, order_locked.user_id)
if user:
user.points += order_locked.points
system_logger.info(f"定时任务-订单支付成功", order_id=order_locked.out_trade_no, points=order_locked.points, user_id=user.id)
updated_count += 1
db.session.commit()
elif order_locked and order_locked.status == 'PAID':
# 订单已经被处理,跳过
logging.info(f"定时任务-订单 {order.out_trade_no} 已被处理,跳过")
except Exception as e:
db.session.rollback()
logging.error(f"定时任务处理订单 {order.out_trade_no} 失败: {str(e)}")
if updated_count > 0:
logging.info(f"定时任务完成,帮助更新了{updated_count}个订单")
except Exception as e:
logging.error(f"定时任务异常: {str(e)}", exc_info=True)
# 导入模律(必需在 db.create_all() 之前导入)
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
# 初始化扩展
db.init_app(app)
redis_client.init_app(app)
migrate.init_app(app, db)
# 注册蓝图
app.register_blueprint(auth_bp)
app.register_blueprint(api_bp)
app.register_blueprint(admin_bp)
app.register_blueprint(payment_bp)
from flask import g, session
from models import User, SystemLog
@app.before_request
def load_user():
"""在每个请求前加载用户信息到 g供日志系统使用"""
user_id = session.get('user_id')
if user_id:
g.user_id = user_id
g.user = db.session.get(User, user_id)
else:
g.user_id = None
g.user = None
@app.context_processor
def inject_menu():
"""将导航菜单注入所有模板,实现服务端渲染,解决闪烁问题"""
from blueprints.auth import get_user_menu
menu = get_user_menu(g.user) if hasattr(g, 'user') else []
return dict(nav_menu=menu)
@app.route('/api/system_logs')
def get_system_logs():
"""获取系统日志数据 (供后台管理界面使用)"""
# 这里可以加入权限检查
logs = SystemLog.query.order_by(SystemLog.created_at.desc()).limit(100).all()
return jsonify([{
'id': log.id,
'user_id': log.user_id,
'level': log.level,
'module': log.module,
'message': log.message,
'extra': log.extra,
'ip': log.ip,
'path': log.path,
'method': log.method,
'user_agent': log.user_agent,
'created_at': log.created_at_bj.strftime('%Y-%m-%d %H:%M:%S')
} for log in logs])
@app.route('/')
def index():
return render_template('index.html')
@app.route('/ocr')
def ocr():
return render_template('ocr.html')
@app.route('/visualizer')
def visualizer():
return render_template('kongzhiqi.html')
@app.route('/video')
def video_page():
return render_template('video.html')
@app.route('/files/<path:filename>')
def get_file(filename):
"""Proxy route to serve files from MinIO via the backend"""
try:
# Use s3_client to get the object
file_obj = s3_client.get_object(Bucket=Config.MINIO["bucket"], Key=filename)
def generate():
for chunk in file_obj['Body'].iter_chunks(chunk_size=4096):
yield chunk
return Response(
stream_with_context(generate()),
mimetype=file_obj['ContentType'],
headers={
"Cache-Control": "public, max-age=86400"
}
)
except Exception as e:
# system_logger.error(f"File proxy error: {str(e)}") # Optional logging
return jsonify({"error": "File not found"}), 404
# 自动创建数据库表
with app.app_context():
print("🔧 正在检查并创建数据库表...")
db.create_all()
print("✅ 数据库表已就绪")
# 创建并启动定时任务调度器
try:
scheduler = BackgroundScheduler(daemon=True)
# 每分钟检查一次待支付订单
scheduler.add_job(
func=sync_pending_orders,
args=[app],
trigger=IntervalTrigger(minutes=1),
id='sync_pending_orders',
name='同步待支付订单',
replace_existing=True
)
scheduler.start()
print("🚀 定时任务引擎已启动,将每分钟检查一次待支付订单")
except Exception as e:
print(f"⚠️ 定时任务启动失败: {str(e)}")
return app
app = create_app()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)