|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
简单来说,就是为了提高代码的复用性。可能写得不是很好,如果有大神,欢迎指点交流。
这里给出两个简单的 Demo。复用时只需继承 TaskThreading 并重写 parse 函数,作为爬虫的“入口”,即可进行多线程采集。
一、只开多个线程,不区分任务类型。
- from queue import Queue
- import threading
- import requests
class ThreadPoolManagement(object):
    """Create, start and join a fixed-size group of worker threads.

    Every worker is built by calling ``task_thread(task_queue)``, so all
    workers receive the same queue object.
    """

    def __init__(self, task_thread, task_queue, number=5):
        """
        :param task_thread: callable (normally a ``threading.Thread``
            subclass) invoked as ``task_thread(task_queue)`` for each worker
        :param task_queue: queue object handed to every worker
        :param number: how many workers to create
        """
        self.number = number
        self.thread_list = []
        self._create_threads(task_thread, task_queue)

    def _create_threads(self, task_thread, task_queue):
        """Instantiate ``self.number`` workers and add them to ``thread_list``."""
        self.thread_list.extend(
            task_thread(task_queue) for _ in range(self.number)
        )

    # Dunder-style names are reserved for the interpreter (PEP 8), so the
    # helper now has an ordinary private name. The old spelling is kept as
    # an alias so any existing caller keeps working.
    __create_threading__ = _create_threads

    def start_threading(self) -> None:
        """Call ``start()`` on every worker, in creation order."""
        for worker in self.thread_list:
            worker.start()

    def join_threading(self) -> None:
        """Call ``join()`` on every worker, in creation order."""
        for worker in self.thread_list:
            worker.join()
class TaskThreading(threading.Thread):
    """Worker thread that drains a shared queue, passing each item to ``parse``.

    Subclass it and override :meth:`parse` to do the actual work.
    """

    def __init__(self, task_queue: Queue):
        """
        :param task_queue: queue of pending items (URLs in the demos)
        """
        super().__init__()
        self.task = task_queue

    def run(self) -> None:
        """Process queued items until the queue is found empty, then exit."""
        # Imported here so the method does not rely on a file-level import.
        from queue import Empty

        print("子线程(-%s-)启动" % (threading.current_thread().name))
        while True:
            # Checking ``empty()`` and then calling a blocking ``get()`` is
            # racy: another worker can take the last item in between and
            # this thread would block forever. A non-blocking get performs
            # the "is there work?" test and the fetch as one step.
            try:
                url = self.task.get_nowait()
            except Empty:
                break
            self.parse(url)
        print("子线程(-%s-)结束" % (threading.current_thread().name))

    def parse(self, url) -> None:
        """Handle one queue item; the base implementation does nothing."""
        pass
class TestMode(TaskThreading):
    """Demo crawler: request each queued URL and print the response."""

    # Seconds to wait for the server. Without a timeout ``requests.get`` can
    # block indefinitely, and the worker thread would then never finish.
    request_timeout = 10

    def parse(self, url) -> None:
        """Put your own crawling code here.

        :param url: address to request with HTTP GET
        """
        worker_name = threading.current_thread().name
        try:
            response = requests.get(url=url, timeout=self.request_timeout)
        except requests.RequestException as exc:
            # Report and carry on so one bad URL does not kill the worker.
            print(f"{worker_name}请求{url}失败: {exc}")
            return
        print(f"{worker_name}正在模拟等待{url}链接响应,时长5秒")
        print(f"{worker_name}响应成功,状态{response}")
def main():
    """Run the demo: queue five URLs and crawl them with three workers."""
    # Every process starts with one thread, the main thread, which can spawn
    # further threads; current_thread() returns the calling thread's object.
    print("主线程(-%s-)启动" % (threading.current_thread().name))

    # Build the queue and load the crawl tasks.
    get_url_queue = Queue()
    for _ in range(5):
        get_url_queue.put("http://www.httpbin.org")

    # Pool of three TestMode workers sharing the queue.
    threading_pool = ThreadPoolManagement(
        task_thread=TestMode,
        task_queue=get_url_queue,
        number=3,
    )
    threading_pool.start_threading()

    # join() blocks until every worker has exited, so the former
    # ``while not queue.empty(): pass`` loop was redundant; it only kept a
    # CPU core spinning while the workers were busy.
    threading_pool.join_threading()
    print("主线程(-%s-)结束" % (threading.current_thread().name))


if __name__ == '__main__':
    main()
复制代码
二、开多个线程,并区分任务类型(生产者-消费者模型)。
- # -*- coding: utf-8 -*-
- # !/usr/bin/python3
- """
- Created on 2019:10-13-->16:19:36
- @author: 1263270345@qq.com / Alex
- """
- from queue import Queue
- import threading
- import requests
- stop_flag = False  # shutdown signal: Test2Mode.run loops while this is False; main() sets it to True
- data_queue = Queue()  # hand-off queue: TestMode.parse puts URLs here; main() gives it to the Test2Mode pool
class ThreadPoolManagement(object):
    """Create, start and join a fixed-size group of worker threads.

    Every worker is built by calling ``task_thread(task_queue)``, so all
    workers receive the same queue object.
    """

    def __init__(self, task_thread, task_queue, number=5):
        """
        :param task_thread: callable (normally a ``threading.Thread``
            subclass) invoked as ``task_thread(task_queue)`` for each worker
        :param task_queue: queue object handed to every worker
        :param number: how many workers to create
        """
        self.number = number
        self.thread_list = []
        self._create_threads(task_thread, task_queue)

    def _create_threads(self, task_thread, task_queue):
        """Instantiate ``self.number`` workers and add them to ``thread_list``."""
        self.thread_list.extend(
            task_thread(task_queue) for _ in range(self.number)
        )

    # Dunder-style names are reserved for the interpreter (PEP 8), so the
    # helper now has an ordinary private name. The old spelling is kept as
    # an alias so any existing caller keeps working.
    __create_threading__ = _create_threads

    def start_threading(self) -> None:
        """Call ``start()`` on every worker, in creation order."""
        for worker in self.thread_list:
            worker.start()

    def join_threading(self) -> None:
        """Call ``join()`` on every worker, in creation order."""
        for worker in self.thread_list:
            worker.join()
class TaskThreading(threading.Thread):
    """Worker thread that drains a shared queue, passing each item to ``parse``.

    Subclass it and override :meth:`parse` to do the actual work.
    """

    def __init__(self, task_queue: Queue):
        """
        :param task_queue: queue of pending items (URLs in the demos)
        """
        super().__init__()
        self.task = task_queue

    def run(self) -> None:
        """Process queued items until the queue is found empty, then exit."""
        # Imported here so the method does not rely on a file-level import.
        from queue import Empty

        print("子线程(-%s-)启动" % (threading.current_thread().name))
        while True:
            # Checking ``empty()`` and then calling a blocking ``get()`` is
            # racy: another worker can take the last item in between and
            # this thread would block forever. A non-blocking get performs
            # the "is there work?" test and the fetch as one step.
            try:
                url = self.task.get_nowait()
            except Empty:
                break
            self.parse(url)
        print("子线程(-%s-)结束" % (threading.current_thread().name))

    def parse(self, url) -> None:
        """Handle one queue item; the base implementation does nothing."""
        pass
class TestMode(TaskThreading):
    """Producer stage: request each queued URL, then pass it to ``data_queue``."""

    # Seconds to wait for the server. Without a timeout ``requests.get`` can
    # block indefinitely, and the worker thread would then never finish.
    request_timeout = 10

    def parse(self, url) -> None:
        """Put your own crawling code here.

        :param url: address to request with HTTP GET; after a successful
            request it is put on the module-level ``data_queue``
        """
        worker_name = threading.current_thread().name
        try:
            response = requests.get(url=url, timeout=self.request_timeout)
        except requests.RequestException as exc:
            # Report and carry on so one bad URL does not kill the worker;
            # nothing is forwarded to the second stage for a failed request.
            print(f"{worker_name}请求{url}失败: {exc}")
            return
        print(f"{worker_name}正在模拟等待{url}链接响应,时长5秒")
        print(f"{worker_name}响应成功,状态{response}")
        data_queue.put(url)
class Test2Mode(TaskThreading):
    """Consumer stage: keeps taking items from its queue until ``stop_flag`` is set."""

    # Seconds to wait for the server before giving up on a request.
    request_timeout = 10
    # Seconds to block on an empty queue before re-checking ``stop_flag``.
    poll_interval = 0.1

    def run(self) -> None:
        """Consume queue items until the module-level ``stop_flag`` is true.

        The inherited ``run`` exits as soon as it finds the queue empty,
        which here would end the thread before any task has been added, so
        this version keeps waiting for work instead.
        """
        # Imported here so the method does not rely on a file-level import.
        from queue import Empty

        while not stop_flag:
            try:
                # Block briefly instead of spinning on a non-blocking get,
                # which keeps a CPU core busy while the queue is empty.
                url = self.task.get(timeout=self.poll_interval)
            except Empty:
                continue
            try:
                if url:
                    self.parse(url)
            except Exception as exc:
                # Best effort: report the failure and keep the consumer
                # alive, rather than discarding the error silently.
                print(f"{threading.current_thread().name}处理{url}失败: {exc}")
            finally:
                # Match every successful get() with exactly one task_done().
                self.task.task_done()

    def parse(self, url) -> None:
        """Put your own crawling code here.

        :param url: address to request with HTTP GET
        """
        response = requests.get(url=url, timeout=self.request_timeout)
        print(f"{threading.current_thread().name}正在模拟等待{url}链接响应,时长5秒")
        print(f"{threading.current_thread().name}响应成功,状态{response}")
def main():
    """Run the two-stage demo: TestMode workers feed Test2Mode workers."""
    global stop_flag
    # Imported here so the function does not rely on a file-level import.
    import time

    # Every process starts with one thread, the main thread, which can spawn
    # further threads; current_thread() returns the calling thread's object.
    print("主线程(-%s-)启动" % (threading.current_thread().name))

    # Build the first-stage queue and load the crawl tasks.
    get_url_queue = Queue()
    for _ in range(5):
        get_url_queue.put("http://www.httpbin.org")

    # First crawler: three workers reading get_url_queue.
    threading_pool = ThreadPoolManagement(
        task_thread=TestMode,
        task_queue=get_url_queue,
        number=3,
    )
    # Second crawler: three workers reading data_queue.
    threading_poolz = ThreadPoolManagement(
        task_thread=Test2Mode,
        task_queue=data_queue,
        number=3,
    )

    # Start both pools.
    threading_pool.start_threading()
    threading_poolz.start_threading()

    # join() blocks until every first-stage worker has exited, so the
    # former busy-wait on get_url_queue.empty() was redundant.
    threading_pool.join_threading()

    # Wait for the second stage to take everything off data_queue. Sleep
    # between checks instead of spinning so the wait does not burn a core.
    while not data_queue.empty():
        time.sleep(0.05)

    # Tell the second-stage workers to leave their loop, then wait for them.
    stop_flag = True
    threading_poolz.join_threading()
    print("主线程(-%s-)结束" % (threading.current_thread().name))


if __name__ == '__main__':
    main()
复制代码
|
|