ai_v/app.py
公司git 1196809c6a feat(payment): 添加订单状态同步功能和主动查询接口
- 在 app.py 中集成定时任务,每分钟同步最近30分钟内待支付订单状态
- 定时任务调用支付宝接口更新订单状态及用户积分,记录日志
- payment.py 新增主动查询订单接口,支持用户手动触发订单状态同步
- 添加订单简单查询API,返回订单当前状态信息
- 支付异步通知日志记录优化,改为系统日志记录
- 配置文件中调整支付宝回调地址使用 HTTPS 协议
- alipay_service.py 增加支付宝订单状态查询方法,支持主动轮询订单状态
2026-01-23 17:40:23 +08:00

200 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, jsonify, Response, stream_with_context
from config import Config
from extensions import db, redis_client, migrate, s3_client
from blueprints.auth import auth_bp
from blueprints.api import api_bp
from blueprints.admin import admin_bp
from blueprints.payment import payment_bp
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import threading
import time
import logging
# 导入模律(必需在 db.create_all() 之前导入)
import models
# 定时任务函数
def sync_pending_orders():
"""定时任务: 检查并同步30分钟内的待支付订单"""
from models import Order, User, get_bj_now
from services.alipay_service import AlipayService
from services.logger import system_logger
from datetime import timedelta
try:
# 查询最还30分钟内的待支付订单
thirty_min_ago = get_bj_now() - timedelta(minutes=30)
pending_orders = Order.query.filter(
Order.status == 'PENDING',
Order.created_at >= thirty_min_ago
).all()
if not pending_orders:
return
alipay_service = AlipayService()
updated_count = 0
for order in pending_orders:
try:
# 查询订单状态
alipay_result = alipay_service.query_order_status(order.out_trade_no)
if alipay_result and alipay_result.get('trade_status') in ['TRADE_SUCCESS', 'TRADE_FINISHED']:
# 使用行级锁重新查询,防止并发问题
order_locked = Order.query.filter_by(out_trade_no=order.out_trade_no).with_for_update().first()
# 二次校验状态,防止异步回调已经处理
if order_locked and order_locked.status == 'PENDING':
# 更新订单
order_locked.status = 'PAID'
order_locked.trade_no = alipay_result.get('trade_no')
if not order_locked.paid_at:
order_locked.paid_at = get_bj_now()
# 增加用户积分
user = db.session.get(User, order_locked.user_id)
if user:
user.points += order_locked.points
system_logger.info(f"定时任务-订单支付成功", order_id=order_locked.out_trade_no, points=order_locked.points, user_id=user.id)
updated_count += 1
db.session.commit()
elif order_locked and order_locked.status == 'PAID':
# 订单已经被处理,跳过
logging.info(f"定时任务-订单 {order.out_trade_no} 已被处理,跳过")
except Exception as e:
db.session.rollback()
logging.error(f"定时任务处理订单 {order.out_trade_no} 失败: {str(e)}")
if updated_count > 0:
logging.info(f"定时任务完成,帮助更新了{updated_count}个订单")
except Exception as e:
logging.error(f"定时任务异常: {str(e)}", exc_info=True)
# 导入模律(必需在 db.create_all() 之前导入)
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
# 初始化扩展
db.init_app(app)
redis_client.init_app(app)
migrate.init_app(app, db)
# 注册蓝图
app.register_blueprint(auth_bp)
app.register_blueprint(api_bp)
app.register_blueprint(admin_bp)
app.register_blueprint(payment_bp)
from flask import g, session
from models import User, SystemLog
@app.before_request
def load_user():
"""在每个请求前加载用户信息到 g供日志系统使用"""
user_id = session.get('user_id')
if user_id:
g.user_id = user_id
g.user = db.session.get(User, user_id)
else:
g.user_id = None
g.user = None
@app.context_processor
def inject_menu():
"""将导航菜单注入所有模板,实现服务端渲染,解决闪烁问题"""
from blueprints.auth import get_user_menu
menu = get_user_menu(g.user) if hasattr(g, 'user') else []
return dict(nav_menu=menu)
@app.route('/api/system_logs')
def get_system_logs():
"""获取系统日志数据 (供后台管理界面使用)"""
# 这里可以加入权限检查
logs = SystemLog.query.order_by(SystemLog.created_at.desc()).limit(100).all()
return jsonify([{
'id': log.id,
'user_id': log.user_id,
'level': log.level,
'module': log.module,
'message': log.message,
'extra': log.extra,
'ip': log.ip,
'path': log.path,
'method': log.method,
'user_agent': log.user_agent,
'created_at': log.created_at_bj.strftime('%Y-%m-%d %H:%M:%S')
} for log in logs])
@app.route('/')
def index():
return render_template('index.html')
@app.route('/ocr')
def ocr():
return render_template('ocr.html')
@app.route('/visualizer')
def visualizer():
return render_template('kongzhiqi.html')
@app.route('/video')
def video_page():
return render_template('video.html')
@app.route('/files/<path:filename>')
def get_file(filename):
"""Proxy route to serve files from MinIO via the backend"""
try:
# Use s3_client to get the object
file_obj = s3_client.get_object(Bucket=Config.MINIO["bucket"], Key=filename)
def generate():
for chunk in file_obj['Body'].iter_chunks(chunk_size=4096):
yield chunk
return Response(
stream_with_context(generate()),
mimetype=file_obj['ContentType'],
headers={
"Cache-Control": "public, max-age=86400"
}
)
except Exception as e:
# system_logger.error(f"File proxy error: {str(e)}") # Optional logging
return jsonify({"error": "File not found"}), 404
# 自动创建数据库表
with app.app_context():
print("🔧 正在检查并创建数据库表...")
db.create_all()
print("✅ 数据库表已就绪")
# 创建并启动定时任务调度器
try:
scheduler = BackgroundScheduler(daemon=True)
# 每分钟检查一次待支付订单
scheduler.add_job(
func=sync_pending_orders,
trigger=IntervalTrigger(minutes=1),
id='sync_pending_orders',
name='同步待支付订单',
replace_existing=True
)
scheduler.start()
print("🚀 定时任务引擎已启动,将每分钟检查一次待支付订单")
except Exception as e:
print(f"⚠️ 定时任务启动失败: {str(e)}")
return app
app = create_app()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)