Let's walk through the code piece by piece.
After cloning the project locally, the project folder is `FundCrawler`, with the following directory structure:
```
FundCrawler/
├── README.md
├── fund_crawler.py
├── LICENSE
├── requirements.txt
├── run.py
├── test_run.py
├── utils/
│ ├── result_analyse.py
│ ├── constants.py
│ └── rate_control_analyse.py
├── docs/
├── .git/
├── module/
│ ├── data_mining/
│ │ ├── strategy/
│ │ │ ├── overview.py
│ │ │ └── manager.py
│ │ ├── data_mining.py
│ │ └── abstract_data_mining_module.py
│ ├── downloader/
│ │ └── download_by_requests.py
│ ├── fund_context.py
│ ├── process_manager.py
│ ├── saving_result/
│ │ └── save_result_2_file.py
│ ├── abstract_saving_result_module.py
│ └── crawling_target/
│ └── get_small_batch_4_test.py
└── __pycache__/
```
Let's start reading from the code in the `module` folder.
The first Python file is `abstract_crawling_target_module.py`:
```python
from abc import abstractmethod, ABC
from typing import List

from module.fund_context import FundContext


class CrawlingTargetModule(ABC):
    """
    Fund crawling task module (base class)
    Provides, one by one, the funds that need to be crawled
    """

    @abstractmethod
    def get_fund_list(self) -> List[FundContext]:
        """
        Get the list of funds that need to be crawled
        """
        pass
```
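To make the contract concrete, here is a minimal sketch of what a subclass could look like. It is purely illustrative (not the project's `get_small_batch_4_test.py`), and the fund codes and names are placeholders:

```python
# Illustrative sketch only -- not the project's actual crawling_target implementation.
# It just shows how a subclass fulfils the CrawlingTargetModule contract.
from typing import List

from module.abstract_crawling_target_module import CrawlingTargetModule
from module.fund_context import FundContext


class FixedListTarget(CrawlingTargetModule):
    """Hypothetical target module that returns a hard-coded batch of funds."""

    def get_fund_list(self) -> List[FundContext]:
        # Fund codes and names here are placeholders, not real data
        return [
            FundContext('000001', 'placeholder fund A'),
            FundContext('000002', 'placeholder fund B'),
        ]
```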
The second is `abstract_data_mining_module.py`:
```python
from abc import abstractmethod, ABC
from typing import List, Optional

from module.fund_context import FundContext
from utils.constants import PageType


class DataMiningModule(ABC):
    """
    Parses the raw crawling results
    """

    @abstractmethod
    def summit_context(self, context: FundContext) -> Optional[List[tuple[PageType, str]]]:
        """
        Submit the fund's context
        When a list of URLs is returned, the corresponding pages still need to be crawled
        (and their responses added to the context); when nothing is returned, there is no
        more data to crawl (the context already holds the final result)
        """
        pass
```
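Again just to illustrate the contract, a hypothetical implementation could look roughly like this. The `PageType.OVERVIEW` member and the URL are assumptions for demonstration, not the project's real strategy code (the real strategies live under `module/data_mining/strategy/`):

```python
# Illustrative sketch only -- PageType.OVERVIEW and the URL below are assumptions.
from typing import List, Optional

from module.abstract_data_mining_module import DataMiningModule
from module.fund_context import FundContext
from utils.constants import PageType


class SingleOverviewPageMining(DataMiningModule):
    """Hypothetical module that needs exactly one page per fund."""

    def summit_context(self, context: FundContext) -> Optional[List[tuple[PageType, str]]]:
        if PageType.OVERVIEW not in context.http_response_dict:
            # First call: ask the manager to download the fund's overview page
            url = f'https://example.com/fund/{context.fund_code}.html'  # placeholder URL
            return [(PageType.OVERVIEW, url)]
        # Second call: the response is already in the context, so parse it and finish.
        # (Parsing left out; a real strategy would fill context.fund_type, fund_size, ...)
        return None
```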
The third is `abstract_saving_result_module.py`:
```python
from abc import abstractmethod, ABC

from module.fund_context import FundContext


class SavingResultModule(ABC):
    """
    Module for saving the fund data
    """

    @abstractmethod
    def save_result(self, result: FundContext) -> None:
        """
        Save one crawling result
        """
        pass

    def exit(self) -> None:
        """
        For scenarios that need post-processing when the run stops
        """
        pass
```
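As a rough illustration of how both hooks can be used, a hypothetical saver might buffer rows and flush them on `exit()`. This is not the project's `save_result_2_file.py`; it also assumes `FundAttrKey` is an `Enum`, so `.name` gives a usable column label:

```python
# Illustrative sketch only -- assumes FundAttrKey members have a .name attribute (Enum).
import csv

from module.abstract_saving_result_module import SavingResultModule
from module.fund_context import FundContext


class BufferedCsvSaver(SavingResultModule):
    """Hypothetical saver that buffers rows and flushes them to a CSV on exit."""

    def __init__(self, path: str = 'result.csv'):
        self._path = path
        self._rows = []

    def save_result(self, result: FundContext) -> None:
        # Collect one flattened row per fund
        self._rows.append({key.name: value for key, value in result.to_result_row().items()})

    def exit(self) -> None:
        # Post-processing hook: write everything out when the run stops
        if not self._rows:
            return
        with open(self._path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=list(self._rows[0].keys()))
            writer.writeheader()
            writer.writerows(self._rows)
```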
The fourth is `fund_context.py`:
"""
模块间交互所使用的BO
"""
from typing import Optional
from module.downloader.download_by_requests import FundResponse
from utils.constants import FundAttrKey, PageType
class FundContext:
"""
基金爬取数据的上下文
"""
def __init__(self, fund_code: str, fund_name: str):
self.fund_code: str = fund_code
self.fund_name: str = fund_name
self.fund_type: Optional[str] = None
self.fund_size: Optional[str] = None
self.fund_company: Optional[str] = None
self.fund_value: Optional[str] = None
self.fund_manager: Optional[str] = None
self.date_of_appointment: Optional[str] = None
self.standard_deviation_three_years: Optional[str] = None
self.sharpe_three_years: Optional[str] = None
self.three_years_increase: Optional[str] = None
self.five_years_increase: Optional[str] = None
# 爬取到的网页数据
self.http_response_dict: dict[PageType, FundResponse] = dict()
def to_result_row(self) -> dict[FundAttrKey, Optional[str]]:
return {
FundAttrKey.FUND_CODE: self.fund_code,
FundAttrKey.FUND_SIMPLE_NAME: self.fund_name,
FundAttrKey.FUND_TYPE: self.fund_type,
FundAttrKey.FUND_SIZE: self.fund_size,
FundAttrKey.FUND_COMPANY: self.fund_company,
FundAttrKey.FUND_VALUE: self.fund_value,
FundAttrKey.FUND_MANAGER: self.fund_manager,
FundAttrKey.DATE_OF_APPOINTMENT: self.date_of_appointment,
FundAttrKey.STANDARD_DEVIATION_THREE_YEARS: self.standard_deviation_three_years,
FundAttrKey.SHARPE_THREE_YEARS: self.sharpe_three_years,
FundAttrKey.THREE_YEARS_INCREASE: self.three_years_increase,
FundAttrKey.FIVE_YEARS_INCREASE: self.five_years_increase,
}
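So `FundContext` is just the data carrier passed between modules. A tiny usage sketch of the intended flow (all values below are placeholders):

```python
# Usage sketch with placeholder values, showing how FundContext travels between modules.
from module.fund_context import FundContext

context = FundContext('000001', 'placeholder fund')  # created by the crawling-target module
context.fund_type = 'Hybrid'                         # filled in by the data-mining module
row = context.to_result_row()                        # flattened row consumed by the saving module
print(row)
```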
The last one is `process_manager.py`, which is by far the longest:
"""
负责统领和协调数据爬取的流程
"""
import logging
from queue import Empty
from threading import Thread
from time import sleep
from typing import List, Optional
from tqdm import tqdm
from module.abstract_crawling_target_module import CrawlingTargetModule
from module.abstract_data_mining_module import DataMiningModule
from module.abstract_saving_result_module import SavingResultModule
from module.downloader.download_by_requests import FundRequest, GetPageOnSubProcess
from module.fund_context import FundContext
from utils.constants import PageType
class TaskManager:
"""
爬取的核心流程
"""
def __init__(self, need_crawled_fund_module: CrawlingTargetModule, data_mining_module: DataMiningModule,
save_result_module: SavingResultModule):
# 事件列表等(模块间的协作)
self._fund_context_dict: dict[str, FundContext] = dict()
self._fund_waiting_dict: dict[str, List[PageType]] = dict()
# 相关模块
self._need_crawled_fund_module = need_crawled_fund_module
self._data_mining_module = data_mining_module
self._save_result_module = save_result_module
self._downloader = GetPageOnSubProcess(logging.root.level)
# 总共需要的步骤(当前一个基金只算一步)
self._total_step_count: Optional[int] = None
# 当前已经完成的
self._finished_step_count: Optional[int] = None
self._exit_sign: bool = False
def show_process(self) -> None:
"""
爬取进度提示
"""
logging.info("开始获取需要爬取的基金任务")
while not self._exit_sign and (self._total_step_count is None or self._finished_step_count is None):
# 等待任务开始
sleep(0.1)
logging.info("开始爬取基金数据")
with tqdm(total=self._total_step_count) as pbar:
last_finished_task_num = None
while not self._exit_sign and self._finished_step_count < self._total_step_count:
cur_finished_task_num = self._finished_step_count
pbar.update(cur_finished_task_num - (last_finished_task_num if last_finished_task_num else 0))
last_finished_task_num = cur_finished_task_num
sleep(1)
def run(self) -> None:
try:
# 独立的爬取进程(避免GIL)
self._downloader.start()
# 独立的进度展示线程
Thread(target=self.show_process).start()
# 爬取主流程
self.do_run()
except Exception as e:
logging.exception(f"报错啦,主进程完蛋啦 {e}")
finally:
self._exit_sign = True
self._downloader.close_downloader()
self._save_result_module.exit()
self._downloader.join_downloader()
logging.info('主进程退出')
def do_run(self) -> None:
# 获取任务
fund_context_list = self._need_crawled_fund_module.get_fund_list()
self._fund_context_dict = {fund.fund_code: fund for fund in fund_context_list}
self._total_step_count = len(fund_context_list)
self._finished_step_count = 0
while self._finished_step_count < self._total_step_count:
# http请求发起和解析
first_meet_fund_code = None
for fund_code in self._fund_context_dict.keys():
# 这里要注意req的顺序和context的遍历顺序,避免堆积大量处于中间状态的任务
# 寻找第一个waiting队列已经处理完毕的context
if fund_code in self._fund_waiting_dict and len(self._fund_waiting_dict[fund_code]) > 0:
continue
elif fund_code in self._fund_waiting_dict and len(self._fund_waiting_dict[fund_code]) == 0:
self._fund_waiting_dict.pop(fund_code)
first_meet_fund_code = fund_code
break
if first_meet_fund_code:
fund_context = self._fund_context_dict[first_meet_fund_code]
page_req_list = self._data_mining_module.summit_context(fund_context)
if page_req_list:
# 数据挖掘模块提出新爬取请求
for page_req in page_req_list:
self._downloader.apply(FundRequest(fund_context.fund_code, page_req[0], page_req[1]))
self._fund_waiting_dict[fund_context.fund_code] = [page_req[0] for page_req in page_req_list]
else:
# 没有新的爬取请求,保存爬取结果
self._fund_context_dict.pop(first_meet_fund_code)
self._finished_step_count += 1
self._save_result_module.save_result(fund_context)
# 处理http请求结果
counter = 0
while True:
counter += 1
# 请求队列太满时,优先等待和处理下结果
if counter > 1 and not self._downloader.if_downloader_busy():
break
try:
# 上一步处理了一圈,发现没有事情可以干的时候,可以block等待返回,避免忙等待
block = first_meet_fund_code is None or self._downloader.if_downloader_busy()
cur_res = self._downloader.get_result(block)
self._fund_waiting_dict[cur_res.fund_code].remove(cur_res.page_type)
self._fund_context_dict[cur_res.fund_code].http_response_dict[cur_res.page_type] = cur_res
except Empty:
pass
logging.info("爬取结束")
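Putting it all together, the wiring looks roughly like the sketch below. The repository's `run.py` does the real assembly with its own concrete modules; here I just plug in the illustrative classes sketched earlier in this post (so they would need to be defined or importable in the same script):

```python
# Hypothetical wiring of the pieces above -- not the project's actual run.py.
from module.process_manager import TaskManager

if __name__ == '__main__':
    manager = TaskManager(
        need_crawled_fund_module=FixedListTarget(),         # which funds to crawl
        data_mining_module=SingleOverviewPageMining(),      # how to request and parse pages
        save_result_module=BufferedCsvSaver('result.csv'),  # how to persist results
    )
    manager.run()
```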