huibao/backend/app/services/ai_analyzer.py

369 lines
15 KiB
Python
Raw Permalink Normal View History

"""
AI分析服务 - 多AI分层分析
"""
import json
import asyncio
from typing import List, Dict, Optional
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.config import settings
from app.utils.logger import logger
from app.models import Report, ExtractedContent, AnalysisResult, AnalysisStatus
class AIAnalyzer:
"""AI分析服务"""
def __init__(self):
self.api_url = settings.AI_API_URL
self.api_key = settings.AI_API_KEY
self.model = settings.AI_MODEL
self.parallel_count = settings.AI_PARALLEL_COUNT
self.timeout = httpx.Timeout(600.0) # 增加到10分钟超时
# 复用 client 以提升性能
self.client = httpx.AsyncClient(timeout=self.timeout)
async def __del__(self):
await self.client.aclose()
async def call_ai(self, prompt: str, system_prompt: str = None) -> Dict:
"""调用AI接口带重试机制 - 复用Client"""
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": prompt})
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": self.model,
"messages": messages,
"temperature": 0.3,
"max_tokens": 4096
}
max_retries = 3
retry_delays = [2, 5, 10]
for attempt in range(max_retries + 1):
try:
# 使用复用的 client
response = await self.client.post(
self.api_url,
headers=headers,
json=payload
)
if response.status_code == 200:
data = response.json()
content = data["choices"][0]["message"]["content"]
tokens = data.get("usage", {}).get("total_tokens", 0)
return {"success": True, "content": content, "tokens": tokens}
else:
error_msg = f"AI调用失败: {response.status_code} {response.text}"
logger.error(error_msg)
if response.status_code < 500:
return {"success": False, "error": response.text}
raise Exception(error_msg)
except Exception as e:
if attempt < max_retries:
delay = retry_delays[attempt]
logger.warning(f"AI调用异常: {e}{delay}秒后重试...")
await asyncio.sleep(delay)
else:
logger.error(f"AI调用异常: {e}")
return {"success": False, "error": str(e)}
async def analyze_section(self, section: ExtractedContent, company_name: str) -> Dict:
"""分析单个章节 - 定制化电商+战略扩展+数字化视角"""
keyword = section.section_keyword
# 判断内容类型
is_financial = any(kw in keyword for kw in ["资产负债表", "利润表", "现金流量表", "财务数据", "财务指标"])
is_operation = any(kw in keyword for kw in ["主营业务", "经营情况", "收入", "成本", "毛利率", "费用", "研发"])
is_market = any(kw in keyword for kw in ["行业", "市场", "竞争", "份额"])
# 系统提示词
system_prompt = """你是一位服务于亿级电商眼镜商家(全平台、多品牌)的战略顾问。
客户现状主营太阳镜/运动/光学镜年收1亿正在进行渠道扩张分销和数字化转型
你的任务基于上市公司年报原文为客户提供实战参考和战略情报
核心准则
1. **身份定位**你是顾问要提供不仅是数据更是业务视角的洞察
2. **证据至上**你的所有结论必须直接引自或推导自原文事实
3. **拒绝脑补**严禁任何形式的脱离原文的反向推导猜测或幻觉
4. **诚实声明**如果关键信息缺失请直接指出严禁臆测"""
if is_financial:
analysis_prompt = f"""作为顾问,请研读{company_name}的财务数据片段结合客户年收1亿且在转型的需求进行分析
---
{section.content[:10000]}
---
任务
1. **真实IT底色**从报表中找出确凿的系统/软件/研发投入
2. **经营稳定性**通过存货和利润数据分析其作为龙头的真实盈利质量
3. **投入产出比**分析其费用结构是否合理
注意仅分析原文绝不编造"""
elif is_operation:
analysis_prompt = f"""作为顾问,请从{company_name}的经营描述中,拆解其对客户有价值的实战战术:
---
{section.content[:10000]}
---
任务
1. **真实数字化**原文中是否明确提到了使用的系统解决了什么业务问题
2. **渠道体系**其分销/加盟/直营模式的真实描述是什么
3. **品类逻辑**他们在什么品类上投入最重
注意严禁强行关联数字化"""
elif is_market:
analysis_prompt = f"""作为顾问,请分析{company_name}眼中的市场局局势,为客户的战略布局提供方向:
---
{section.content[:8000]}
---
任务
1. **核心壁垒**大厂公开承认的竞争优势是什么
2. **行业机会**原文中他们重点看好哪些细分增长点
注意仅提炼策略"""
else:
analysis_prompt = f"""作为顾问,分析{company_name}关于「{keyword}」的真实战略图谋:
---
{section.content[:8000]}
---
任务
1. **明确投向**未来大厂明确要投入资源解决的问题是什么
2. **战略排序**他们的重心是在求快还是求稳
注意必须基于原文描述"""
result = await self.call_ai(analysis_prompt, system_prompt)
return {
"section_name": section.section_name,
"keyword": section.section_keyword,
"analysis": result.get("content", ""),
"tokens": result.get("tokens", 0),
"success": result.get("success", False),
"content_type": "financial" if is_financial else ("operation" if is_operation else ("market" if is_market else "strategy"))
}
async def summarize_analyses(self, analyses: List[Dict], company_name: str, report_title: str) -> Dict:
"""汇总分析 - 生成《战略扩张与数字化内参》"""
system_prompt = """你是一位专注于零售品牌增长的战略顾问。
客户是年收1亿的电商眼镜品牌正处于拓展分销拓新品类数字化转型的关键期
请基于上市公司年报为客户提供一份具备战略高度的扩张与升级指南"""
analyses_text = ""
# 财务类情报
fin_insights = [a['analysis'] for a in analyses if a.get('success') and a.get('content_type') == 'financial']
analyses_text += "【财务情报】\n" + "\n".join(fin_insights) + "\n\n"
# 运营类情报
op_insights = [a['analysis'] for a in analyses if a.get('success') and a.get('content_type') == 'operation']
analyses_text += "【运营实战】\n" + "\n".join(op_insights) + "\n\n"
# 市场/战略情报
mkt_insights = [a['analysis'] for a in analyses if a.get('success') and a.get('content_type') in ['market', 'strategy']]
analyses_text += "【战略情报】\n" + "\n".join(mkt_insights)
prompt = f"""以下是从{company_name}{report_title}》提炼出的关键情报:
{analyses_text}
请合成一份**高密度实战内参**
内参结构
## 📊 关键指标对标局
## 🛠 数字化实战方案
## 🌍 全渠道与分销战法
## 🎯 新机会雷达
## 💡 顾问最终建议3条"""
result = await self.call_ai(prompt, system_prompt)
return {
"summary": result.get("content", ""),
"tokens": result.get("tokens", 0),
"success": result.get("success", False)
}
async def analyze_report(self, db: AsyncSession, report: Report, force: bool = False) -> bool:
"""分析完整报告"""
if report.is_analyzed and not force:
logger.info(f"报告已分析: {report.title}")
return True
# 更新状态
report.analysis_status = AnalysisStatus.ANALYZING.value
await db.commit()
# 获取提取的内容
stmt = select(ExtractedContent).where(ExtractedContent.report_id == report.id)
result = await db.execute(stmt)
contents = result.scalars().all()
if not contents:
logger.info(f"报告无提取内容,尝试自动提取: {report.title}")
from app.services.pdf_extractor import pdf_extractor
try:
contents = await pdf_extractor.extract_and_save(db, report)
if not contents:
report.analysis_status = AnalysisStatus.FAILED.value
await db.commit()
return False
except Exception as e:
logger.error(f"自动提取异常: {e}")
report.analysis_status = AnalysisStatus.FAILED.value
await db.commit()
return False
from app.models import Company
company_stmt = select(Company).where(Company.id == report.company_id)
company_result = await db.execute(company_stmt)
company = company_result.scalar_one_or_none()
company_name = company.short_name or company.company_name if company else "未知公司"
logger.info(f"开始分析报告: {report.title}, 共 {len(contents)} 个章节")
# 清除旧结果
try:
from sqlalchemy import delete
stmt = delete(AnalysisResult).where(AnalysisResult.report_id == report.id)
await db.execute(stmt)
await db.commit()
except Exception as e:
logger.error(f"清除旧结果失败: {e}")
# 使用 Worker 队列模式而非 gather
queue = asyncio.Queue()
for i, c in enumerate(contents):
queue.put_nowait((i, c))
valid_results = []
# 准备一个专门用于增量保存的锁,防止 session 并发提交冲突
db_lock = asyncio.Lock()
async def worker():
while not queue.empty():
try:
# 使用 get_nowait 避免在此处挂起等待
idx, content = queue.get_nowait()
except asyncio.QueueEmpty:
break
try:
# 主动让出,确保不阻塞主线程
await asyncio.sleep(0.5)
logger.info(f"Worker处理章节 [{idx+1}/{len(contents)}]: {content.section_name}")
res = await self.analyze_section(content, company_name)
if res.get("success"):
async with db_lock:
try:
from app.database import AsyncSessionLocal
async with AsyncSessionLocal() as session:
analysis = AnalysisResult(
report_id=report.id,
analysis_type="section",
section_name=res["section_name"],
ai_model=self.model,
summary=res["analysis"],
token_count=res["tokens"],
is_final=False
)
session.add(analysis)
await session.commit()
valid_results.append(res)
except Exception as e:
logger.error(f"章节增量保存失败: {e}")
except Exception as e:
logger.error(f"Worker异常: {e}")
finally:
queue.task_done()
# 启动 Consumer Workers
workers = [asyncio.create_task(worker()) for _ in range(self.parallel_count)]
await asyncio.gather(*workers)
# 汇总分析
report.analysis_status = AnalysisStatus.SUMMARIZING.value
await db.commit()
if valid_results:
summary = await self.summarize_analyses(valid_results, company_name, report.title)
if summary.get("success"):
final_analysis = AnalysisResult(
report_id=report.id,
analysis_type="summary",
section_name="综合分析",
ai_model=self.model,
summary=summary["summary"],
token_count=summary.get("tokens", 0),
is_final=True
)
db.add(final_analysis)
# 更新报告状态
report.is_analyzed = True
report.analysis_status = AnalysisStatus.COMPLETED.value
await db.commit()
return True
async def analyze_with_quality_check(
self,
db: AsyncSession,
report: Report,
enable_quality_check: bool = False
) -> bool:
"""带质量检查的分析(可选启用多评判师评估)"""
result = await self.analyze_report(db, report)
if result and enable_quality_check:
try:
from app.services.quality_evaluator import quality_evaluator
# 获取刚生成的汇总
stmt = select(AnalysisResult).where(
AnalysisResult.report_id == report.id,
AnalysisResult.analysis_type == "summary"
)
from sqlalchemy import select
result = await db.execute(stmt)
summary_result = result.scalar_one_or_none()
if summary_result:
passed, evaluation = await quality_evaluator.evaluate_and_decide(
db, summary_result
)
logger.info(f"质量评估: 通过={passed}, 评分={evaluation.get('avg_score', 0)}")
if not passed:
logger.warning(f"质量不达标,建议人工复核: {report.title}")
except ImportError:
logger.warning("质量评估模块未加载")
return result
ai_analyzer = AIAnalyzer()