diff --git a/app.py b/app.py index 73cd714..b4e25ba 100644 --- a/app.py +++ b/app.py @@ -39,35 +39,43 @@ def sync_pending_orders(app): for order in pending_orders: try: - # 查询订单状态 - alipay_result = alipay_service.query_order_status(order.out_trade_no) - - if alipay_result and alipay_result.get('trade_status') in ['TRADE_SUCCESS', 'TRADE_FINISHED']: - # 使用行级锁重新查询,防止并发问题 - order_locked = Order.query.filter_by(out_trade_no=order.out_trade_no).with_for_update().first() + # 使用防止并发的锁 + lock_key = f"lock:order:{order.out_trade_no}" + # 尝试获取锁,等待3秒。如果获取不到,说明正在处理,本次跳过 + with redis_client.lock(lock_key, timeout=10, blocking_timeout=3): + # 查询订单状态 + alipay_result = alipay_service.query_order_status(order.out_trade_no) - # 二次校验状态,防止异步回调已经处理 - if order_locked and order_locked.status == 'PENDING': - # 更新订单 - order_locked.status = 'PAID' - order_locked.trade_no = alipay_result.get('trade_no') - if not order_locked.paid_at: - order_locked.paid_at = get_bj_now() + if alipay_result and alipay_result.get('trade_status') in ['TRADE_SUCCESS', 'TRADE_FINISHED']: + # 使用行级锁重新查询,防止并发问题 (数据库层面的双重保障) + order_locked = Order.query.filter_by(out_trade_no=order.out_trade_no).with_for_update().first() - # 增加用户积分 - user = db.session.get(User, order_locked.user_id) - if user: - user.points += order_locked.points - system_logger.info(f"定时任务-订单支付成功", order_id=order_locked.out_trade_no, points=order_locked.points, user_id=user.id) - updated_count += 1 - - db.session.commit() - elif order_locked and order_locked.status == 'PAID': - # 订单已经被处理,跳过 - logging.info(f"定时任务-订单 {order.out_trade_no} 已被处理,跳过") + # 二次校验状态,防止异步回调已经处理 + if order_locked and order_locked.status == 'PENDING': + # 更新订单 + order_locked.status = 'PAID' + order_locked.trade_no = alipay_result.get('trade_no') + if not order_locked.paid_at: + order_locked.paid_at = get_bj_now() + + # 增加用户积分 + user = db.session.get(User, order_locked.user_id) + if user: + user.points += order_locked.points + system_logger.info(f"定时任务-订单支付成功", order_id=order_locked.out_trade_no, points=order_locked.points, user_id=user.id) + updated_count += 1 + + db.session.commit() + elif order_locked and order_locked.status == 'PAID': + # 订单已经被处理,跳过 + logging.info(f"定时任务-订单 {order.out_trade_no} 已被处理,跳过") except Exception as e: - db.session.rollback() - logging.error(f"定时任务处理订单 {order.out_trade_no} 失败: {str(e)}") + # Redis lock error or other errors + if "LockError" in str(e) or "BlockingIOError" in str(e): + logging.info(f"定时任务-订单 {order.out_trade_no} 锁定失败或正在处理中") + else: + db.session.rollback() + logging.error(f"定时任务处理订单 {order.out_trade_no} 失败: {str(e)}") if updated_count > 0: logging.info(f"定时任务完成,帮助更新了{updated_count}个订单") diff --git a/blueprints/payment.py b/blueprints/payment.py index 4de5924..e38a469 100644 --- a/blueprints/payment.py +++ b/blueprints/payment.py @@ -1,5 +1,5 @@ from flask import Blueprint, request, redirect, url_for, session, jsonify, render_template -from extensions import db +from extensions import db, redis_client from models import Order, User, to_bj_time, get_bj_now from services.alipay_service import AlipayService from services.logger import system_logger @@ -81,31 +81,36 @@ def payment_return(): trade_no = data.get('trade_no') try: - # 查询订单 (加锁防止并发导致双重发放) - order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first() - - # 如果订单存在且状态为PENDING,则更新为PAID - if order and order.status == 'PENDING': - order.status = 'PAID' - order.trade_no = trade_no - order.paid_at = get_bj_now() + # 使用分布式锁防止并发 + lock_key = f"lock:order:{out_trade_no}" + with redis_client.lock(lock_key, timeout=10, blocking_timeout=3): + # 查询订单 (加锁防止并发导致双重发放) + order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first() - # 增加用户积分 - user = db.session.get(User, order.user_id) - if user: - user.points += order.points - system_logger.info(f"同步回调-订单支付成功", order_id=out_trade_no, points=order.points, user_id=user.id) - - db.session.commit() - elif order: - # 订单已经是完成状态,不做处理 - pass - else: - system_logger.warning(f"同步回调-未找到订单", order_id=out_trade_no) + # 如果订单存在且状态为PENDING,则更新为PAID + if order and order.status == 'PENDING': + order.status = 'PAID' + order.trade_no = trade_no + order.paid_at = get_bj_now() + + # 增加用户积分 + user = db.session.get(User, order.user_id) + if user: + user.points += order.points + system_logger.info(f"同步回调-订单支付成功", order_id=out_trade_no, points=order.points, user_id=user.id) + + db.session.commit() + elif order: + # 订单已经是完成状态,不做处理 + pass + else: + system_logger.warning(f"同步回调-未找到订单", order_id=out_trade_no) except Exception as e: db.session.rollback() - system_logger.error(f"同步回调-订单状态更新失败: {str(e)}") + # 忽略锁错误,说明可能已经在处理 + if "LockError" not in str(e) and "BlockingIOError" not in str(e): + system_logger.error(f"同步回调-订单状态更新失败: {str(e)}") return redirect(url_for('auth.buy_page', success='true', out_trade_no=out_trade_no)) else: @@ -204,40 +209,50 @@ def api_sync_order(): # 如果支付宝显示已支付,更新本地订单状态 if trade_status in ['TRADE_SUCCESS', 'TRADE_FINISHED']: - # 使用行级锁重新查询,防止并发问题 - order_locked = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first() - - # 二次校验状态,防止异步回调/定时任务已经处理 - if order_locked and order_locked.status == 'PENDING': - order_locked.status = 'PAID' - order_locked.trade_no = alipay_result.get('trade_no') - if not order_locked.paid_at: - order_locked.paid_at = get_bj_now() - - # 增加用户积分 - user = db.session.get(User, order_locked.user_id) - if user: - user.points += order_locked.points - system_logger.info(f"主动查询-订单支付成功", order_id=out_trade_no, points=order_locked.points, user_id=user.id) - - db.session.commit() - - return jsonify({ - 'code': 200, - 'msg': '订单已支付,积分已增加', - 'status': 'PAID', - 'points': order_locked.points, - 'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') - }) - elif order_locked and order_locked.status == 'PAID': - # 订单已经被处理,直接返回 - return jsonify({ - 'code': 200, - 'msg': '订单已支付', - 'status': 'PAID', - 'points': order_locked.points, - 'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order_locked.paid_at else None - }) + try: + lock_key = f"lock:order:{out_trade_no}" + with redis_client.lock(lock_key, timeout=10, blocking_timeout=3): + # 使用行级锁重新查询,防止并发问题 + order_locked = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first() + + # 二次校验状态,防止异步回调/定时任务已经处理 + if order_locked and order_locked.status == 'PENDING': + order_locked.status = 'PAID' + order_locked.trade_no = alipay_result.get('trade_no') + if not order_locked.paid_at: + order_locked.paid_at = get_bj_now() + + # 增加用户积分 + user = db.session.get(User, order_locked.user_id) + if user: + user.points += order_locked.points + system_logger.info(f"主动查询-订单支付成功", order_id=out_trade_no, points=order_locked.points, user_id=user.id) + + db.session.commit() + + return jsonify({ + 'code': 200, + 'msg': '订单已支付,积分已增加', + 'status': 'PAID', + 'points': order_locked.points, + 'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') + }) + elif order_locked and order_locked.status == 'PAID': + # 订单已经被处理,直接返回 + return jsonify({ + 'code': 200, + 'msg': '订单已支付', + 'status': 'PAID', + 'points': order_locked.points, + 'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order_locked.paid_at else None + }) + except Exception as e: + if "LockError" in str(e) or "BlockingIOError" in str(e): + # 如果获取锁失败,可能是因为正在处理中,返回成功状态(前端会重试或刷新) + return jsonify({'code': 200, 'msg': '处理中', 'status': 'PAID'}) + else: + db.session.rollback() + raise e # 支付宝显示未支付 elif trade_status in ['TRADE_CLOSED', 'WAIT_BUYER_PAY']: @@ -282,23 +297,35 @@ def payment_notify(): trade_no = data.get('trade_no') # 加锁查询,确保并发安全 - order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first() - if order and order.status == 'PENDING': - order.status = 'PAID' - order.trade_no = trade_no - order.paid_at = get_bj_now() - - user = db.session.get(User, order.user_id) - if user: - user.points += order.points - system_logger.info(f"订单支付成功", order_id=out_trade_no, points=order.points, user_id=user.id) - - db.session.commit() - return "success" - elif order: - return "success" - else: - return "fail" + try: + lock_key = f"lock:order:{out_trade_no}" + with redis_client.lock(lock_key, timeout=10, blocking_timeout=3): + order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first() + if order and order.status == 'PENDING': + order.status = 'PAID' + order.trade_no = trade_no + order.paid_at = get_bj_now() + + user = db.session.get(User, order.user_id) + if user: + user.points += order.points + system_logger.info(f"订单支付成功", order_id=out_trade_no, points=order.points, user_id=user.id) + + db.session.commit() + return "success" + elif order: + return "success" + else: + return "fail" + except Exception as e: + # 如果锁获取失败,暂时返回fail让支付宝重试,或者返回success如果确信正在处理? + # 返回fail让支付宝重试比较稳妥,因为可能正在处理但还没提交 + system_logger.warning(f"处理异步通知锁定失败: {str(e)}") + if "LockError" in str(e) or "BlockingIOError" in str(e): + # 正在处理中,告诉支付宝由于并发我们正在处理,稍后重试(或视为成功?) + # 如果返回fail,支付宝会重试。 + return "fail" + raise e else: return "fail"