- """
- Fund_Filter.py
- 全市场公募基金筛选(开放式+ETF+LOF+QDII+REITs+货币)
- Author = shadowmage
- """
- import akshare as ak
- import pandas as pd
- from datetime import datetime
- from concurrent.futures import ProcessPoolExecutor
- from tqdm import tqdm
- import os
- N_PROC = max(1, os.cpu_count() - 2)
- def get_all_codes():
- # 1) 开放式基金
- open_df = ak.fund_name_em()
- open_df = open_df.rename(columns={'基金代码': 'code', '基金简称': 'name', '基金类型': 'type'})
- # 2) ETF
- etf_df = ak.fund_etf_spot_em()[['代码', '名称']].rename(columns={'代码': 'code', '名称': 'name'})
- etf_df['type'] = 'ETF'
- # 3) LOF
- lof_df = ak.fund_lof_spot_em()[['代码', '名称']].rename(columns={'代码': 'code', '名称': 'name'})
- lof_df['type'] = 'LOF'
- # 4) QDII(香港)
- qdii_df = ak.fund_hk_rank_em()[['基金代码', '基金简称']].rename(columns={'基金代码': 'code', '基金简称': 'name'})
- qdii_df['type'] = 'QDII'
- # 5) REITs
- reits_df = ak.reits_realtime_em()[['代码', '名称']].rename(columns={'代码': 'code', '名称': 'name'})
- reits_df['type'] = 'REITs'
- # 6) 货币型
- money_df = ak.fund_money_fund_daily_em()[['基金代码', '基金简称']].rename(columns={'基金代码': 'code', '基金简称': 'name'})
- money_df['type'] = '货币型'
- # 合并并去重
- all_df = pd.concat([open_df, etf_df, lof_df, qdii_df, reits_df, money_df], ignore_index=True)
- all_df.drop_duplicates(subset=['code'], inplace=True)
- return all_df
- base_df = get_all_codes()
- print(f'全市场基金(含货基)共 {len(base_df)} 只')
- def calc_one(row):
- code = row['code']
- try:
- # 基本信息(成立日、规模)
- info = ak.fund_individual_basic_info_xq(symbol=code)
- setup = pd.to_datetime(info.loc[info['item'] == '成立时间', 'value'].iloc[0])
- scale = float(info.loc[info['item'] == '最新规模', 'value'].iloc[0].replace('亿', ''))
- age = (datetime.now() - setup).days / 365.25
- if scale <= 1 or age <= 4:
- return None
- # 历史净值
- nav_df = ak.fund_open_fund_info_em(symbol=code, indicator='单位净值走势')
- if nav_df.empty or len(nav_df) < 242:
- return None
- nav = nav_df.sort_values('净值日期')['单位净值'].astype(float)
- # 年化收益
- years = len(nav) / 242
- annual = (nav.iloc[-1] / nav.iloc[0]) ** (1 / years) - 1
- if annual <= 0.035:
- return None
- # 上涨比例
- daily = nav.pct_change().dropna()
- up_day = (daily > 0).mean()
- monthly = nav.resample('M').last().pct_change().dropna()
- up_month = (monthly > 0).mean()
- if up_day <= 0.9 or up_month <= 0.9:
- return None
- return {
- '基金代码': code,
- '基金简称': info.loc[info['item'] == '基金简称', 'value'].iloc[0],
- '基金类型': row['type'],
- '成立日期': setup.date(),
- '基金规模(亿元)': scale,
- '成立年限(年)': round(age, 2),
- '年化收益率': round(annual * 100, 2),
- '上涨日数比例': round(up_day * 100, 2),
- '上涨月份比例': round(up_month * 100, 2)
- }
- except Exception:
- return None
- def split_list(lst, n):
- k, m = divmod(len(lst), n)
- return [lst[i*k + min(i, m):(i+1)*k + min(i+1, m)] for i in range(n)]
- def worker(sub_df):
- return [calc_one(r) for _, r in sub_df.iterrows() if calc_one(r)]
- if __name__ == '__main__':
- batches = split_list(base_df, N_PROC)
- results = []
- with ProcessPoolExecutor(max_workers=N_PROC) as pool:
- for part in tqdm(pool.map(worker, batches), total=len(codes), desc='基金只数'):
- results.extend(part)
- final_df = pd.DataFrame(results).sort_values('年化收益率', ascending=False)
- final_df.to_excel('filtered_funds.xlsx', index=False, sheet_name='基金筛选结果')
- print('\n全市场筛选完成:filtered_funds.xlsx')
- print(final_df.head())
复制代码
I've found that this runs rather slowly. Is there any way to speed it up?
# Adding Asynchronous Processing and a Supervision Mechanism to Fund_History.py
To add asynchronous processing to Fund_History.py and keep it running reliably, I suggest using the `asyncio` and `aiohttp` libraries for the requests, together with a supervision mechanism that periodically checks the program's state. A minimal sketch of the core request pattern follows the install step below; the detailed changes are as follows:
## 1. First, install the required async libraries
```bash
pip install aiohttp aiofiles
```
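Before the full script, here is a minimal, self-contained sketch of the pattern it is built on: one shared `aiohttp.ClientSession`, many coroutines, and an `asyncio.Semaphore` capping how many requests are in flight at once. The `URL`, the `fetch` helper, and the request count are placeholders for illustration, not part of Fund_History.py.

```python
import asyncio
import aiohttp

MAX_CONCURRENT = 10          # cap on simultaneous requests
URL = "https://example.com"  # placeholder endpoint, for illustration only


async def fetch(session: aiohttp.ClientSession, i: int, sem: asyncio.Semaphore) -> int:
    # The semaphore guarantees at most MAX_CONCURRENT requests run at the same time
    async with sem:
        async with session.get(URL, params={"i": i},
                               timeout=aiohttp.ClientTimeout(total=30)) as resp:
            await resp.text()
            return resp.status


async def main() -> None:
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(fetch(session, i, sem)) for i in range(50)]
        # as_completed yields each result as soon as its coroutine finishes
        for coro in asyncio.as_completed(tasks):
            print(await coro)


if __name__ == "__main__":
    asyncio.run(main())
```

The full script in the next section wraps this same pattern with page parsing, retries, and heartbeat updates.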
## 2. Modified Fund_History.py
- """
- Fund_History.py
- 异步版本 - 从天天基金网抓取基金历史净值数据
- """
- import os
- import time
- import random
- import datetime as dt
- import asyncio
- import aiohttp
- import aiofiles
- from bs4 import BeautifulSoup
- import re
- import pandas as pd
- import akshare as ak
- from tqdm import tqdm
- import signal
- import logging
- from logging.handlers import RotatingFileHandler
- # --------------------------------------------------
- # 配置日志记录
- def setup_logging():
- log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs")
- os.makedirs(log_dir, exist_ok=True)
-
- logger = logging.getLogger("FundHistory")
- logger.setLevel(logging.INFO)
-
- # 文件处理器 - 限制大小并轮转
- file_handler = RotatingFileHandler(
- os.path.join(log_dir, "fund_history.log"),
- maxBytes=10*1024*1024, # 10MB
- backupCount=5
- )
- file_handler.setLevel(logging.INFO)
-
- # 控制台处理器
- console_handler = logging.StreamHandler()
- console_handler.setLevel(logging.INFO)
-
- # 格式化
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- file_handler.setFormatter(formatter)
- console_handler.setFormatter(formatter)
-
- logger.addHandler(file_handler)
- logger.addHandler(console_handler)
-
- return logger
- logger = setup_logging()
- # --------------------------------------------------
- # 路径配置(脚本同级目录)
- BASE_DIR = os.path.dirname(os.path.abspath(__file__))
- CODE_FILE = os.path.join(BASE_DIR, "Fundcode.csv")
- SAVE_DIR = os.path.join(BASE_DIR, "data")
- os.makedirs(SAVE_DIR, exist_ok=True)
- # 抓取日期区间(可自行调整)
- START_DATE = "2000-01-01"
- END_DATE = dt.datetime.today().strftime("%Y-%m-%d")
- # 异步请求配置
- MAX_CONCURRENT_REQUESTS = 10 # 并发请求数
- REQUEST_TIMEOUT = 30 # 请求超时时间(秒)
- MAX_RETRY = 5 # 最大重试次数
- PER_SLEEP = (0.3, 1.2) # 请求间隔
- # 监督配置
- HEARTBEAT_INTERVAL = 300 # 心跳间隔(秒)
- HEARTBEAT_FILE = os.path.join(BASE_DIR, "heartbeat.txt") # 心跳文件
- # 请求头
- HEADERS = {
- "User-Agent": ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
- "AppleWebKit/537.36 (KHTML, like Gecko) "
- "Chrome/128.0.0.0 Safari/537.36"),
- "Referer": "http://fund.eastmoney.com/"
- }
- # --------------------------------------------------
- # 心跳监控
- def update_heartbeat():
- """更新心跳时间"""
- try:
- with open(HEARTBEAT_FILE, 'w') as f:
- f.write(str(dt.datetime.now()))
- except Exception as e:
- logger.error(f"更新心跳失败: {e}")
- def check_heartbeat():
- """检查心跳是否正常"""
- try:
- if not os.path.exists(HEARTBEAT_FILE):
- return False
-
- with open(HEARTBEAT_FILE, 'r') as f:
- last_heartbeat = dt.datetime.fromisoformat(f.read().strip())
-
- # 如果超过2倍心跳间隔没有更新,认为程序异常
- if (dt.datetime.now() - last_heartbeat).total_seconds() > HEARTBEAT_INTERVAL * 2:
- return False
- return True
- except:
- return False
- # 信号处理 - 优雅退出
- def signal_handler(signum, frame):
- logger.info(f"接收到信号 {signum},正在优雅退出...")
- # 设置全局标志,让所有任务知道需要退出
- global SHOULD_EXIT
- SHOULD_EXIT = True
- # 注册信号处理
- SHOULD_EXIT = False
- signal.signal(signal.SIGINT, signal_handler)
- signal.signal(signal.SIGTERM, signal_handler)
# --------------------------------------------------
# 1. Pull the full list of open-end fund codes via akshare
def build_fundcode_csv() -> pd.DataFrame:
    """Build or update the fund-code CSV file"""
    try:
        latest = ak.fund_name_em()                # latest full list of open-end funds
        latest = latest[['基金代码', '基金简称']]  # keep only the code and name columns
        if not os.path.exists(CODE_FILE):
            # First run: write the file directly
            latest.to_csv(CODE_FILE, index=False, encoding='utf-8-sig')
            logger.info(f"Created code table -> {CODE_FILE}, {len(latest)} rows")
            return latest
        # File already exists: read the old table
        old = pd.read_csv(CODE_FILE, dtype=str)
        # Use the fund code as the key to find newly listed funds
        new_rows = latest[~latest['基金代码'].isin(old['基金代码'])]
        if new_rows.empty:
            logger.info("No new funds")
        else:
            # Append the new rows
            new_rows.to_csv(CODE_FILE, mode='a', header=False,
                            index=False, encoding='utf-8-sig')
            logger.info(f"Appended {len(new_rows)} new funds")
        # Return the complete code table
        return pd.read_csv(CODE_FILE, dtype=str)
    except Exception as e:
        logger.error(f"Failed to build the fund-code CSV: {e}")
        raise

# --------------------------------------------------
# 2. Fetch one fund's history asynchronously
async def fetch_one_fund(session, code, name, semaphore):
    """Fetch the NAV history of a single fund"""
    csv_path = os.path.join(SAVE_DIR, f"{code}.csv")

    # Skip if the file already exists
    if os.path.exists(csv_path):
        return True, code, "already exists, skipped"

    # Use the semaphore to limit concurrency
    async with semaphore:
        for attempt in range(MAX_RETRY):
            try:
                if SHOULD_EXIT:
                    return False, code, "shutting down"

                # NAV history endpoint
                url = "https://fundf10.eastmoney.com/F10DataApi.aspx"
                params = {
                    "type": "lsjz",
                    "code": code,
                    "sdate": START_DATE,
                    "edate": END_DATE,
                    "page": 1,
                    "per": 20
                }

                # Fetch the first page to learn the total page count
                async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)) as resp:
                    resp_text = await resp.text()

                # Parse the total page count
                pages_match = re.search(r"pages:(\d+)", resp_text)
                if not pages_match:
                    return False, code, "could not parse page count"

                pages = int(pages_match.group(1))
                records = []

                # Fetch every page
                for page in range(1, pages + 1):
                    if SHOULD_EXIT:
                        return False, code, "shutting down"

                    params["page"] = page
                    async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)) as resp:
                        page_text = await resp.text()

                    soup = BeautifulSoup(page_text, "lxml")
                    table = soup.find("table", class_="w782 comm lsjz")

                    if not table:
                        continue

                    # Extract the table rows
                    for tr in table.find_all("tr")[1:]:  # skip the header row
                        tds = tr.find_all("td")
                        # Need at least 4 cells: date, NAV, cumulative NAV, daily change
                        if len(tds) < 4:
                            continue

                        date = tds[0].text.strip()
                        nav = tds[1].text.strip()
                        acc_nav = tds[2].text.strip()
                        daily_change = tds[3].text.strip()

                        records.append([date, nav, acc_nav, daily_change, name])

                    # Random delay to avoid hitting the server too fast
                    await asyncio.sleep(random.uniform(*PER_SLEEP))

                if not records:
                    return False, code, "no data"

                # Build the DataFrame and save it
                df = pd.DataFrame(records, columns=["净值日期", "单位净值", "累计净值", "日增长率", "基金名称"])
                df["净值日期"] = pd.to_datetime(df["净值日期"], errors="coerce")
                df["单位净值"] = pd.to_numeric(df["单位净值"], errors="coerce")
                df["累计净值"] = pd.to_numeric(df["累计净值"], errors="coerce")
                # to_numeric handles empty strings; .replace("", None).astype(float) would choke on them
                df["日增长率"] = pd.to_numeric(df["日增长率"].str.rstrip("%"), errors="coerce")

                # Sort by date
                df = df.sort_values("净值日期").reset_index(drop=True)

                # Save the file asynchronously
                async with aiofiles.open(csv_path, 'w', encoding='utf-8-sig') as f:
                    await f.write(df.to_csv(index=False))

                return True, code, "ok"

            except asyncio.TimeoutError:
                logger.warning(f"{code} attempt {attempt+1} timed out")
                if attempt < MAX_RETRY - 1:
                    await asyncio.sleep(random.uniform(*PER_SLEEP))
                else:
                    return False, code, "request timed out"
            except Exception as e:
                logger.warning(f"{code} attempt {attempt+1} failed: {e}")
                if attempt < MAX_RETRY - 1:
                    await asyncio.sleep(random.uniform(*PER_SLEEP))
                else:
                    return False, code, f"request failed: {str(e)}"

    return False, code, "exceeded maximum retries"
# --------------------------------------------------
# 3. Heartbeat task
async def heartbeat_task():
    """Update the heartbeat periodically"""
    while not SHOULD_EXIT:
        update_heartbeat()
        await asyncio.sleep(HEARTBEAT_INTERVAL)

# --------------------------------------------------
# 4. Main async entry point
async def main_async():
    """Main async function"""
    # Build the fund-code list
    df_code = build_fundcode_csv()
    codes = df_code['基金代码'].tolist()
    names = dict(zip(df_code['基金代码'], df_code['基金简称']))

    # Semaphore to limit concurrency
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)

    # Create the HTTP session
    connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT_REQUESTS, force_close=True)
    async with aiohttp.ClientSession(connector=connector, headers=HEADERS) as session:
        # Build the task list
        tasks = []
        for code in codes:
            if SHOULD_EXIT:
                break
            task = asyncio.create_task(fetch_one_fund(session, code, names.get(code, ""), semaphore))
            tasks.append(task)

        # Start the heartbeat task
        heartbeat = asyncio.create_task(heartbeat_task())

        # Progress bar via tqdm
        completed = 0
        failed = 0
        pbar = tqdm(total=len(tasks), desc="fetching NAV")

        # Wait for all tasks to finish
        for coro in asyncio.as_completed(tasks):
            try:
                success, code, message = await coro
                if success:
                    completed += 1
                else:
                    failed += 1
                    logger.warning(f"{code} fetch failed: {message}")

                pbar.update(1)
                pbar.set_postfix(completed=completed, failed=failed)

            except Exception as e:
                failed += 1
                logger.error(f"Task error: {e}")
                pbar.update(1)
                pbar.set_postfix(completed=completed, failed=failed)

        # Cancel the heartbeat task
        heartbeat.cancel()
        try:
            await heartbeat
        except asyncio.CancelledError:
            pass

        pbar.close()
        logger.info(f"Fetch finished: {completed} succeeded, {failed} failed")

# --------------------------------------------------
# 5. Entry point
def main():
    """Main function"""
    try:
        # Initialise the heartbeat
        update_heartbeat()

        # Run the async main function
        asyncio.run(main_async())

        logger.info("Program finished normally")
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    except Exception as e:
        logger.error(f"Program error: {e}")
        # A notification mechanism (e-mail, SMS, ...) could be added here
    finally:
        # Write one final heartbeat
        update_heartbeat()

if __name__ == "__main__":
    main()
```
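Before launching the full crawl, it can be worth smoke-testing the fetch logic on a single fund. The sketch below is only a suggestion: the file name `smoke_test.py`, the helper `run_once`, and the fund code `000001` are arbitrary examples, and it assumes the script sits next to Fund_History.py (importing that module will create the `logs/` and `data/` directories and register the signal handlers as a side effect).

```python
# smoke_test.py - quick single-fund check (assumed to sit next to Fund_History.py)
import asyncio
import aiohttp

from Fund_History import fetch_one_fund, HEADERS, MAX_CONCURRENT_REQUESTS


async def run_once(code: str) -> None:
    sem = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    async with aiohttp.ClientSession(headers=HEADERS) as session:
        ok, fund_code, message = await fetch_one_fund(session, code, "test", sem)
        print(ok, fund_code, message)


if __name__ == "__main__":
    # 000001 is just an example; replace it with any fund code you care about
    asyncio.run(run_once("000001"))
```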
## 3. Create the supervision script
Create a separate supervision script, `monitor_fund_history.py`:
- """
- monitor_fund_history.py
- 监督 Fund_History.py 运行状态
- """
- import os
- import time
- import datetime as dt
- import subprocess
- import logging
- from logging.handlers import RotatingFileHandler
- # 配置日志
- def setup_logging():
- log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs")
- os.makedirs(log_dir, exist_ok=True)
-
- logger = logging.getLogger("Monitor")
- logger.setLevel(logging.INFO)
-
- file_handler = RotatingFileHandler(
- os.path.join(log_dir, "monitor.log"),
- maxBytes=5*1024*1024,
- backupCount=3
- )
- file_handler.setLevel(logging.INFO)
-
- console_handler = logging.StreamHandler()
- console_handler.setLevel(logging.INFO)
-
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- file_handler.setFormatter(formatter)
- console_handler.setFormatter(formatter)
-
- logger.addHandler(file_handler)
- logger.addHandler(console_handler)
-
- return logger
- logger = setup_logging()
- def check_heartbeat(heartbeat_file, max_interval=600):
- """检查心跳文件是否正常"""
- try:
- if not os.path.exists(heartbeat_file):
- return False, "心跳文件不存在"
-
- with open(heartbeat_file, 'r') as f:
- last_heartbeat_str = f.read().strip()
-
- last_heartbeat = dt.datetime.fromisoformat(last_heartbeat_str)
- now = dt.datetime.now()
-
- if (now - last_heartbeat).total_seconds() > max_interval:
- return False, f"心跳已停止 {int((now - last_heartbeat).total_seconds())} 秒"
-
- return True, "心跳正常"
- except Exception as e:
- return False, f"检查心跳失败: {e}"
- def main():
- """主监督函数"""
- base_dir = os.path.dirname(os.path.abspath(__file__))
- script_path = os.path.join(base_dir, "Fund_History.py")
- heartbeat_file = os.path.join(base_dir, "heartbeat.txt")
-
- process = None
-
- while True:
- try:
- # 检查程序是否在运行
- if process is None or process.poll() is not None:
- # 程序不在运行,启动它
- logger.info("启动 Fund_History.py")
- process = subprocess.Popen(["python", script_path])
-
- # 检查心跳
- is_ok, message = check_heartbeat(heartbeat_file)
- if not is_ok:
- logger.warning(f"心跳异常: {message}")
- # 重启程序
- if process:
- process.terminate()
- process.wait(timeout=30)
- process = subprocess.Popen(["python", script_path])
- logger.info("已重启 Fund_History.py")
- else:
- logger.info("程序运行正常")
-
- # 等待一段时间再检查
- time.sleep(300) # 5分钟检查一次
-
- except Exception as e:
- logger.error(f"监督程序异常: {e}")
- time.sleep(60) # 异常后等待1分钟再继续
- if __name__ == "__main__":
- main()
复制代码
## 4. Usage
1. **Run the main program**: run `python Fund_History.py` to start fetching fund data asynchronously.
2. **Run the supervisor**: in another terminal (or in the background) run `python monitor_fund_history.py`; it checks the main program's heartbeat every 5 minutes and automatically restarts it if something looks wrong. If you want to inspect the heartbeat by hand, see the sketch after this list.
3. **Check the logs**: all run-time status is written to the log files under `logs/`, which makes troubleshooting easier.
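For a quick manual check, a small helper like the one below prints the age of the last heartbeat. The file name `check_heartbeat_age.py` is just a suggestion; it assumes the script sits next to `heartbeat.txt`, which holds the `str(datetime.now())` timestamp written by `update_heartbeat()`.

```python
# check_heartbeat_age.py - print how old the last heartbeat is (assumed file layout)
import datetime as dt
import os

HEARTBEAT_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "heartbeat.txt")

if os.path.exists(HEARTBEAT_FILE):
    with open(HEARTBEAT_FILE) as f:
        # update_heartbeat() writes str(datetime.now()), which fromisoformat can parse
        last = dt.datetime.fromisoformat(f.read().strip())
    age = (dt.datetime.now() - last).total_seconds()
    print(f"Last heartbeat was {age:.0f} seconds ago")
else:
    print("No heartbeat file found")
```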
## 5. Key improvements
1. **Asynchronous requests**: `aiohttp` and `asyncio` let many requests overlap, which greatly speeds up the crawl.
2. **Concurrency control**: an `asyncio.Semaphore` caps the number of in-flight requests, which lowers the risk of getting your IP blocked.
3. **Heartbeat mechanism**: the main program periodically updates a heartbeat file, and the supervisor uses that file to decide whether it is still healthy.
4. **Graceful shutdown**: system signals are caught so the program can exit cleanly when interrupted.
5. **Proper logging**: `RotatingFileHandler` rotates the log files so they never grow without bound.
6. **Retries**: every request is retried on failure, which improves robustness (a small stand-alone sketch of this pattern follows this list).
7. **Supervision script**: a separate supervisor watches the main program and restarts it automatically when it misbehaves.
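As a follow-up to point 6, the retry-and-sleep logic could also be factored out of `fetch_one_fund` into a small helper. The sketch below is one possible way to do that, not part of the script above; `retry_async` and the `fetch_page` name in the usage comment are hypothetical.

```python
import asyncio
import random


async def retry_async(make_call, max_retry=5, sleep_range=(0.3, 1.2)):
    """Await make_call(); on failure, sleep a random interval and try again."""
    last_exc = None
    for attempt in range(max_retry):
        try:
            return await make_call()
        except Exception as exc:          # in real use, catch narrower network errors
            last_exc = exc
            if attempt < max_retry - 1:
                await asyncio.sleep(random.uniform(*sleep_range))
    raise last_exc                        # all attempts failed: re-raise the last error


# Hypothetical usage inside fetch_one_fund (fetch_page is a placeholder name):
#   text = await retry_async(lambda: fetch_page(session, url, params))
```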
Together, these changes should noticeably improve both the stability and the efficiency of the program, while giving you much better monitoring and automatic recovery.
The reply above comes from the FishC AI assistant. If it did not answer your question correctly, please ask a follow-up.