ai_v/app.py
公司git 05eba467b4 feat(payment): 在订单处理流程中新增Redis分布式锁机制防止并发问题
- 在定时任务同步订单状态时使用Redis锁,避免并发导致重复处理
- 同步回调接口中添加分布式锁,确保订单状态更新的原子性
- 主动查询订单支付状态接口增加Redis锁,防止重复发放积分
- 异步通知处理逻辑引入锁机制,处理锁定失败时适当重试或记录日志
- 捕获并区分锁定异常,避免错误日志泛滥,提升系统稳定性
- 保留数据库行级锁作为数据库层并发控制的保障措施
2026-01-23 17:55:33 +08:00

210 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, jsonify, Response, stream_with_context
from config import Config
from extensions import db, redis_client, migrate, s3_client
from blueprints.auth import auth_bp
from blueprints.api import api_bp
from blueprints.admin import admin_bp
from blueprints.payment import payment_bp
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import threading
import time
import logging
# 导入模律(必需在 db.create_all() 之前导入)
import models
# 定时任务函数
def sync_pending_orders(app):
"""定时任务: 检查并同步30分钟内的待支付订单"""
with app.app_context():
from models import Order, User, get_bj_now
from services.alipay_service import AlipayService
from services.logger import system_logger
from datetime import timedelta
try:
# 查询最还30分钟内的待支付订单
thirty_min_ago = get_bj_now() - timedelta(minutes=30)
pending_orders = Order.query.filter(
Order.status == 'PENDING',
Order.created_at >= thirty_min_ago
).all()
if not pending_orders:
return
alipay_service = AlipayService()
updated_count = 0
for order in pending_orders:
try:
# 使用防止并发的锁
lock_key = f"lock:order:{order.out_trade_no}"
# 尝试获取锁等待3秒。如果获取不到说明正在处理本次跳过
with redis_client.lock(lock_key, timeout=10, blocking_timeout=3):
# 查询订单状态
alipay_result = alipay_service.query_order_status(order.out_trade_no)
if alipay_result and alipay_result.get('trade_status') in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
# 使用行级锁重新查询,防止并发问题 (数据库层面的双重保障)
order_locked = Order.query.filter_by(out_trade_no=order.out_trade_no).with_for_update().first()
# 二次校验状态,防止异步回调已经处理
if order_locked and order_locked.status == 'PENDING':
# 更新订单
order_locked.status = 'PAID'
order_locked.trade_no = alipay_result.get('trade_no')
if not order_locked.paid_at:
order_locked.paid_at = get_bj_now()
# 增加用户积分
user = db.session.get(User, order_locked.user_id)
if user:
user.points += order_locked.points
system_logger.info(f"定时任务-订单支付成功", order_id=order_locked.out_trade_no, points=order_locked.points, user_id=user.id)
updated_count += 1
db.session.commit()
elif order_locked and order_locked.status == 'PAID':
# 订单已经被处理,跳过
logging.info(f"定时任务-订单 {order.out_trade_no} 已被处理,跳过")
except Exception as e:
# Redis lock error or other errors
if "LockError" in str(e) or "BlockingIOError" in str(e):
logging.info(f"定时任务-订单 {order.out_trade_no} 锁定失败或正在处理中")
else:
db.session.rollback()
logging.error(f"定时任务处理订单 {order.out_trade_no} 失败: {str(e)}")
if updated_count > 0:
logging.info(f"定时任务完成,帮助更新了{updated_count}个订单")
except Exception as e:
logging.error(f"定时任务异常: {str(e)}", exc_info=True)
# 导入模律(必需在 db.create_all() 之前导入)
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
# 初始化扩展
db.init_app(app)
redis_client.init_app(app)
migrate.init_app(app, db)
# 注册蓝图
app.register_blueprint(auth_bp)
app.register_blueprint(api_bp)
app.register_blueprint(admin_bp)
app.register_blueprint(payment_bp)
from flask import g, session
from models import User, SystemLog
@app.before_request
def load_user():
"""在每个请求前加载用户信息到 g供日志系统使用"""
user_id = session.get('user_id')
if user_id:
g.user_id = user_id
g.user = db.session.get(User, user_id)
else:
g.user_id = None
g.user = None
@app.context_processor
def inject_menu():
"""将导航菜单注入所有模板,实现服务端渲染,解决闪烁问题"""
from blueprints.auth import get_user_menu
menu = get_user_menu(g.user) if hasattr(g, 'user') else []
return dict(nav_menu=menu)
@app.route('/api/system_logs')
def get_system_logs():
"""获取系统日志数据 (供后台管理界面使用)"""
# 这里可以加入权限检查
logs = SystemLog.query.order_by(SystemLog.created_at.desc()).limit(100).all()
return jsonify([{
'id': log.id,
'user_id': log.user_id,
'level': log.level,
'module': log.module,
'message': log.message,
'extra': log.extra,
'ip': log.ip,
'path': log.path,
'method': log.method,
'user_agent': log.user_agent,
'created_at': log.created_at_bj.strftime('%Y-%m-%d %H:%M:%S')
} for log in logs])
@app.route('/')
def index():
return render_template('index.html')
@app.route('/ocr')
def ocr():
return render_template('ocr.html')
@app.route('/visualizer')
def visualizer():
return render_template('kongzhiqi.html')
@app.route('/video')
def video_page():
return render_template('video.html')
@app.route('/files/<path:filename>')
def get_file(filename):
"""Proxy route to serve files from MinIO via the backend"""
try:
# Use s3_client to get the object
file_obj = s3_client.get_object(Bucket=Config.MINIO["bucket"], Key=filename)
def generate():
for chunk in file_obj['Body'].iter_chunks(chunk_size=4096):
yield chunk
return Response(
stream_with_context(generate()),
mimetype=file_obj['ContentType'],
headers={
"Cache-Control": "public, max-age=86400"
}
)
except Exception as e:
# system_logger.error(f"File proxy error: {str(e)}") # Optional logging
return jsonify({"error": "File not found"}), 404
# 自动创建数据库表
with app.app_context():
print("🔧 正在检查并创建数据库表...")
db.create_all()
print("✅ 数据库表已就绪")
# 创建并启动定时任务调度器
try:
scheduler = BackgroundScheduler(daemon=True)
# 每分钟检查一次待支付订单
scheduler.add_job(
func=sync_pending_orders,
args=[app],
trigger=IntervalTrigger(minutes=1),
id='sync_pending_orders',
name='同步待支付订单',
replace_existing=True
)
scheduler.start()
print("🚀 定时任务引擎已启动,将每分钟检查一次待支付订单")
except Exception as e:
print(f"⚠️ 定时任务启动失败: {str(e)}")
return app
app = create_app()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)