ai_v/blueprints/payment.py
公司git 30105c685a fix(payment): 修复支付宝通知签名验证问题
- 在支付通知日志中添加接收到的支付宝数据记录
- 移除签名验证参数中的 sign_type,避免验证错误
- 优化支付宝服务签名验证逻辑,提高兼容性和准确度
2026-01-23 15:10:16 +08:00

210 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, request, redirect, url_for, session, jsonify, render_template
from extensions import db
from models import Order, User, to_bj_time, get_bj_now
from services.alipay_service import AlipayService
from services.logger import system_logger
import uuid
from datetime import datetime, timedelta
payment_bp = Blueprint('payment', __name__, url_prefix='/payment')
# 积分价格配置
POINTS_PACKAGES = {
'50': {'points': 50, 'amount': 5.00},
'200': {'points': 200, 'amount': 20.00},
'1000': {'points': 1000, 'amount': 100.00},
'5000': {'points': 5000, 'amount': 500.00},
}
@payment_bp.route('/create', methods=['POST'])
def create_payment():
if 'user_id' not in session:
return jsonify({'code': 401, 'msg': '请先登录'}), 401
package_id = request.form.get('package_id')
if package_id not in POINTS_PACKAGES:
return jsonify({'code': 400, 'msg': '无效的套餐'}), 400
package = POINTS_PACKAGES[package_id]
user_id = session['user_id']
# 生成唯一订单号 (时间戳 + 随机位)
out_trade_no = datetime.now().strftime('%Y%m%d%H%M%S') + str(uuid.uuid4().hex[:6])
# 创建订单记录
try:
order = Order(
out_trade_no=out_trade_no,
user_id=user_id,
amount=package['amount'],
points=package['points'],
status='PENDING'
)
db.session.add(order)
db.session.commit()
system_logger.info(f"用户创建充值订单", order_id=out_trade_no, amount=package['amount'], points=package['points'])
except Exception as e:
db.session.rollback()
system_logger.error(f"订单创建失败: {str(e)}")
return f"订单创建失败: {str(e)}", 500
# 获取支付链接
try:
alipay_service = AlipayService()
pay_url = alipay_service.create_order_url(
out_trade_no=out_trade_no,
total_amount=package['amount'],
subject=f"购买{package['points']}积分"
)
return redirect(pay_url)
except Exception as e:
system_logger.error(f"支付链接生成失败: {str(e)}")
return f"支付链接生成失败: {str(e)}", 500
@payment_bp.route('/return')
def payment_return():
"""支付成功后的同步跳转页面"""
try:
data = request.args.to_dict()
signature = data.get("sign")
if not signature:
return "参数错误:缺少签名", 400
alipay_service = AlipayService()
success = alipay_service.verify_notify(data, signature)
out_trade_no = data.get('out_trade_no')
if success:
# 同步回调也进行订单处理,防止异步回调延迟或失败
out_trade_no = data.get('out_trade_no')
trade_no = data.get('trade_no')
try:
# 查询订单 (加锁防止并发导致双重发放)
order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
# 如果订单存在且状态为PENDING则更新为PAID
if order and order.status == 'PENDING':
order.status = 'PAID'
order.trade_no = trade_no
order.paid_at = get_bj_now()
# 增加用户积分
user = db.session.get(User, order.user_id)
if user:
user.points += order.points
system_logger.info(f"同步回调-订单支付成功", order_id=out_trade_no, points=order.points, user_id=user.id)
db.session.commit()
elif order:
# 订单已经是完成状态,不做处理
pass
else:
system_logger.warning(f"同步回调-未找到订单", order_id=out_trade_no)
except Exception as e:
db.session.rollback()
system_logger.error(f"同步回调-订单状态更新失败: {str(e)}")
return redirect(url_for('auth.buy_page', success='true', out_trade_no=out_trade_no))
else:
system_logger.warning(f"支付同步回调验证失败", order_id=out_trade_no)
return "支付验证失败", 400
except Exception as e:
system_logger.error(f"处理同步回调异常: {str(e)}")
return f"处理支付回调失败: {str(e)}", 500
@payment_bp.route('/history', methods=['GET'])
def payment_history():
"""获取当前用户的充值历史记录"""
if 'user_id' not in session:
return redirect(url_for('auth.login'))
thirty_min_ago = get_bj_now() - timedelta(minutes=30)
user_id = session['user_id']
orders = Order.query.filter(
Order.user_id == user_id,
db.or_(
Order.status == 'PAID',
db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago)
)
).order_by(Order.created_at.desc()).all()
return render_template('recharge_history.html', orders=orders, modules={'datetime': datetime})
@payment_bp.route('/api/history', methods=['GET'])
def api_payment_history():
"""API 获取当前用户的充值历史记录"""
if 'user_id' not in session:
return jsonify({'code': 401, 'msg': '请先登录'}), 401
thirty_min_ago = get_bj_now() - timedelta(minutes=30)
user_id = session['user_id']
orders = Order.query.filter(
Order.user_id == user_id,
db.or_(
Order.status == 'PAID',
db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago)
)
).order_by(Order.created_at.desc()).all()
return jsonify({
"orders": [{
"id": o.id,
"out_trade_no": o.out_trade_no,
"amount": float(o.amount),
"points": o.points,
"status": o.status,
"trade_no": o.trade_no,
"created_at": o.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'),
"paid_at": o.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if o.paid_at else None
} for o in orders]
})
@payment_bp.route('/notify', methods=['POST'])
def payment_notify():
"""支付宝异步通知"""
try:
data = request.form.to_dict()
system_logger.info(f"Received Alipay Notify: {data}")
signature = data.get("sign")
if not signature:
return "fail"
alipay_service = AlipayService()
success = alipay_service.verify_notify(data, signature)
if success and data.get('trade_status') in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
out_trade_no = data.get('out_trade_no')
trade_no = data.get('trade_no')
# 加锁查询,确保并发安全
order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first()
if order and order.status == 'PENDING':
order.status = 'PAID'
order.trade_no = trade_no
order.paid_at = get_bj_now()
user = db.session.get(User, order.user_id)
if user:
user.points += order.points
system_logger.info(f"订单支付成功", order_id=out_trade_no, points=order.points, user_id=user.id)
db.session.commit()
return "success"
elif order:
return "success"
else:
return "fail"
else:
return "fail"
except Exception as e:
system_logger.error(f"处理异步通知异常: {str(e)}")
db.session.rollback()
return "fail"