ai_v/blueprints/payment.py
公司git 05eba467b4 feat(payment): 在订单处理流程中新增Redis分布式锁机制防止并发问题
- 在定时任务同步订单状态时使用Redis锁,避免并发导致重复处理
- 同步回调接口中添加分布式锁,确保订单状态更新的原子性
- 主动查询订单支付状态接口增加Redis锁,防止重复发放积分
- 异步通知处理逻辑引入锁机制,处理锁定失败时适当重试或记录日志
- 捕获并区分锁定异常,避免错误日志泛滥,提升系统稳定性
- 保留数据库行级锁作为数据库层并发控制的保障措施
2026-01-23 17:55:33 +08:00

363 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, request, redirect, url_for, session, jsonify, render_template
from extensions import db, redis_client
from models import Order, User, to_bj_time, get_bj_now
from services.alipay_service import AlipayService
from services.logger import system_logger
import uuid
from datetime import datetime, timedelta
payment_bp = Blueprint('payment', __name__, url_prefix='/payment')
# 积分价格配置
POINTS_PACKAGES = {
'50': {'points': 50, 'amount': 5.00},
'200': {'points': 200, 'amount': 20.00},
'1000': {'points': 1000, 'amount': 100.00},
'5000': {'points': 5000, 'amount': 500.00},
}
@payment_bp.route('/create', methods=['POST'])
def create_payment():
if 'user_id' not in session:
return jsonify({'code': 401, 'msg': '请先登录'}), 401
package_id = request.form.get('package_id')
if package_id not in POINTS_PACKAGES:
return jsonify({'code': 400, 'msg': '无效的套餐'}), 400
package = POINTS_PACKAGES[package_id]
user_id = session['user_id']
# 生成唯一订单号 (时间戳 + 随机位)
out_trade_no = datetime.now().strftime('%Y%m%d%H%M%S') + str(uuid.uuid4().hex[:6])
# 创建订单记录
try:
order = Order(
out_trade_no=out_trade_no,
user_id=user_id,
amount=package['amount'],
points=package['points'],
status='PENDING'
)
db.session.add(order)
db.session.commit()
system_logger.info(f"用户创建充值订单", order_id=out_trade_no, amount=package['amount'], points=package['points'])
except Exception as e:
db.session.rollback()
system_logger.error(f"订单创建失败: {str(e)}")
return f"订单创建失败: {str(e)}", 500
# 获取支付链接
try:
alipay_service = AlipayService()
pay_url = alipay_service.create_order_url(
out_trade_no=out_trade_no,
total_amount=package['amount'],
subject=f"购买{package['points']}积分"
)
return redirect(pay_url)
except Exception as e:
system_logger.error(f"支付链接生成失败: {str(e)}")
return f"支付链接生成失败: {str(e)}", 500
@payment_bp.route('/return')
def payment_return():
"""支付成功后的同步跳转页面"""
try:
data = request.args.to_dict()
signature = data.get("sign")
if not signature:
return "参数错误:缺少签名", 400
alipay_service = AlipayService()
success = alipay_service.verify_notify(data, signature)
out_trade_no = data.get('out_trade_no')
if success:
# 同步回调也进行订单处理,防止异步回调延迟或失败
out_trade_no = data.get('out_trade_no')
trade_no = data.get('trade_no')
try:
# 使用分布式锁防止并发
lock_key = f"lock:order:{out_trade_no}"
with redis_client.lock(lock_key, timeout=10, blocking_timeout=3):
# 查询订单 (加锁防止并发导致双重发放)
order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
# 如果订单存在且状态为PENDING则更新为PAID
if order and order.status == 'PENDING':
order.status = 'PAID'
order.trade_no = trade_no
order.paid_at = get_bj_now()
# 增加用户积分
user = db.session.get(User, order.user_id)
if user:
user.points += order.points
system_logger.info(f"同步回调-订单支付成功", order_id=out_trade_no, points=order.points, user_id=user.id)
db.session.commit()
elif order:
# 订单已经是完成状态,不做处理
pass
else:
system_logger.warning(f"同步回调-未找到订单", order_id=out_trade_no)
except Exception as e:
db.session.rollback()
# 忽略锁错误,说明可能已经在处理
if "LockError" not in str(e) and "BlockingIOError" not in str(e):
system_logger.error(f"同步回调-订单状态更新失败: {str(e)}")
return redirect(url_for('auth.buy_page', success='true', out_trade_no=out_trade_no))
else:
system_logger.warning(f"支付同步回调验证失败", order_id=out_trade_no)
return "支付验证失败", 400
except Exception as e:
system_logger.error(f"处理同步回调异常: {str(e)}")
return f"处理支付回调失败: {str(e)}", 500
@payment_bp.route('/history', methods=['GET'])
def payment_history():
"""获取当前用户的充值历史记录"""
if 'user_id' not in session:
return redirect(url_for('auth.login'))
thirty_min_ago = get_bj_now() - timedelta(minutes=30)
user_id = session['user_id']
orders = Order.query.filter(
Order.user_id == user_id,
db.or_(
Order.status == 'PAID',
db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago)
)
).order_by(Order.created_at.desc()).all()
return render_template('recharge_history.html', orders=orders, modules={'datetime': datetime})
@payment_bp.route('/api/history', methods=['GET'])
def api_payment_history():
"""API 获取当前用户的充值历史记录"""
if 'user_id' not in session:
return jsonify({'code': 401, 'msg': '请先登录'}), 401
thirty_min_ago = get_bj_now() - timedelta(minutes=30)
user_id = session['user_id']
orders = Order.query.filter(
Order.user_id == user_id,
db.or_(
Order.status == 'PAID',
db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago)
)
).order_by(Order.created_at.desc()).all()
return jsonify({
"orders": [{
"id": o.id,
"out_trade_no": o.out_trade_no,
"amount": float(o.amount),
"points": o.points,
"status": o.status,
"trade_no": o.trade_no,
"created_at": o.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
"paid_at": o.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if o.paid_at else None
} for o in orders]
})
@payment_bp.route('/api/sync_order', methods=['POST'])
def api_sync_order():
"""主动查询订单状态并同步 - 用于用户手动关闭支付页面后检查是否支付成功"""
if 'user_id' not in session:
return jsonify({'code': 401, 'msg': '请先登录'}), 401
out_trade_no = request.form.get('out_trade_no')
if not out_trade_no:
return jsonify({'code': 400, 'msg': '订单号不能为空'}), 400
try:
# 查询订单
order = Order.query.filter_by(out_trade_no=out_trade_no).first()
if not order:
return jsonify({'code': 404, 'msg': '订单不存在'}), 404
# 只有当前用户才能查询自己的订单
if order.user_id != session['user_id']:
return jsonify({'code': 403, 'msg': '无权限访问此订单'}), 403
# 如果订单已经是PAID或FAILED状态直接返回
if order.status in ['PAID', 'FAILED']:
return jsonify({
'code': 200,
'msg': '订单状态已确定',
'status': order.status,
'paid_at': order.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order.paid_at else None
})
# 向支付宝查询订单状态
alipay_service = AlipayService()
alipay_result = alipay_service.query_order_status(out_trade_no)
if not alipay_result:
return jsonify({'code': 500, 'msg': '查询支付宝订单失败,请稍后重试'}), 500
trade_status = alipay_result.get('trade_status')
# 如果支付宝显示已支付,更新本地订单状态
if trade_status in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
try:
lock_key = f"lock:order:{out_trade_no}"
with redis_client.lock(lock_key, timeout=10, blocking_timeout=3):
# 使用行级锁重新查询,防止并发问题
order_locked = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
# 二次校验状态,防止异步回调/定时任务已经处理
if order_locked and order_locked.status == 'PENDING':
order_locked.status = 'PAID'
order_locked.trade_no = alipay_result.get('trade_no')
if not order_locked.paid_at:
order_locked.paid_at = get_bj_now()
# 增加用户积分
user = db.session.get(User, order_locked.user_id)
if user:
user.points += order_locked.points
system_logger.info(f"主动查询-订单支付成功", order_id=out_trade_no, points=order_locked.points, user_id=user.id)
db.session.commit()
return jsonify({
'code': 200,
'msg': '订单已支付,积分已增加',
'status': 'PAID',
'points': order_locked.points,
'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S')
})
elif order_locked and order_locked.status == 'PAID':
# 订单已经被处理,直接返回
return jsonify({
'code': 200,
'msg': '订单已支付',
'status': 'PAID',
'points': order_locked.points,
'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order_locked.paid_at else None
})
except Exception as e:
if "LockError" in str(e) or "BlockingIOError" in str(e):
# 如果获取锁失败,可能是因为正在处理中,返回成功状态(前端会重试或刷新)
return jsonify({'code': 200, 'msg': '处理中', 'status': 'PAID'})
else:
db.session.rollback()
raise e
# 支付宝显示未支付
elif trade_status in ['TRADE_CLOSED', 'WAIT_BUYER_PAY']:
return jsonify({
'code': 200,
'msg': '订单未支付',
'status': 'PENDING',
'trade_status': trade_status
})
else:
return jsonify({
'code': 200,
'msg': f'订单状态: {trade_status}',
'status': order.status,
'trade_status': trade_status
})
except Exception as e:
system_logger.error(f"主动查询订单异常: {str(e)}")
db.session.rollback()
return jsonify({'code': 500, 'msg': f'查询失败: {str(e)}'}), 500
@payment_bp.route('/notify', methods=['POST'])
def payment_notify():
"""支付宝异步通知"""
try:
data = request.form.to_dict()
# 记录异步通知到系统日志,而不是本地文件
system_logger.info(f"支付宝异步通知接收", extra_data=str(data))
signature = data.get("sign")
if not signature:
return "fail"
alipay_service = AlipayService()
success = alipay_service.verify_notify(data, signature)
if success and data.get('trade_status') in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
out_trade_no = data.get('out_trade_no')
trade_no = data.get('trade_no')
# 加锁查询,确保并发安全
try:
lock_key = f"lock:order:{out_trade_no}"
with redis_client.lock(lock_key, timeout=10, blocking_timeout=3):
order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
if order and order.status == 'PENDING':
order.status = 'PAID'
order.trade_no = trade_no
order.paid_at = get_bj_now()
user = db.session.get(User, order.user_id)
if user:
user.points += order.points
system_logger.info(f"订单支付成功", order_id=out_trade_no, points=order.points, user_id=user.id)
db.session.commit()
return "success"
elif order:
return "success"
else:
return "fail"
except Exception as e:
# 如果锁获取失败暂时返回fail让支付宝重试或者返回success如果确信正在处理
# 返回fail让支付宝重试比较稳妥因为可能正在处理但还没提交
system_logger.warning(f"处理异步通知锁定失败: {str(e)}")
if "LockError" in str(e) or "BlockingIOError" in str(e):
# 正在处理中,告诉支付宝由于并发我们正在处理,稍后重试(或视为成功?)
# 如果返回fail支付宝会重试。
return "fail"
raise e
else:
return "fail"
except Exception as e:
system_logger.error(f"处理异步通知异常: {str(e)}")
db.session.rollback()
return "fail"
@payment_bp.route('/api/query/<out_trade_no>', methods=['GET'])
def api_query_order(out_trade_no):
"""简单查询接口 - 获取订单当前状态而不自动更新"""
if 'user_id' not in session:
return jsonify({'code': 401, 'msg': '请先登录'}), 401
try:
order = Order.query.filter_by(out_trade_no=out_trade_no).first()
if not order:
return jsonify({'code': 404, 'msg': '订单不存在'}), 404
if order.user_id != session['user_id']:
return jsonify({'code': 403, 'msg': '无权限访问'}), 403
return jsonify({
'code': 200,
'out_trade_no': order.out_trade_no,
'status': order.status,
'amount': float(order.amount),
'points': order.points,
'trade_no': order.trade_no,
'created_at': order.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
'paid_at': order.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order.paid_at else None
})
except Exception as e:
return jsonify({'code': 500, 'msg': f'查询失败: {str(e)}'}), 500