from flask import Blueprint, request, redirect, url_for, session, jsonify, render_template from extensions import db, redis_client from models import Order, User, InviteReward, to_bj_time, get_bj_now from services.alipay_service import AlipayService from services.logger import system_logger import uuid from datetime import datetime, timedelta payment_bp = Blueprint('payment', __name__, url_prefix='/payment') # 积分价格配置 POINTS_PACKAGES = { '50': {'points': 50, 'amount': 5.00}, '200': {'points': 200, 'amount': 20.00}, '1000': {'points': 1000, 'amount': 100.00}, '5000': {'points': 5000, 'amount': 500.00}, } def _process_success_order(order, trade_no): """内部处理订单成功逻辑 (需在锁内调用,不包含 commit)""" if order.status == 'PAID': return False order.status = 'PAID' order.trade_no = trade_no order.paid_at = get_bj_now() # 增加用户积分 user = db.session.get(User, order.user_id) if user: user.points += order.points system_logger.info(f"订单支付成功", order_id=order.out_trade_no, points=order.points, user_id=user.id) # ========== 邀请奖励逻辑 ========== if user.invited_by: # 统计该被邀请人之前已经完成多少次充值(不含本次) paid_count = Order.query.filter( Order.user_id == user.id, Order.status == 'PAID', Order.id != order.id ).count() # 只有前3次充值才有奖励 if paid_count < 3: inviter = db.session.get(User, user.invited_by) if inviter: # 计算10%奖励积分(四舍五入) reward_points = round(order.points * 0.1) if reward_points > 0: inviter.points += reward_points # 记录邀请奖励 invite_reward = InviteReward( inviter_id=inviter.id, invitee_id=user.id, order_id=order.id, reward_points=reward_points, recharge_count=paid_count + 1 ) db.session.add(invite_reward) system_logger.info( f"邀请奖励发放成功", inviter_id=inviter.id, invitee_id=user.id, reward_points=reward_points, recharge_count=paid_count + 1 ) return True @payment_bp.route('/create', methods=['POST']) def create_payment(): if 'user_id' not in session: return jsonify({'code': 401, 'msg': '请先登录'}), 401 package_id = request.form.get('package_id') if package_id not in POINTS_PACKAGES: return jsonify({'code': 400, 'msg': '无效的套餐'}), 400 package = POINTS_PACKAGES[package_id] user_id = session['user_id'] # 生成唯一订单号 (时间戳 + 随机位) out_trade_no = datetime.now().strftime('%Y%m%d%H%M%S') + str(uuid.uuid4().hex[:6]) # 创建订单记录 try: order = Order( out_trade_no=out_trade_no, user_id=user_id, amount=package['amount'], points=package['points'], status='PENDING' ) db.session.add(order) db.session.commit() system_logger.info(f"用户创建充值订单", order_id=out_trade_no, amount=package['amount'], points=package['points']) except Exception as e: db.session.rollback() system_logger.error(f"订单创建失败: {str(e)}") return f"订单创建失败: {str(e)}", 500 # 获取支付链接 try: alipay_service = AlipayService() pay_url = alipay_service.create_order_url( out_trade_no=out_trade_no, total_amount=package['amount'], subject=f"购买{package['points']}积分" ) return redirect(pay_url) except Exception as e: system_logger.error(f"支付链接生成失败: {str(e)}") return f"支付链接生成失败: {str(e)}", 500 @payment_bp.route('/return') def payment_return(): """支付成功后的同步跳转页面""" try: data = request.args.to_dict() signature = data.get("sign") if not signature: return "参数错误:缺少签名", 400 alipay_service = AlipayService() success = alipay_service.verify_notify(data, signature) out_trade_no = data.get('out_trade_no') if success: # 同步回调也进行订单处理,防止异步回调延迟或失败 out_trade_no = data.get('out_trade_no') trade_no = data.get('trade_no') try: # 使用分布式锁防止并发 lock_key = f"lock:order:{out_trade_no}" with redis_client.lock(lock_key, timeout=10, blocking_timeout=3): # 查询订单 (加锁防止并发导致双重发放) order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first() # 如果订单存在且状态为PENDING,则更新为PAID if order: _process_success_order(order, trade_no) db.session.commit() elif order: # 订单已经是完成状态,不做处理 pass else: system_logger.warning(f"同步回调-未找到订单", order_id=out_trade_no) except Exception as e: db.session.rollback() # 忽略锁错误,说明可能已经在处理 if "LockError" not in str(e) and "BlockingIOError" not in str(e): system_logger.error(f"同步回调-订单状态更新失败: {str(e)}") return redirect(url_for('auth.buy_page', success='true', out_trade_no=out_trade_no)) else: system_logger.warning(f"支付同步回调验证失败", order_id=out_trade_no) return "支付验证失败", 400 except Exception as e: system_logger.error(f"处理同步回调异常: {str(e)}") return f"处理支付回调失败: {str(e)}", 500 @payment_bp.route('/history', methods=['GET']) def payment_history(): """获取当前用户的充值历史记录""" if 'user_id' not in session: return redirect(url_for('auth.login')) thirty_min_ago = get_bj_now() - timedelta(minutes=30) user_id = session['user_id'] orders = Order.query.filter( Order.user_id == user_id, db.or_( Order.status == 'PAID', db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago) ) ).order_by(Order.created_at.desc()).all() return render_template('recharge_history.html', orders=orders, modules={'datetime': datetime}) @payment_bp.route('/api/history', methods=['GET']) def api_payment_history(): """API 获取当前用户的充值历史记录""" if 'user_id' not in session: return jsonify({'code': 401, 'msg': '请先登录'}), 401 thirty_min_ago = get_bj_now() - timedelta(minutes=30) user_id = session['user_id'] orders = Order.query.filter( Order.user_id == user_id, db.or_( Order.status == 'PAID', db.and_(Order.status == 'PENDING', Order.created_at >= thirty_min_ago) ) ).order_by(Order.created_at.desc()).all() return jsonify({ "orders": [{ "id": o.id, "out_trade_no": o.out_trade_no, "amount": float(o.amount), "points": o.points, "status": o.status, "trade_no": o.trade_no, "created_at": o.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'), "paid_at": o.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if o.paid_at else None } for o in orders] }) @payment_bp.route('/api/sync_order', methods=['POST']) def api_sync_order(): """主动查询订单状态并同步 - 用于用户手动关闭支付页面后检查是否支付成功""" if 'user_id' not in session: return jsonify({'code': 401, 'msg': '请先登录'}), 401 out_trade_no = request.form.get('out_trade_no') if not out_trade_no: return jsonify({'code': 400, 'msg': '订单号不能为空'}), 400 try: # 查询订单 order = Order.query.filter_by(out_trade_no=out_trade_no).first() if not order: return jsonify({'code': 404, 'msg': '订单不存在'}), 404 # 只有当前用户才能查询自己的订单 if order.user_id != session['user_id']: return jsonify({'code': 403, 'msg': '无权限访问此订单'}), 403 # 如果订单已经是PAID或FAILED状态,直接返回 if order.status in ['PAID', 'FAILED']: return jsonify({ 'code': 200, 'msg': '订单状态已确定', 'status': order.status, 'paid_at': order.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order.paid_at else None }) # 向支付宝查询订单状态 alipay_service = AlipayService() alipay_result = alipay_service.query_order_status(out_trade_no) if not alipay_result: return jsonify({'code': 500, 'msg': '查询支付宝订单失败,请稍后重试'}), 500 trade_status = alipay_result.get('trade_status') # 如果支付宝显示已支付,更新本地订单状态 if trade_status in ['TRADE_SUCCESS', 'TRADE_FINISHED']: try: lock_key = f"lock:order:{out_trade_no}" with redis_client.lock(lock_key, timeout=10, blocking_timeout=3): # 使用行级锁重新查询,防止并发问题 order_locked = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first() # 二次校验状态,防止异步回调/定时任务已经处理 # 二次校验状态,防止异步回调/定时任务已经处理 if order_locked: processed = _process_success_order(order_locked, alipay_result.get('trade_no')) if processed: db.session.commit() return jsonify({ 'code': 200, 'msg': '订单已支付,积分已增加', 'status': 'PAID', 'points': order_locked.points, 'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') }) else: # 订单已经是完成状态,不做处理 return jsonify({ 'code': 200, 'msg': '订单已支付', 'status': 'PAID', 'points': order_locked.points, 'paid_at': order_locked.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order_locked.paid_at else None }) except Exception as e: if "LockError" in str(e) or "BlockingIOError" in str(e): # 如果获取锁失败,可能是因为正在处理中,返回成功状态(前端会重试或刷新) return jsonify({'code': 200, 'msg': '处理中', 'status': 'PAID'}) else: db.session.rollback() raise e # 支付宝显示未支付 elif trade_status in ['TRADE_CLOSED', 'WAIT_BUYER_PAY']: return jsonify({ 'code': 200, 'msg': '订单未支付', 'status': 'PENDING', 'trade_status': trade_status }) else: return jsonify({ 'code': 200, 'msg': f'订单状态: {trade_status}', 'status': order.status, 'trade_status': trade_status }) except Exception as e: system_logger.error(f"主动查询订单异常: {str(e)}") db.session.rollback() return jsonify({'code': 500, 'msg': f'查询失败: {str(e)}'}), 500 @payment_bp.route('/notify', methods=['POST']) def payment_notify(): """支付宝异步通知""" try: data = request.form.to_dict() # 记录异步通知到系统日志,而不是本地文件 system_logger.info(f"支付宝异步通知接收", extra_data=str(data)) signature = data.get("sign") if not signature: return "fail" alipay_service = AlipayService() success = alipay_service.verify_notify(data, signature) if success and data.get('trade_status') in ['TRADE_SUCCESS', 'TRADE_FINISHED']: out_trade_no = data.get('out_trade_no') trade_no = data.get('trade_no') # 加锁查询,确保并发安全 try: lock_key = f"lock:order:{out_trade_no}" with redis_client.lock(lock_key, timeout=10, blocking_timeout=3): order = Order.query.filter_by(out_trade_no=out_trade_no).with_for_update().first() if order: _process_success_order(order, trade_no) db.session.commit() return "success" else: return "fail" except Exception as e: # 如果锁获取失败,暂时返回fail让支付宝重试,或者返回success如果确信正在处理? # 返回fail让支付宝重试比较稳妥,因为可能正在处理但还没提交 system_logger.warning(f"处理异步通知锁定失败: {str(e)}") if "LockError" in str(e) or "BlockingIOError" in str(e): # 正在处理中,告诉支付宝由于并发我们正在处理,稍后重试(或视为成功?) # 如果返回fail,支付宝会重试。 return "fail" raise e order = Order.query.filter_by(out_trade_no=out_trade_no).first() if order: _process_success_order(order, trade_no) db.session.commit() return "success" else: return "fail" else: return "fail" except Exception as e: system_logger.error(f"处理异步通知异常: {str(e)}") db.session.rollback() return "fail" @payment_bp.route('/api/query/', methods=['GET']) def api_query_order(out_trade_no): """简单查询接口 - 获取订单当前状态而不自动更新""" if 'user_id' not in session: return jsonify({'code': 401, 'msg': '请先登录'}), 401 try: order = Order.query.filter_by(out_trade_no=out_trade_no).first() if not order: return jsonify({'code': 404, 'msg': '订单不存在'}), 404 if order.user_id != session['user_id']: return jsonify({'code': 403, 'msg': '无权限访问'}), 403 return jsonify({ 'code': 200, 'out_trade_no': order.out_trade_no, 'status': order.status, 'amount': float(order.amount), 'points': order.points, 'trade_no': order.trade_no, 'created_at': order.created_at_bj.strftime('%Y-%m-%d %H:%M:%S'), 'paid_at': order.paid_at_bj.strftime('%Y-%m-%d %H:%M:%S') if order.paid_at else None }) except Exception as e: return jsonify({'code': 500, 'msg': f'查询失败: {str(e)}'}), 500