huibao/backend/test_api.py

80 lines
2.7 KiB
Python
Raw Permalink Normal View History

"""
调试巨潮API - 使用orgId精确查询
"""
import asyncio
import httpx
def _find_org_id(results, stock_code):
    """Return the first search hit whose ``code`` equals *stock_code*, or None.

    *results* is the decoded JSON body of the search endpoint. Anything that
    is not a list of dicts is treated as "no match" instead of raising.
    """
    if not isinstance(results, list):
        return None
    for item in results:
        if isinstance(item, dict) and item.get("code") == stock_code:
            return item
    return None


async def debug_api():
    """Debug the CNINFO announcement API using an exact orgId query.

    Step 1 posts the stock code to the ``topSearch/query`` endpoint and picks
    the hit whose ``code`` matches, to obtain its ``orgId``.
    Step 2 posts ``"<code>,<orgId>"`` to ``hisAnnouncement/query`` with the
    ``category_ndbg_szsh`` category and prints up to five returned
    announcements.

    All results are printed; the coroutine returns ``None``. Transport-level
    errors raised by ``httpx`` are not caught, so the traceback stays visible
    while debugging.
    """
    print("=" * 60)
    print("调试巨潮API - 使用orgId精确查询")
    print("=" * 60)

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Accept": "application/json, text/javascript, */*; q=0.01",
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "Origin": "https://www.cninfo.com.cn",
        "Referer": "https://www.cninfo.com.cn/new/disclosure",
        "X-Requested-With": "XMLHttpRequest",
    }
    stock_code = "300622"

    search_url = "https://www.cninfo.com.cn/new/information/topSearch/query"
    search_data = {"keyWord": stock_code, "maxNum": 5}
    api_url = "https://www.cninfo.com.cn/new/hisAnnouncement/query"

    # NOTE(review): verify=False disables TLS certificate verification. It is
    # kept because this is a local debug script and the original relied on it;
    # do not copy this setting into production code.
    # A single client is shared by both requests so the connection is reused.
    async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
        # Step 1: search by stock code to obtain the orgId.
        print(f"\n1. 搜索 {stock_code} 获取orgId...")
        response = await client.post(search_url, data=search_data, headers=headers)

        org_id = None
        if response.status_code == 200:
            try:
                results = response.json()
            except ValueError:
                # Body was not valid JSON (e.g. an HTML error page).
                print(" 响应不是有效的JSON")
                results = None
            match = _find_org_id(results, stock_code)
            if match is not None:
                org_id = match.get("orgId")
                if org_id:
                    print(f" 找到 orgId: {org_id}")
                    print(f" 公司简称: {match.get('zwjc')}")
        else:
            # Surface the failure instead of falling through silently.
            print(f" 请求失败: HTTP {response.status_code}")

        if not org_id:
            print(" 未找到orgId")
            return

        # Step 2: query announcements using the orgId.
        print(f"\n2. 使用 orgId={org_id} 查询年报...")
        form_data = {
            "pageNum": "1",
            "pageSize": "30",
            "column": "szse",
            "tabName": "fulltext",
            "plate": "",
            "stock": f"{stock_code},{org_id}",  # "code,orgId" format
            "searchkey": "",
            "secid": "",
            "category": "category_ndbg_szsh",
            "trade": "",
            "seDate": "",
            "sortName": "",
            "sortType": "",
            "isHLtitle": "true",
        }
        response = await client.post(api_url, data=form_data, headers=headers)

        if response.status_code != 200:
            print(f" 请求失败: HTTP {response.status_code}")
            return

        try:
            data = response.json()
        except ValueError:
            print(" 响应不是有效的JSON")
            return

        # The key may be present with a null value, in which case
        # .get(key, []) would return None; `or []` covers both cases.
        announcements = (data.get("announcements") if isinstance(data, dict) else None) or []
        print(f" 返回 {len(announcements)} 条记录")
        for ann in announcements[:5]:
            sec_code = ann.get("secCode", "")
            sec_name = ann.get("secName", "")
            # Guard against a null title before slicing.
            title = ann.get("announcementTitle") or ""
            print(f" [{sec_code}] {sec_name}: {title[:40]}")
if __name__ == "__main__":
    # Run the debug coroutine only when executed as a script, not on import.
    asyncio.run(debug_api())