from models import GenerationRecord, Order, db, to_bj_time from sqlalchemy import func from datetime import datetime, timedelta def get_point_stats(user_id, days=7): """获取用户积分消耗统计数据 (用于图表)""" end_date = datetime.utcnow() start_date = end_date - timedelta(days=days-1) # 1. 获取消耗统计 (从 GenerationRecord) # 按天分组汇总结算 deductions = db.session.query( func.date(GenerationRecord.created_at).label('date'), func.sum(GenerationRecord.cost).label('total_cost') ).filter( GenerationRecord.user_id == user_id, GenerationRecord.created_at >= start_date.date() ).group_by(func.date(GenerationRecord.created_at)).all() # 2. 获取充值统计 (从 Order) incomes = db.session.query( func.date(Order.created_at).label('date'), func.sum(Order.points).label('total_points') ).filter( Order.user_id == user_id, Order.status == 'PAID', Order.created_at >= start_date.date() ).group_by(func.date(Order.created_at)).all() # 3. 补齐日期,生成连续数据 date_list = [(start_date + timedelta(days=i)).strftime('%m-%d') for i in range(days)] deduction_map = {d.date.strftime('%m-%d'): int(d.total_cost or 0) for d in deductions} income_map = {i.date.strftime('%m-%d'): int(i.total_points or 0) for i in incomes} return { "labels": date_list, "deductions": [deduction_map.get(d, 0) for d in date_list], "incomes": [income_map.get(d, 0) for d in date_list] } def get_point_details(user_id, page=1, per_page=20): """获取积分变动明细列表""" pagination = GenerationRecord.query.filter( GenerationRecord.user_id == user_id, GenerationRecord.cost > 0 ).order_by(GenerationRecord.created_at.desc()).paginate( page=page, per_page=per_page, error_out=False ) details = [] for r in pagination.items: details.append({ "type": "deduction", "desc": r.prompt[:30] + "..." if r.prompt else "AI 生成", "model": r.model, "change": f"-{r.cost}", "time": r.created_at_bj.strftime('%Y-%m-%d %H:%M') }) return { "items": details, "total": pagination.total, "pages": pagination.pages, "current_page": pagination.page }