ai_v/blueprints/payment.py

395 lines
16 KiB
Python
Raw Permalink Normal View History

from flask import Blueprint, request, redirect, url_for, session, jsonify, render_template
from extensions import db, redis_client
from models import Order, User, InviteReward, to_bj_time, get_bj_now
from services.alipay_service import AlipayService
from services.logger import system_logger
import uuid
from datetime import datetime, timedelta
payment_bp = Blueprint('payment', __name__, url_prefix='/payment')
# 积分价格配置
POINTS_PACKAGES = {
'50': {'points': 50, 'amount': 5.00},
'200': {'points': 200, 'amount': 20.00},
'1000': {'points': 1000, 'amount': 100.00},
'5000': {'points': 5000, 'amount': 500.00},
}
def _process_success_order(order, trade_no):
"""内部处理订单成功逻辑 (需在锁内调用,不包含 commit)"""
if order.status == 'PAID':
return False
order.status = 'PAID'
order.trade_no = trade_no
order.paid_at = get_bj_now()
# 增加用户积分
user = db.session.get(User, order.user_id)
if user:
user.points += order.points
system_logger.info(f"订单支付成功", order_id=order.out_trade_no, points=order.points, user_id=user.id)
# ========== 邀请奖励逻辑 ==========
if user.invited_by:
# 统计该被邀请人之前已经完成多少次充值(不含本次)
paid_count = Order.query.filter(
Order.user_id == user.id,
Order.status == 'PAID',
Order.id != order.id
).count()
# 只有前3次充值才有奖励
if paid_count < 3:
inviter = db.session.get(User, user.invited_by)
if inviter:
# 计算10%奖励积分(四舍五入)
reward_points = round(order.points * 0.1)
if reward_points > 0:
inviter.points += reward_points
# 记录邀请奖励
invite_reward = InviteReward(
inviter_id=inviter.id,
invitee_id=user.id,
order_id=order.id,
reward_points=reward_points,
recharge_count=paid_count + 1
)
db.session.add(invite_reward)
system_logger.info(
f"邀请奖励发放成功",
inviter_id=inviter.id,
invitee_id=user.id,
reward_points=reward_points,
recharge_count=paid_count + 1
)
return True
@payment_bp.route('/create', methods=['POST'])
def create_payment():
if 'user_id' not in session:
return jsonify({'code': 401, 'msg': '请先登录'}), 401
package_id = request.form.get('package_id')
if package_id not in POINTS_PACKAGES:
return jsonify({'code': 400, 'msg': '无效的套餐'}), 400
package = POINTS_PACKAGES[package_id]
user_id = session['user_id']
# 生成唯一订单号 (时间戳 + 随机位)
out_trade_no = datetime.now().strftime('%Y%m%d%H%M%S') + str(uuid.uuid4().hex[:6])
# 创建订单记录
try:
order = Order(
out_trade_no=out_trade_no,
user_id=user_id,
amount=package['amount'],
points=package['points'],
status='PENDING'
)
db.session.add(order)
db.session.commit()
system_logger.info(f"用户创建充值订单", order_id=out_trade_no, amount=package['amount'], points=package['points'])
except Exception as e:
db.session.rollback()
system_logger.error(f"订单创建失败: {str(e)}")
return f"订单创建失败: {str(e)}", 500
# 获取支付链接
try:
alipay_service = AlipayService()
pay_url = alipay_service.create_order_url(
out_trade_no=out_trade_no,
total_amount=package['amount'],
subject=f"购买{package['points']}积分"
)
return redirect(pay_url)
except Exception as e:
system_logger.error(f"支付链接生成失败: {str(e)}")
return f"支付链接生成失败: {str(e)}", 500
@payment_bp.route('/return')
def payment_return():
"""支付成功后的同步跳转页面"""
try:
data = request.args.to_dict()
signature = data.get("sign")
if not signature:
return "参数错误:缺少签名", 400
alipay_service = AlipayService()
success = alipay_service.verify_notify(data, signature)
out_trade_no = data.get('out_trade_no')
if success:
# 同步回调也进行订单处理,防止异步回调延迟或失败
out_trade_no = data.get('out_trade_no')
trade_no = data.get('trade_no')
try:
# 使用分布式锁防止并发
lock_key = f"lock:order:{out_trade_no}"
with redis_client.lock(lock_key, timeout=10, blocking_timeout=3):
# 查询订单 (加锁防止并发导致双重发放)
order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
# 如果订单存在且状态为PENDING则更新为PAID
if order:
_process_success_order(order, trade_no)
db.session.commit()
elif order:
# 订单已经是完成状态,不做处理
pass
else:
system_logger.warning(f"同步回调-未找到订单", order_id=out_trade_no)
except Exception as e:
db.session.rollback()
# 忽略锁错误,说明可能已经在处理
if "LockError" not in str(e) and "BlockingIOError" not in str(e):
system_logger.error(f"同步回调-订单状态更新失败: {str(e)}")
return redirect(url_for('auth.buy_page', success='true', out_trade_no=out_trade_no))
else:
system_logger.warning(f"支付同步回调验证失败", order_id=out_trade_no)
return "支付验证失败", 400
except Exception as e:
system_logger.error(f"处理同步回调异常: {str(e)}")
return f"处理支付回调失败: {str(e)}", 500
@payment_bp.route('/history', methods=['GET'])
def payment_history():
"""获取当前用户的充值历史记录"""
if 'user_id' not in session:
return redirect(url_for('auth.login'))
thirty_min_ago = get_bj_now() - timedelta(minutes=30)
user_id = session['user_id']
orders = Order.query.filter(
Order.user_id == user_id,
db.or_(
Order.status == 'PAID',
db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago)
)
).order_by(Order.created_at.desc()).all()
return render_template('recharge_history.html', orders=orders, modules={'datetime': datetime})
@payment_bp.route('/api/history', methods=['GET'])
def api_payment_history():
"""API 获取当前用户的充值历史记录"""
if 'user_id' not in session:
return jsonify({'code': 401, 'msg': '请先登录'}), 401
thirty_min_ago = get_bj_now() - timedelta(minutes=30)
user_id = session['user_id']
orders = Order.query.filter(
Order.user_id == user_id,
db.or_(
Order.status == 'PAID',
db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago)
)
).order_by(Order.created_at.desc()).all()
return jsonify({
"orders": [{
"id": o.id,
"out_trade_no": o.out_trade_no,
"amount": float(o.amount),
"points": o.points,
"status": o.status,
"trade_no": o.trade_no,
"created_at": o.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
"paid_at": o.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if o.paid_at else None
} for o in orders]
})
@payment_bp.route('/api/sync_order', methods=['POST'])
def api_sync_order():
"""主动查询订单状态并同步 - 用于用户手动关闭支付页面后检查是否支付成功"""
if 'user_id' not in session:
return jsonify({'code': 401, 'msg': '请先登录'}), 401
out_trade_no = request.form.get('out_trade_no')
if not out_trade_no:
return jsonify({'code': 400, 'msg': '订单号不能为空'}), 400
try:
# 查询订单
order = Order.query.filter_by(out_trade_no=out_trade_no).first()
if not order:
return jsonify({'code': 404, 'msg': '订单不存在'}), 404
# 只有当前用户才能查询自己的订单
if order.user_id != session['user_id']:
return jsonify({'code': 403, 'msg': '无权限访问此订单'}), 403
# 如果订单已经是PAID或FAILED状态直接返回
if order.status in ['PAID', 'FAILED']:
return jsonify({
'code': 200,
'msg': '订单状态已确定',
'status': order.status,
'paid_at': order.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order.paid_at else None
})
# 向支付宝查询订单状态
alipay_service = AlipayService()
alipay_result = alipay_service.query_order_status(out_trade_no)
if not alipay_result:
return jsonify({'code': 500, 'msg': '查询支付宝订单失败,请稍后重试'}), 500
trade_status = alipay_result.get('trade_status')
# 如果支付宝显示已支付,更新本地订单状态
if trade_status in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
try:
lock_key = f"lock:order:{out_trade_no}"
with redis_client.lock(lock_key, timeout=10, blocking_timeout=3):
# 使用行级锁重新查询,防止并发问题
order_locked = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
# 二次校验状态,防止异步回调/定时任务已经处理
# 二次校验状态,防止异步回调/定时任务已经处理
if order_locked:
processed = _process_success_order(order_locked, alipay_result.get('trade_no'))
if processed:
db.session.commit()
return jsonify({
'code': 200,
'msg': '订单已支付,积分已增加',
'status': 'PAID',
'points': order_locked.points,
'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S')
})
else:
# 订单已经是完成状态,不做处理
return jsonify({
'code': 200,
'msg': '订单已支付',
'status': 'PAID',
'points': order_locked.points,
'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order_locked.paid_at else None
})
except Exception as e:
if "LockError" in str(e) or "BlockingIOError" in str(e):
# 如果获取锁失败,可能是因为正在处理中,返回成功状态(前端会重试或刷新)
return jsonify({'code': 200, 'msg': '处理中', 'status': 'PAID'})
else:
db.session.rollback()
raise e
# 支付宝显示未支付
elif trade_status in ['TRADE_CLOSED', 'WAIT_BUYER_PAY']:
return jsonify({
'code': 200,
'msg': '订单未支付',
'status': 'PENDING',
'trade_status': trade_status
})
else:
return jsonify({
'code': 200,
'msg': f'订单状态: {trade_status}',
'status': order.status,
'trade_status': trade_status
})
except Exception as e:
system_logger.error(f"主动查询订单异常: {str(e)}")
db.session.rollback()
return jsonify({'code': 500, 'msg': f'查询失败: {str(e)}'}), 500
@payment_bp.route('/notify', methods=['POST'])
def payment_notify():
"""支付宝异步通知"""
try:
data = request.form.to_dict()
# 记录异步通知到系统日志,而不是本地文件
system_logger.info(f"支付宝异步通知接收", extra_data=str(data))
signature = data.get("sign")
if not signature:
return "fail"
alipay_service = AlipayService()
success = alipay_service.verify_notify(data, signature)
if success and data.get('trade_status') in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
out_trade_no = data.get('out_trade_no')
trade_no = data.get('trade_no')
# 加锁查询,确保并发安全
try:
lock_key = f"lock:order:{out_trade_no}"
with redis_client.lock(lock_key, timeout=10, blocking_timeout=3):
order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
if order:
_process_success_order(order, trade_no)
db.session.commit()
return "success"
else:
return "fail"
except Exception as e:
# 如果锁获取失败暂时返回fail让支付宝重试或者返回success如果确信正在处理
# 返回fail让支付宝重试比较稳妥因为可能正在处理但还没提交
system_logger.warning(f"处理异步通知锁定失败: {str(e)}")
if "LockError" in str(e) or "BlockingIOError" in str(e):
# 正在处理中,告诉支付宝由于并发我们正在处理,稍后重试(或视为成功?)
# 如果返回fail支付宝会重试。
return "fail"
raise e
order = Order.query.filter_by(out_trade_no=out_trade_no).first()
if order:
_process_success_order(order, trade_no)
db.session.commit()
return "success"
else:
return "fail"
else:
return "fail"
except Exception as e:
system_logger.error(f"处理异步通知异常: {str(e)}")
db.session.rollback()
return "fail"
@payment_bp.route('/api/query/<out_trade_no>', methods=['GET'])
def api_query_order(out_trade_no):
"""简单查询接口 - 获取订单当前状态而不自动更新"""
if 'user_id' not in session:
return jsonify({'code': 401, 'msg': '请先登录'}), 401
try:
order = Order.query.filter_by(out_trade_no=out_trade_no).first()
if not order:
return jsonify({'code': 404, 'msg': '订单不存在'}), 404
if order.user_id != session['user_id']:
return jsonify({'code': 403, 'msg': '无权限访问'}), 403
return jsonify({
'code': 200,
'out_trade_no': order.out_trade_no,
'status': order.status,
'amount': float(order.amount),
'points': order.points,
'trade_no': order.trade_no,
'created_at': order.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
'paid_at': order.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order.paid_at else None
})
except Exception as e:
return jsonify({'code': 500, 'msg': f'查询失败: {str(e)}'}), 500