huibao/backend/init_data.py

61 lines
2.0 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import httpx
# 预置的眼镜行业上市公司列表
COMPANIES = [
{
"stock_code": "300622",
"company_name": "博士眼镜连锁股份有限公司",
"short_name": "博士眼镜",
"industry": "眼镜零售"
},
{
"stock_code": "301101",
"company_name": "明月镜片股份有限公司",
"short_name": "明月镜片",
"industry": "镜片制造"
},
{
"stock_code": "300595",
"company_name": "欧普康视科技股份有限公司",
"short_name": "欧普康视",
"industry": "医疗器械/OK镜"
},
{
"stock_code": "300015",
"company_name": "爱尔眼科医院集团股份有限公司",
"short_name": "爱尔眼科",
"industry": "眼科医疗"
},
{
"stock_code": "002223",
"company_name": "江苏鱼跃医疗设备股份有限公司",
"short_name": "鱼跃医疗",
"industry": "医疗器械"
}
]
API_URL = "http://localhost:8000/api/companies"
async def init_data():
print("开始添加预置公司...")
async with httpx.AsyncClient() as client:
for company in COMPANIES:
try:
# 检查是否已存在
response = await client.post(API_URL, json=company)
if response.status_code == 200:
print(f"✅ 成功添加: {company['short_name']} ({company['stock_code']})")
elif response.status_code == 400 and "已存在" in response.text:
print(f" 已存在: {company['short_name']} ({company['stock_code']})")
else:
print(f"❌ 添加失败: {company['short_name']} - {response.text}")
except Exception as e:
print(f"❌ 请求错误: {e}")
print("\n完成!请刷新「公司管理」页面查看。")
print("提示:添加完成后,请点击页面左下角的「立即同步」按钮开始抓取报告。")
if __name__ == "__main__":
asyncio.run(init_data())