ai_v/blueprints/payment.py
24024 2ea495721c ```
feat(payment): 添加邀请奖励功能并重构订单支付处理逻辑

- 引入InviteReward模型用于记录邀请奖励
- 新增_process_success_order内部函数统一处理订单成功逻辑
- 实现邀请奖励机制:被邀请人前3次充值时,邀请人获得10%积分奖励
- 在支付回调、主动查询和异步通知中统一调用新的处理函数
- 改进代码结构,消除重复的订单处理逻辑
```
2026-01-23 21:58:37 +08:00

395 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, request, redirect, url_for, session, jsonify, render_template
from extensions import db, redis_client
from models import Order, User, InviteReward, to_bj_time, get_bj_now
from services.alipay_service import AlipayService
from services.logger import system_logger
import uuid
from datetime import datetime, timedelta
payment_bp = Blueprint('payment', __name__, url_prefix='/payment')
# 积分价格配置
POINTS_PACKAGES = {
'50': {'points': 50, 'amount': 5.00},
'200': {'points': 200, 'amount': 20.00},
'1000': {'points': 1000, 'amount': 100.00},
'5000': {'points': 5000, 'amount': 500.00},
}
def _process_success_order(order, trade_no):
"""内部处理订单成功逻辑 (需在锁内调用,不包含 commit)"""
if order.status == 'PAID':
return False
order.status = 'PAID'
order.trade_no = trade_no
order.paid_at = get_bj_now()
# 增加用户积分
user = db.session.get(User, order.user_id)
if user:
user.points += order.points
system_logger.info(f"订单支付成功", order_id=order.out_trade_no, points=order.points, user_id=user.id)
# ========== 邀请奖励逻辑 ==========
if user.invited_by:
# 统计该被邀请人之前已经完成多少次充值(不含本次)
paid_count = Order.query.filter(
Order.user_id == user.id,
Order.status == 'PAID',
Order.id != order.id
).count()
# 只有前3次充值才有奖励
if paid_count < 3:
inviter = db.session.get(User, user.invited_by)
if inviter:
# 计算10%奖励积分(四舍五入)
reward_points = round(order.points * 0.1)
if reward_points > 0:
inviter.points += reward_points
# 记录邀请奖励
invite_reward = InviteReward(
inviter_id=inviter.id,
invitee_id=user.id,
order_id=order.id,
reward_points=reward_points,
recharge_count=paid_count + 1
)
db.session.add(invite_reward)
system_logger.info(
f"邀请奖励发放成功",
inviter_id=inviter.id,
invitee_id=user.id,
reward_points=reward_points,
recharge_count=paid_count + 1
)
return True
@payment_bp.route('/create', methods=['POST'])
def create_payment():
if 'user_id' not in session:
return jsonify({'code': 401, 'msg': '请先登录'}), 401
package_id = request.form.get('package_id')
if package_id not in POINTS_PACKAGES:
return jsonify({'code': 400, 'msg': '无效的套餐'}), 400
package = POINTS_PACKAGES[package_id]
user_id = session['user_id']
# 生成唯一订单号 (时间戳 + 随机位)
out_trade_no = datetime.now().strftime('%Y%m%d%H%M%S') + str(uuid.uuid4().hex[:6])
# 创建订单记录
try:
order = Order(
out_trade_no=out_trade_no,
user_id=user_id,
amount=package['amount'],
points=package['points'],
status='PENDING'
)
db.session.add(order)
db.session.commit()
system_logger.info(f"用户创建充值订单", order_id=out_trade_no, amount=package['amount'], points=package['points'])
except Exception as e:
db.session.rollback()
system_logger.error(f"订单创建失败: {str(e)}")
return f"订单创建失败: {str(e)}", 500
# 获取支付链接
try:
alipay_service = AlipayService()
pay_url = alipay_service.create_order_url(
out_trade_no=out_trade_no,
total_amount=package['amount'],
subject=f"购买{package['points']}积分"
)
return redirect(pay_url)
except Exception as e:
system_logger.error(f"支付链接生成失败: {str(e)}")
return f"支付链接生成失败: {str(e)}", 500
@payment_bp.route('/return')
def payment_return():
"""支付成功后的同步跳转页面"""
try:
data = request.args.to_dict()
signature = data.get("sign")
if not signature:
return "参数错误:缺少签名", 400
alipay_service = AlipayService()
success = alipay_service.verify_notify(data, signature)
out_trade_no = data.get('out_trade_no')
if success:
# 同步回调也进行订单处理,防止异步回调延迟或失败
out_trade_no = data.get('out_trade_no')
trade_no = data.get('trade_no')
try:
# 使用分布式锁防止并发
lock_key = f"lock:order:{out_trade_no}"
with redis_client.lock(lock_key, timeout=10, blocking_timeout=3):
# 查询订单 (加锁防止并发导致双重发放)
order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
# 如果订单存在且状态为PENDING则更新为PAID
if order:
_process_success_order(order, trade_no)
db.session.commit()
elif order:
# 订单已经是完成状态,不做处理
pass
else:
system_logger.warning(f"同步回调-未找到订单", order_id=out_trade_no)
except Exception as e:
db.session.rollback()
# 忽略锁错误,说明可能已经在处理
if "LockError" not in str(e) and "BlockingIOError" not in str(e):
system_logger.error(f"同步回调-订单状态更新失败: {str(e)}")
return redirect(url_for('auth.buy_page', success='true', out_trade_no=out_trade_no))
else:
system_logger.warning(f"支付同步回调验证失败", order_id=out_trade_no)
return "支付验证失败", 400
except Exception as e:
system_logger.error(f"处理同步回调异常: {str(e)}")
return f"处理支付回调失败: {str(e)}", 500
@payment_bp.route('/history', methods=['GET'])
def payment_history():
"""获取当前用户的充值历史记录"""
if 'user_id' not in session:
return redirect(url_for('auth.login'))
thirty_min_ago = get_bj_now() - timedelta(minutes=30)
user_id = session['user_id']
orders = Order.query.filter(
Order.user_id == user_id,
db.or_(
Order.status == 'PAID',
db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago)
)
).order_by(Order.created_at.desc()).all()
return render_template('recharge_history.html', orders=orders, modules={'datetime': datetime})
@payment_bp.route('/api/history', methods=['GET'])
def api_payment_history():
"""API 获取当前用户的充值历史记录"""
if 'user_id' not in session:
return jsonify({'code': 401, 'msg': '请先登录'}), 401
thirty_min_ago = get_bj_now() - timedelta(minutes=30)
user_id = session['user_id']
orders = Order.query.filter(
Order.user_id == user_id,
db.or_(
Order.status == 'PAID',
db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago)
)
).order_by(Order.created_at.desc()).all()
return jsonify({
"orders": [{
"id": o.id,
"out_trade_no": o.out_trade_no,
"amount": float(o.amount),
"points": o.points,
"status": o.status,
"trade_no": o.trade_no,
"created_at": o.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
"paid_at": o.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if o.paid_at else None
} for o in orders]
})
@payment_bp.route('/api/sync_order', methods=['POST'])
def api_sync_order():
"""主动查询订单状态并同步 - 用于用户手动关闭支付页面后检查是否支付成功"""
if 'user_id' not in session:
return jsonify({'code': 401, 'msg': '请先登录'}), 401
out_trade_no = request.form.get('out_trade_no')
if not out_trade_no:
return jsonify({'code': 400, 'msg': '订单号不能为空'}), 400
try:
# 查询订单
order = Order.query.filter_by(out_trade_no=out_trade_no).first()
if not order:
return jsonify({'code': 404, 'msg': '订单不存在'}), 404
# 只有当前用户才能查询自己的订单
if order.user_id != session['user_id']:
return jsonify({'code': 403, 'msg': '无权限访问此订单'}), 403
# 如果订单已经是PAID或FAILED状态直接返回
if order.status in ['PAID', 'FAILED']:
return jsonify({
'code': 200,
'msg': '订单状态已确定',
'status': order.status,
'paid_at': order.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order.paid_at else None
})
# 向支付宝查询订单状态
alipay_service = AlipayService()
alipay_result = alipay_service.query_order_status(out_trade_no)
if not alipay_result:
return jsonify({'code': 500, 'msg': '查询支付宝订单失败,请稍后重试'}), 500
trade_status = alipay_result.get('trade_status')
# 如果支付宝显示已支付,更新本地订单状态
if trade_status in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
try:
lock_key = f"lock:order:{out_trade_no}"
with redis_client.lock(lock_key, timeout=10, blocking_timeout=3):
# 使用行级锁重新查询,防止并发问题
order_locked = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
# 二次校验状态,防止异步回调/定时任务已经处理
# 二次校验状态,防止异步回调/定时任务已经处理
if order_locked:
processed = _process_success_order(order_locked, alipay_result.get('trade_no'))
if processed:
db.session.commit()
return jsonify({
'code': 200,
'msg': '订单已支付,积分已增加',
'status': 'PAID',
'points': order_locked.points,
'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S')
})
else:
# 订单已经是完成状态,不做处理
return jsonify({
'code': 200,
'msg': '订单已支付',
'status': 'PAID',
'points': order_locked.points,
'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order_locked.paid_at else None
})
except Exception as e:
if "LockError" in str(e) or "BlockingIOError" in str(e):
# 如果获取锁失败,可能是因为正在处理中,返回成功状态(前端会重试或刷新)
return jsonify({'code': 200, 'msg': '处理中', 'status': 'PAID'})
else:
db.session.rollback()
raise e
# 支付宝显示未支付
elif trade_status in ['TRADE_CLOSED', 'WAIT_BUYER_PAY']:
return jsonify({
'code': 200,
'msg': '订单未支付',
'status': 'PENDING',
'trade_status': trade_status
})
else:
return jsonify({
'code': 200,
'msg': f'订单状态: {trade_status}',
'status': order.status,
'trade_status': trade_status
})
except Exception as e:
system_logger.error(f"主动查询订单异常: {str(e)}")
db.session.rollback()
return jsonify({'code': 500, 'msg': f'查询失败: {str(e)}'}), 500
@payment_bp.route('/notify', methods=['POST'])
def payment_notify():
"""支付宝异步通知"""
try:
data = request.form.to_dict()
# 记录异步通知到系统日志,而不是本地文件
system_logger.info(f"支付宝异步通知接收", extra_data=str(data))
signature = data.get("sign")
if not signature:
return "fail"
alipay_service = AlipayService()
success = alipay_service.verify_notify(data, signature)
if success and data.get('trade_status') in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
out_trade_no = data.get('out_trade_no')
trade_no = data.get('trade_no')
# 加锁查询,确保并发安全
try:
lock_key = f"lock:order:{out_trade_no}"
with redis_client.lock(lock_key, timeout=10, blocking_timeout=3):
order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
if order:
_process_success_order(order, trade_no)
db.session.commit()
return "success"
else:
return "fail"
except Exception as e:
# 如果锁获取失败暂时返回fail让支付宝重试或者返回success如果确信正在处理
# 返回fail让支付宝重试比较稳妥因为可能正在处理但还没提交
system_logger.warning(f"处理异步通知锁定失败: {str(e)}")
if "LockError" in str(e) or "BlockingIOError" in str(e):
# 正在处理中,告诉支付宝由于并发我们正在处理,稍后重试(或视为成功?)
# 如果返回fail支付宝会重试。
return "fail"
raise e
order = Order.query.filter_by(out_trade_no=out_trade_no).first()
if order:
_process_success_order(order, trade_no)
db.session.commit()
return "success"
else:
return "fail"
else:
return "fail"
except Exception as e:
system_logger.error(f"处理异步通知异常: {str(e)}")
db.session.rollback()
return "fail"
@payment_bp.route('/api/query/<out_trade_no>', methods=['GET'])
def api_query_order(out_trade_no):
"""简单查询接口 - 获取订单当前状态而不自动更新"""
if 'user_id' not in session:
return jsonify({'code': 401, 'msg': '请先登录'}), 401
try:
order = Order.query.filter_by(out_trade_no=out_trade_no).first()
if not order:
return jsonify({'code': 404, 'msg': '订单不存在'}), 404
if order.user_id != session['user_id']:
return jsonify({'code': 403, 'msg': '无权限访问'}), 403
return jsonify({
'code': 200,
'out_trade_no': order.out_trade_no,
'status': order.status,
'amount': float(order.amount),
'points': order.points,
'trade_no': order.trade_no,
'created_at': order.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
'paid_at': order.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order.paid_at else None
})
except Exception as e:
return jsonify({'code': 500, 'msg': f'查询失败: {str(e)}'}), 500