免费代理池的实现
github地址: https://github.com/7134g/myproxypool核心:以多线程方式去获取免费的ip站点,存储在内存中
程序分为三部分
1.测试代理线程
2.获取代理线程
3.提供api服务的线程
其中每一个请求将生成一个子线程去处理(获取代理和测试代理)
流程:启动调度器 -> 开启测试,爬取,服务三条主线程
爬取线程 -> 遍历爬取函数(每一个爬取函数对应一个免费代理平台) -> 清洗提取 -> 添加到队列A中
测试线程 -> 从队列A中获取ip -> 对设定的站点携带ip访问 -> 无效删除
服务线程 -> 提供api接口获取队列A中的代理
下列为调度器的启动函数
class Scheduler: # 该程序的调度器
def run(self):
factory = Factory() # 用与构建管理所有线程
# factory.set_active_count()
try:
if TESTER_ENABLED:
factory.add(schedule_tester)
if GETTER_ENABLED:
factory.add(schedule_getter)
if API_ENABLED:
factory.add(schedule_api)
factory.start() # 启动所有线程,也就是上面所添加的三个线程
finally:
factory.stop()
factory.shutdown()
Factory类提供了暂停,停止,等待任务执行完毕的接口,便于控制一批批的测试代理ip。self._create_worker()用于创建工人
class Factory:
...
def start(self):
Log.debug("线程池启动,招聘打工人")
self._create_worker()
Log.debug("打工人准备就绪")
self._working()
while self.factory_status:
time.sleep(CHECK_FACTORY_STATUS)
为了减少创建线程,就有了Worker类,其作用就是一个对象意味着一个线程
可由 Factory类 动态控制线程此时的工作
将执行过程中生成的所有Task类任务存放在cls._instance.tasks中,由空闲工人去拉来执行
class Worker(threading.Thread):
...
def run(self):
self.wf = "doing"
while self.factory.factory_status and self.status:
if not self.__running.is_set():
Log.info("咋瓦鲁多")
# 获取任务
t = self.factory.get_task()
if not t:
self._sleep(self.wid)
continue
try:
self.sleep_count = 0
t.run()
# print(f"打工人{self.wid}号 ,剩余工作 {self.factory._active_working}")
except Exception as e:
Log.error(f"打工人工作异常: {e}")
finally:
self.factory.minus_active(t.name)
self.__running.wait()# 为True时立即返回, 为False时阻塞直到内部的标识位为True后返回
Log.debug(f"打工人{self.wid}你被炒鱿鱼了")
self.wf = "fired"
class Task:
...
def run(self):
if self.args:
if self.kw:
self.fun(*self.args, **self.kw)
else:
self.fun(*self.args)
else:
self.fun()
# 需要工作的内容
页:
[1]