feat(payment): 在订单处理流程中新增Redis分布式锁机制防止并发问题
- 在定时任务同步订单状态时使用Redis锁,避免并发导致重复处理 - 同步回调接口中添加分布式锁,确保订单状态更新的原子性 - 主动查询订单支付状态接口增加Redis锁,防止重复发放积分 - 异步通知处理逻辑引入锁机制,处理锁定失败时适当重试或记录日志 - 捕获并区分锁定异常,避免错误日志泛滥,提升系统稳定性 - 保留数据库行级锁作为数据库层并发控制的保障措施
This commit is contained in:
parent
93d5c503b2
commit
05eba467b4
56
app.py
56
app.py
@ -39,35 +39,43 @@ def sync_pending_orders(app):
|
|||||||
|
|
||||||
for order in pending_orders:
|
for order in pending_orders:
|
||||||
try:
|
try:
|
||||||
# 查询订单状态
|
# 使用防止并发的锁
|
||||||
alipay_result = alipay_service.query_order_status(order.out_trade_no)
|
lock_key = f"lock:order:{order.out_trade_no}"
|
||||||
|
# 尝试获取锁,等待3秒。如果获取不到,说明正在处理,本次跳过
|
||||||
|
with redis_client.lock(lock_key, timeout=10, blocking_timeout=3):
|
||||||
|
# 查询订单状态
|
||||||
|
alipay_result = alipay_service.query_order_status(order.out_trade_no)
|
||||||
|
|
||||||
if alipay_result and alipay_result.get('trade_status') in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
|
if alipay_result and alipay_result.get('trade_status') in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
|
||||||
# 使用行级锁重新查询,防止并发问题
|
# 使用行级锁重新查询,防止并发问题 (数据库层面的双重保障)
|
||||||
order_locked = Order.query.filter_by(out_trade_no=order.out_trade_no).with_for_update().first()
|
order_locked = Order.query.filter_by(out_trade_no=order.out_trade_no).with_for_update().first()
|
||||||
|
|
||||||
# 二次校验状态,防止异步回调已经处理
|
# 二次校验状态,防止异步回调已经处理
|
||||||
if order_locked and order_locked.status == 'PENDING':
|
if order_locked and order_locked.status == 'PENDING':
|
||||||
# 更新订单
|
# 更新订单
|
||||||
order_locked.status = 'PAID'
|
order_locked.status = 'PAID'
|
||||||
order_locked.trade_no = alipay_result.get('trade_no')
|
order_locked.trade_no = alipay_result.get('trade_no')
|
||||||
if not order_locked.paid_at:
|
if not order_locked.paid_at:
|
||||||
order_locked.paid_at = get_bj_now()
|
order_locked.paid_at = get_bj_now()
|
||||||
|
|
||||||
# 增加用户积分
|
# 增加用户积分
|
||||||
user = db.session.get(User, order_locked.user_id)
|
user = db.session.get(User, order_locked.user_id)
|
||||||
if user:
|
if user:
|
||||||
user.points += order_locked.points
|
user.points += order_locked.points
|
||||||
system_logger.info(f"定时任务-订单支付成功", order_id=order_locked.out_trade_no, points=order_locked.points, user_id=user.id)
|
system_logger.info(f"定时任务-订单支付成功", order_id=order_locked.out_trade_no, points=order_locked.points, user_id=user.id)
|
||||||
updated_count += 1
|
updated_count += 1
|
||||||
|
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
elif order_locked and order_locked.status == 'PAID':
|
elif order_locked and order_locked.status == 'PAID':
|
||||||
# 订单已经被处理,跳过
|
# 订单已经被处理,跳过
|
||||||
logging.info(f"定时任务-订单 {order.out_trade_no} 已被处理,跳过")
|
logging.info(f"定时任务-订单 {order.out_trade_no} 已被处理,跳过")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
db.session.rollback()
|
# Redis lock error or other errors
|
||||||
logging.error(f"定时任务处理订单 {order.out_trade_no} 失败: {str(e)}")
|
if "LockError" in str(e) or "BlockingIOError" in str(e):
|
||||||
|
logging.info(f"定时任务-订单 {order.out_trade_no} 锁定失败或正在处理中")
|
||||||
|
else:
|
||||||
|
db.session.rollback()
|
||||||
|
logging.error(f"定时任务处理订单 {order.out_trade_no} 失败: {str(e)}")
|
||||||
|
|
||||||
if updated_count > 0:
|
if updated_count > 0:
|
||||||
logging.info(f"定时任务完成,帮助更新了{updated_count}个订单")
|
logging.info(f"定时任务完成,帮助更新了{updated_count}个订单")
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
from flask import Blueprint, request, redirect, url_for, session, jsonify, render_template
|
from flask import Blueprint, request, redirect, url_for, session, jsonify, render_template
|
||||||
from extensions import db
|
from extensions import db, redis_client
|
||||||
from models import Order, User, to_bj_time, get_bj_now
|
from models import Order, User, to_bj_time, get_bj_now
|
||||||
from services.alipay_service import AlipayService
|
from services.alipay_service import AlipayService
|
||||||
from services.logger import system_logger
|
from services.logger import system_logger
|
||||||
@ -81,31 +81,36 @@ def payment_return():
|
|||||||
trade_no = data.get('trade_no')
|
trade_no = data.get('trade_no')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 查询订单 (加锁防止并发导致双重发放)
|
# 使用分布式锁防止并发
|
||||||
order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
|
lock_key = f"lock:order:{out_trade_no}"
|
||||||
|
with redis_client.lock(lock_key, timeout=10, blocking_timeout=3):
|
||||||
|
# 查询订单 (加锁防止并发导致双重发放)
|
||||||
|
order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
|
||||||
|
|
||||||
# 如果订单存在且状态为PENDING,则更新为PAID
|
# 如果订单存在且状态为PENDING,则更新为PAID
|
||||||
if order and order.status == 'PENDING':
|
if order and order.status == 'PENDING':
|
||||||
order.status = 'PAID'
|
order.status = 'PAID'
|
||||||
order.trade_no = trade_no
|
order.trade_no = trade_no
|
||||||
order.paid_at = get_bj_now()
|
order.paid_at = get_bj_now()
|
||||||
|
|
||||||
# 增加用户积分
|
# 增加用户积分
|
||||||
user = db.session.get(User, order.user_id)
|
user = db.session.get(User, order.user_id)
|
||||||
if user:
|
if user:
|
||||||
user.points += order.points
|
user.points += order.points
|
||||||
system_logger.info(f"同步回调-订单支付成功", order_id=out_trade_no, points=order.points, user_id=user.id)
|
system_logger.info(f"同步回调-订单支付成功", order_id=out_trade_no, points=order.points, user_id=user.id)
|
||||||
|
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
elif order:
|
elif order:
|
||||||
# 订单已经是完成状态,不做处理
|
# 订单已经是完成状态,不做处理
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
system_logger.warning(f"同步回调-未找到订单", order_id=out_trade_no)
|
system_logger.warning(f"同步回调-未找到订单", order_id=out_trade_no)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
db.session.rollback()
|
db.session.rollback()
|
||||||
system_logger.error(f"同步回调-订单状态更新失败: {str(e)}")
|
# 忽略锁错误,说明可能已经在处理
|
||||||
|
if "LockError" not in str(e) and "BlockingIOError" not in str(e):
|
||||||
|
system_logger.error(f"同步回调-订单状态更新失败: {str(e)}")
|
||||||
|
|
||||||
return redirect(url_for('auth.buy_page', success='true', out_trade_no=out_trade_no))
|
return redirect(url_for('auth.buy_page', success='true', out_trade_no=out_trade_no))
|
||||||
else:
|
else:
|
||||||
@ -204,40 +209,50 @@ def api_sync_order():
|
|||||||
|
|
||||||
# 如果支付宝显示已支付,更新本地订单状态
|
# 如果支付宝显示已支付,更新本地订单状态
|
||||||
if trade_status in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
|
if trade_status in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
|
||||||
# 使用行级锁重新查询,防止并发问题
|
try:
|
||||||
order_locked = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
|
lock_key = f"lock:order:{out_trade_no}"
|
||||||
|
with redis_client.lock(lock_key, timeout=10, blocking_timeout=3):
|
||||||
|
# 使用行级锁重新查询,防止并发问题
|
||||||
|
order_locked = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
|
||||||
|
|
||||||
# 二次校验状态,防止异步回调/定时任务已经处理
|
# 二次校验状态,防止异步回调/定时任务已经处理
|
||||||
if order_locked and order_locked.status == 'PENDING':
|
if order_locked and order_locked.status == 'PENDING':
|
||||||
order_locked.status = 'PAID'
|
order_locked.status = 'PAID'
|
||||||
order_locked.trade_no = alipay_result.get('trade_no')
|
order_locked.trade_no = alipay_result.get('trade_no')
|
||||||
if not order_locked.paid_at:
|
if not order_locked.paid_at:
|
||||||
order_locked.paid_at = get_bj_now()
|
order_locked.paid_at = get_bj_now()
|
||||||
|
|
||||||
# 增加用户积分
|
# 增加用户积分
|
||||||
user = db.session.get(User, order_locked.user_id)
|
user = db.session.get(User, order_locked.user_id)
|
||||||
if user:
|
if user:
|
||||||
user.points += order_locked.points
|
user.points += order_locked.points
|
||||||
system_logger.info(f"主动查询-订单支付成功", order_id=out_trade_no, points=order_locked.points, user_id=user.id)
|
system_logger.info(f"主动查询-订单支付成功", order_id=out_trade_no, points=order_locked.points, user_id=user.id)
|
||||||
|
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
return jsonify({
|
return jsonify({
|
||||||
'code': 200,
|
'code': 200,
|
||||||
'msg': '订单已支付,积分已增加',
|
'msg': '订单已支付,积分已增加',
|
||||||
'status': 'PAID',
|
'status': 'PAID',
|
||||||
'points': order_locked.points,
|
'points': order_locked.points,
|
||||||
'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S')
|
'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S')
|
||||||
})
|
})
|
||||||
elif order_locked and order_locked.status == 'PAID':
|
elif order_locked and order_locked.status == 'PAID':
|
||||||
# 订单已经被处理,直接返回
|
# 订单已经被处理,直接返回
|
||||||
return jsonify({
|
return jsonify({
|
||||||
'code': 200,
|
'code': 200,
|
||||||
'msg': '订单已支付',
|
'msg': '订单已支付',
|
||||||
'status': 'PAID',
|
'status': 'PAID',
|
||||||
'points': order_locked.points,
|
'points': order_locked.points,
|
||||||
'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order_locked.paid_at else None
|
'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order_locked.paid_at else None
|
||||||
})
|
})
|
||||||
|
except Exception as e:
|
||||||
|
if "LockError" in str(e) or "BlockingIOError" in str(e):
|
||||||
|
# 如果获取锁失败,可能是因为正在处理中,返回成功状态(前端会重试或刷新)
|
||||||
|
return jsonify({'code': 200, 'msg': '处理中', 'status': 'PAID'})
|
||||||
|
else:
|
||||||
|
db.session.rollback()
|
||||||
|
raise e
|
||||||
|
|
||||||
# 支付宝显示未支付
|
# 支付宝显示未支付
|
||||||
elif trade_status in ['TRADE_CLOSED', 'WAIT_BUYER_PAY']:
|
elif trade_status in ['TRADE_CLOSED', 'WAIT_BUYER_PAY']:
|
||||||
@ -282,23 +297,35 @@ def payment_notify():
|
|||||||
trade_no = data.get('trade_no')
|
trade_no = data.get('trade_no')
|
||||||
|
|
||||||
# 加锁查询,确保并发安全
|
# 加锁查询,确保并发安全
|
||||||
order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
|
try:
|
||||||
if order and order.status == 'PENDING':
|
lock_key = f"lock:order:{out_trade_no}"
|
||||||
order.status = 'PAID'
|
with redis_client.lock(lock_key, timeout=10, blocking_timeout=3):
|
||||||
order.trade_no = trade_no
|
order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
|
||||||
order.paid_at = get_bj_now()
|
if order and order.status == 'PENDING':
|
||||||
|
order.status = 'PAID'
|
||||||
|
order.trade_no = trade_no
|
||||||
|
order.paid_at = get_bj_now()
|
||||||
|
|
||||||
user = db.session.get(User, order.user_id)
|
user = db.session.get(User, order.user_id)
|
||||||
if user:
|
if user:
|
||||||
user.points += order.points
|
user.points += order.points
|
||||||
system_logger.info(f"订单支付成功", order_id=out_trade_no, points=order.points, user_id=user.id)
|
system_logger.info(f"订单支付成功", order_id=out_trade_no, points=order.points, user_id=user.id)
|
||||||
|
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return "success"
|
return "success"
|
||||||
elif order:
|
elif order:
|
||||||
return "success"
|
return "success"
|
||||||
else:
|
else:
|
||||||
return "fail"
|
return "fail"
|
||||||
|
except Exception as e:
|
||||||
|
# 如果锁获取失败,暂时返回fail让支付宝重试,或者返回success如果确信正在处理?
|
||||||
|
# 返回fail让支付宝重试比较稳妥,因为可能正在处理但还没提交
|
||||||
|
system_logger.warning(f"处理异步通知锁定失败: {str(e)}")
|
||||||
|
if "LockError" in str(e) or "BlockingIOError" in str(e):
|
||||||
|
# 正在处理中,告诉支付宝由于并发我们正在处理,稍后重试(或视为成功?)
|
||||||
|
# 如果返回fail,支付宝会重试。
|
||||||
|
return "fail"
|
||||||
|
raise e
|
||||||
else:
|
else:
|
||||||
return "fail"
|
return "fail"
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user