""" 巨潮资讯网爬虫服务 (新版API) 用于获取上市公司年报/半年报 """ import os import re import asyncio from datetime import datetime from typing import List, Optional, Dict, Any import httpx from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.config import settings from app.utils.logger import logger from app.models import Company, Report class CninfoService: """巨潮资讯网服务 - 适配新版API""" def __init__(self): # 新版巨潮网址 self.base_url = "https://www.cninfo.com.cn" # 新版公告查询API self.api_url = "https://www.cninfo.com.cn/new/hisAnnouncement/query" # 搜索API(用于获取orgId) self.search_url = "https://www.cninfo.com.cn/new/information/topSearch/query" self.headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Accept": "application/json, text/javascript, */*; q=0.01", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", "Origin": "https://www.cninfo.com.cn", "Referer": "https://www.cninfo.com.cn/new/disclosure", "X-Requested-With": "XMLHttpRequest", } self.timeout = httpx.Timeout(30.0, connect=10.0) def _get_plate_by_code(self, stock_code: str) -> str: """根据股票代码判断板块 (新版API的column参数)""" if stock_code.startswith("6"): return "sse" # 上交所 elif stock_code.startswith("0") or stock_code.startswith("3"): return "szse" # 深交所 elif stock_code.startswith("8") or stock_code.startswith("4"): return "bse" # 北交所 return "szse" async def get_org_id(self, stock_code: str) -> Optional[str]: """ 通过搜索API获取公司的orgId 这是精确查询公告的关键 """ try: search_data = {"keyWord": stock_code, "maxNum": 5} async with httpx.AsyncClient(timeout=self.timeout, verify=False) as client: response = await client.post( self.search_url, data=search_data, headers=self.headers ) if response.status_code == 200: results = response.json() for item in results: if item.get("code") == stock_code: org_id = item.get("orgId") logger.info(f"获取到 {stock_code} 的 orgId: {org_id}") return org_id logger.warning(f"无法获取 {stock_code} 的 orgId") return None except Exception as e: logger.error(f"获取orgId失败: {stock_code}, 错误: {e}") return None async def search_reports( self, stock_code: str, org_id: Optional[str] = None, report_types: Optional[List[str]] = None ) -> List[Dict[str, Any]]: """ 搜索公司的年报/半年报 Args: stock_code: 股票代码 org_id: 巨潮机构ID (如果为None,会自动获取) report_types: 报告类型列表,默认为年报和半年报 Returns: 报告列表 """ if report_types is None: report_types = settings.REPORT_TYPES # 如果没有orgId,先获取 if not org_id: org_id = await self.get_org_id(stock_code) if not org_id: logger.error(f"无法获取 {stock_code} 的orgId,跳过") return [] reports = [] plate = self._get_plate_by_code(stock_code) try: # 新版API的category参数 category_map = { "年度报告": "category_ndbg_szsh", "半年度报告": "category_bndbg_szsh" } for report_type in report_types: category = category_map.get(report_type) if not category: continue # 构建请求参数 - 关键:stock参数使用 "代码,orgId" 格式 form_data = { "pageNum": "1", "pageSize": "30", "column": plate, "tabName": "fulltext", "plate": "", "stock": f"{stock_code},{org_id}", # 关键格式! "searchkey": "", "secid": "", "category": category, "trade": "", "seDate": "", "sortName": "", "sortType": "", "isHLtitle": "true" } logger.info(f"正在查询 {stock_code} 的 {report_type}...") async with httpx.AsyncClient(timeout=self.timeout, verify=False) as client: response = await client.post( self.api_url, data=form_data, headers=self.headers ) if response.status_code == 200: data = response.json() announcements = data.get("announcements") or [] logger.info(f"找到 {len(announcements)} 条公告记录") for ann in announcements: # 验证是目标公司 if ann.get("secCode") != stock_code: continue title = ann.get("announcementTitle", "") # 过滤摘要、更正等非正式报告 if self._is_valid_report(title, report_type): report_year = self._extract_year(title) # 只抓取2024年及以后的报告 if report_year and report_year < 2024: logger.debug(f"跳过旧报告: {title} (年份: {report_year})") continue report_info = { "title": title, "report_type": report_type, "announcement_id": ann.get("announcementId"), "announcement_time": ann.get("announcementTime"), "sec_code": ann.get("secCode"), "sec_name": ann.get("secName"), "org_id": ann.get("orgId"), "pdf_url": self._build_pdf_url(ann.get("adjunctUrl")), "report_year": report_year, "report_period": self._extract_period(title, report_type) } reports.append(report_info) logger.info(f"✓ 匹配报告: [{stock_code}] {title}") else: logger.error(f"API请求失败: {response.status_code}") # 请求间隔2秒,避免被封禁 await asyncio.sleep(2) except Exception as e: logger.error(f"搜索报告失败: {stock_code}, 错误: {e}") import traceback logger.error(traceback.format_exc()) return reports def _is_valid_report(self, title: str, report_type: str) -> bool: """判断是否为有效的正式报告(排除摘要、更正等)""" # 排除关键词 exclude_keywords = ["摘要", "更正", "补充", "英文", "修订", "修正", "取消", "披露", "提示"] for keyword in exclude_keywords: if keyword in title: return False # 确认包含报告类型 if report_type == "年度报告": return "年度报告" in title or "年报" in title elif report_type == "半年度报告": return "半年度报告" in title or "半年报" in title return False def _build_pdf_url(self, adjunct_url: Optional[str]) -> Optional[str]: """构建PDF下载URL - 使用新版静态资源域名""" if not adjunct_url: return None # 如果已经是完整URL if adjunct_url.startswith("http"): return adjunct_url # 新版巨潮使用 static.cninfo.com.cn 作为PDF下载域名 return f"https://static.cninfo.com.cn/{adjunct_url}" def _extract_year(self, title: str) -> Optional[int]: """从标题中提取年份""" match = re.search(r"(\d{4})\s*年", title) if match: return int(match.group(1)) return None def _extract_period(self, title: str, report_type: str) -> str: """提取报告期间""" if report_type == "年度报告": return "年报" elif report_type == "半年度报告": return "半年报" return "" async def download_pdf(self, pdf_url: str, save_path: str) -> bool: """ 下载PDF文件 """ try: # 确保目录存在 os.makedirs(os.path.dirname(save_path), exist_ok=True) logger.info(f"开始下载: {pdf_url}") async with httpx.AsyncClient(timeout=httpx.Timeout(180.0), verify=False) as client: response = await client.get(pdf_url, headers=self.headers, follow_redirects=True) if response.status_code == 200: with open(save_path, "wb") as f: f.write(response.content) file_size = os.path.getsize(save_path) logger.info(f"✓ 下载成功: {save_path}, 大小: {file_size / 1024 / 1024:.2f} MB") return True else: logger.error(f"下载失败: {pdf_url}, 状态码: {response.status_code}") return False except Exception as e: logger.error(f"下载异常: {pdf_url}, 错误: {e}") return False async def sync_company_reports( self, db: AsyncSession, company: Company, force_download: bool = False ) -> int: """ 同步公司的报告 """ logger.info(f"========== 开始同步: {company.stock_code} {company.short_name} ==========") # 获取或更新orgId org_id = company.org_id if not org_id: org_id = await self.get_org_id(company.stock_code) if org_id: company.org_id = org_id await db.commit() # 搜索报告 reports = await self.search_reports(company.stock_code, org_id) if not reports: logger.warning(f"未找到任何报告: {company.stock_code}") return 0 new_count = 0 for report_info in reports: # 检查是否已存在 stmt = select(Report).where( Report.announcement_id == report_info["announcement_id"] ) result = await db.execute(stmt) existing = result.scalar_one_or_none() if existing: if not force_download: logger.debug(f"报告已存在: {report_info['title']}") continue # 创建新报告记录 report = Report( company_id=company.id, title=report_info["title"], report_type=report_info["report_type"], report_year=report_info["report_year"], report_period=report_info["report_period"], announcement_id=report_info["announcement_id"], announcement_time=datetime.fromtimestamp( report_info["announcement_time"] / 1000 ) if report_info["announcement_time"] else None, pdf_url=report_info["pdf_url"] ) # 构建本地保存路径 year = report_info["report_year"] or "unknown" period = report_info["report_period"] or "report" filename = f"{company.stock_code}_{year}_{period}.pdf" save_path = os.path.join(settings.PDF_DIR, company.stock_code, filename) # 下载PDF if report_info["pdf_url"]: success = await self.download_pdf(report_info["pdf_url"], save_path) if success: report.is_downloaded = True report.local_path = save_path report.file_size = os.path.getsize(save_path) if not existing: db.add(report) new_count += 1 await db.commit() # 避免请求过快 await asyncio.sleep(0.5) logger.info(f"========== 同步完成: {company.stock_code}, 新增 {new_count} 份报告 ==========") return new_count # 创建全局服务实例 cninfo_service = CninfoService()