huibao/backend/app/services/cninfo_crawler.py

352 lines
14 KiB
Python
Raw Permalink Normal View History

"""
巨潮资讯网爬虫服务 (新版API)
用于获取上市公司年报/半年报
"""
import os
import re
import asyncio
from datetime import datetime
from typing import List, Optional, Dict, Any
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.config import settings
from app.utils.logger import logger
from app.models import Company, Report
class CninfoService:
"""巨潮资讯网服务 - 适配新版API"""
def __init__(self):
# 新版巨潮网址
self.base_url = "https://www.cninfo.com.cn"
# 新版公告查询API
self.api_url = "https://www.cninfo.com.cn/new/hisAnnouncement/query"
# 搜索API用于获取orgId
self.search_url = "https://www.cninfo.com.cn/new/information/topSearch/query"
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"Origin": "https://www.cninfo.com.cn",
"Referer": "https://www.cninfo.com.cn/new/disclosure",
"X-Requested-With": "XMLHttpRequest",
}
self.timeout = httpx.Timeout(30.0, connect=10.0)
def _get_plate_by_code(self, stock_code: str) -> str:
"""根据股票代码判断板块 (新版API的column参数)"""
if stock_code.startswith("6"):
return "sse" # 上交所
elif stock_code.startswith("0") or stock_code.startswith("3"):
return "szse" # 深交所
elif stock_code.startswith("8") or stock_code.startswith("4"):
return "bse" # 北交所
return "szse"
async def get_org_id(self, stock_code: str) -> Optional[str]:
"""
通过搜索API获取公司的orgId
这是精确查询公告的关键
"""
try:
search_data = {"keyWord": stock_code, "maxNum": 5}
async with httpx.AsyncClient(timeout=self.timeout, verify=False) as client:
response = await client.post(
self.search_url,
data=search_data,
headers=self.headers
)
if response.status_code == 200:
results = response.json()
for item in results:
if item.get("code") == stock_code:
org_id = item.get("orgId")
logger.info(f"获取到 {stock_code} 的 orgId: {org_id}")
return org_id
logger.warning(f"无法获取 {stock_code} 的 orgId")
return None
except Exception as e:
logger.error(f"获取orgId失败: {stock_code}, 错误: {e}")
return None
async def search_reports(
self,
stock_code: str,
org_id: Optional[str] = None,
report_types: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
"""
搜索公司的年报/半年报
Args:
stock_code: 股票代码
org_id: 巨潮机构ID (如果为None会自动获取)
report_types: 报告类型列表默认为年报和半年报
Returns:
报告列表
"""
if report_types is None:
report_types = settings.REPORT_TYPES
# 如果没有orgId先获取
if not org_id:
org_id = await self.get_org_id(stock_code)
if not org_id:
logger.error(f"无法获取 {stock_code} 的orgId跳过")
return []
reports = []
plate = self._get_plate_by_code(stock_code)
try:
# 新版API的category参数
category_map = {
"年度报告": "category_ndbg_szsh",
"半年度报告": "category_bndbg_szsh"
}
for report_type in report_types:
category = category_map.get(report_type)
if not category:
continue
# 构建请求参数 - 关键stock参数使用 "代码,orgId" 格式
form_data = {
"pageNum": "1",
"pageSize": "30",
"column": plate,
"tabName": "fulltext",
"plate": "",
"stock": f"{stock_code},{org_id}", # 关键格式!
"searchkey": "",
"secid": "",
"category": category,
"trade": "",
"seDate": "",
"sortName": "",
"sortType": "",
"isHLtitle": "true"
}
logger.info(f"正在查询 {stock_code}{report_type}...")
async with httpx.AsyncClient(timeout=self.timeout, verify=False) as client:
response = await client.post(
self.api_url,
data=form_data,
headers=self.headers
)
if response.status_code == 200:
data = response.json()
announcements = data.get("announcements") or []
logger.info(f"找到 {len(announcements)} 条公告记录")
for ann in announcements:
# 验证是目标公司
if ann.get("secCode") != stock_code:
continue
title = ann.get("announcementTitle", "")
# 过滤摘要、更正等非正式报告
if self._is_valid_report(title, report_type):
report_year = self._extract_year(title)
# 只抓取2024年及以后的报告
if report_year and report_year < 2024:
logger.debug(f"跳过旧报告: {title} (年份: {report_year})")
continue
report_info = {
"title": title,
"report_type": report_type,
"announcement_id": ann.get("announcementId"),
"announcement_time": ann.get("announcementTime"),
"sec_code": ann.get("secCode"),
"sec_name": ann.get("secName"),
"org_id": ann.get("orgId"),
"pdf_url": self._build_pdf_url(ann.get("adjunctUrl")),
"report_year": report_year,
"report_period": self._extract_period(title, report_type)
}
reports.append(report_info)
logger.info(f"✓ 匹配报告: [{stock_code}] {title}")
else:
logger.error(f"API请求失败: {response.status_code}")
# 请求间隔2秒避免被封禁
await asyncio.sleep(2)
except Exception as e:
logger.error(f"搜索报告失败: {stock_code}, 错误: {e}")
import traceback
logger.error(traceback.format_exc())
return reports
def _is_valid_report(self, title: str, report_type: str) -> bool:
"""判断是否为有效的正式报告(排除摘要、更正等)"""
# 排除关键词
exclude_keywords = ["摘要", "更正", "补充", "英文", "修订", "修正", "取消", "披露", "提示"]
for keyword in exclude_keywords:
if keyword in title:
return False
# 确认包含报告类型
if report_type == "年度报告":
return "年度报告" in title or "年报" in title
elif report_type == "半年度报告":
return "半年度报告" in title or "半年报" in title
return False
def _build_pdf_url(self, adjunct_url: Optional[str]) -> Optional[str]:
"""构建PDF下载URL - 使用新版静态资源域名"""
if not adjunct_url:
return None
# 如果已经是完整URL
if adjunct_url.startswith("http"):
return adjunct_url
# 新版巨潮使用 static.cninfo.com.cn 作为PDF下载域名
return f"https://static.cninfo.com.cn/{adjunct_url}"
def _extract_year(self, title: str) -> Optional[int]:
"""从标题中提取年份"""
match = re.search(r"(\d{4})\s*年", title)
if match:
return int(match.group(1))
return None
def _extract_period(self, title: str, report_type: str) -> str:
"""提取报告期间"""
if report_type == "年度报告":
return "年报"
elif report_type == "半年度报告":
return "半年报"
return ""
async def download_pdf(self, pdf_url: str, save_path: str) -> bool:
"""
下载PDF文件
"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(save_path), exist_ok=True)
logger.info(f"开始下载: {pdf_url}")
async with httpx.AsyncClient(timeout=httpx.Timeout(180.0), verify=False) as client:
response = await client.get(pdf_url, headers=self.headers, follow_redirects=True)
if response.status_code == 200:
with open(save_path, "wb") as f:
f.write(response.content)
file_size = os.path.getsize(save_path)
logger.info(f"✓ 下载成功: {save_path}, 大小: {file_size / 1024 / 1024:.2f} MB")
return True
else:
logger.error(f"下载失败: {pdf_url}, 状态码: {response.status_code}")
return False
except Exception as e:
logger.error(f"下载异常: {pdf_url}, 错误: {e}")
return False
async def sync_company_reports(
self,
db: AsyncSession,
company: Company,
force_download: bool = False
) -> int:
"""
同步公司的报告
"""
logger.info(f"========== 开始同步: {company.stock_code} {company.short_name} ==========")
# 获取或更新orgId
org_id = company.org_id
if not org_id:
org_id = await self.get_org_id(company.stock_code)
if org_id:
company.org_id = org_id
await db.commit()
# 搜索报告
reports = await self.search_reports(company.stock_code, org_id)
if not reports:
logger.warning(f"未找到任何报告: {company.stock_code}")
return 0
new_count = 0
for report_info in reports:
# 检查是否已存在
stmt = select(Report).where(
Report.announcement_id == report_info["announcement_id"]
)
result = await db.execute(stmt)
existing = result.scalar_one_or_none()
if existing:
if not force_download:
logger.debug(f"报告已存在: {report_info['title']}")
continue
# 创建新报告记录
report = Report(
company_id=company.id,
title=report_info["title"],
report_type=report_info["report_type"],
report_year=report_info["report_year"],
report_period=report_info["report_period"],
announcement_id=report_info["announcement_id"],
announcement_time=datetime.fromtimestamp(
report_info["announcement_time"] / 1000
) if report_info["announcement_time"] else None,
pdf_url=report_info["pdf_url"]
)
# 构建本地保存路径
year = report_info["report_year"] or "unknown"
period = report_info["report_period"] or "report"
filename = f"{company.stock_code}_{year}_{period}.pdf"
save_path = os.path.join(settings.PDF_DIR, company.stock_code, filename)
# 下载PDF
if report_info["pdf_url"]:
success = await self.download_pdf(report_info["pdf_url"], save_path)
if success:
report.is_downloaded = True
report.local_path = save_path
report.file_size = os.path.getsize(save_path)
if not existing:
db.add(report)
new_count += 1
await db.commit()
# 避免请求过快
await asyncio.sleep(0.5)
logger.info(f"========== 同步完成: {company.stock_code}, 新增 {new_count} 份报告 ==========")
return new_count
# 创建全局服务实例
cninfo_service = CninfoService()