Simply put, the goal is to improve code reusability. It may not be written perfectly; if you are more experienced, feedback and discussion are welcome.
Here are two simple demos. To reuse the code, just subclass TaskThreading and override the parse function as the crawler's "entry point", and you can scrape with multiple threads.
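In practice, reuse boils down to a few lines like the sketch below. MySpider is a hypothetical name, and the snippet assumes the TaskThreading and ThreadPoolManagement classes defined in the demos that follow are in scope:

# A minimal sketch of the reuse pattern. It assumes the TaskThreading and
# ThreadPoolManagement classes from the demos below; MySpider is a
# hypothetical subclass name.
from queue import Queue

class MySpider(TaskThreading):
    def parse(self, url) -> None:
        # Your crawler code goes here; each call handles one queued URL.
        print(f"crawling {url}")

tasks = Queue()
tasks.put("http://www.httpbin.org")
pool = ThreadPoolManagement(task_thread=MySpider, task_queue=tasks, number=3)
pool.start_threading()   # start the worker threads
pool.join_threading()    # wait for them to drain the queue and exit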
1. Multiple threads only, with no distinction between task types.
from queue import Queue, Empty
import threading
import time

import requests


class ThreadPoolManagement(object):
    """Thread pool manager."""

    def __init__(self, task_thread, task_queue, number=5):
        """
        :param task_thread: task-thread class to instantiate
        :param task_queue: task queue shared by the threads
        :param number: number of threads in the pool
        """
        self.number = number
        self.thread_list = []
        self.__create_threading__(task_thread, task_queue)

    def __create_threading__(self, task_thread, task_queue):
        for i in range(self.number):
            self.thread_list.append(task_thread(task_queue))

    def start_threading(self) -> None:
        for i in self.thread_list:
            i.start()

    def join_threading(self) -> None:
        for i in self.thread_list:
            i.join()
class TaskThreading(threading.Thread):
    """Task thread."""

    def __init__(self, task_queue: Queue):
        super().__init__()
        self.task = task_queue

    def run(self) -> None:
        print("Worker thread (-%s-) started" % threading.current_thread().name)
        while True:
            # get_nowait() avoids the race between empty() and get() when
            # several threads drain the same queue: a blocking get() after a
            # non-empty check could hang forever if another thread takes the
            # last item first.
            try:
                url = self.task.get_nowait()
            except Empty:
                break
            self.parse(url)
        print("Worker thread (-%s-) finished" % threading.current_thread().name)

    def parse(self, url) -> None:
        pass
class TestMode(TaskThreading):
    def parse(self, url) -> None:
        """Override this with your crawler code."""
        print(f"{threading.current_thread().name} is waiting for {url} to respond")
        response = requests.get(url=url)
        print(f"{threading.current_thread().name} got a response: {response}")
def main():
    # Every process starts with one thread, the main thread, which can spawn
    # new worker threads. current_thread() returns the current thread instance.
    print("Main thread (-%s-) started" % threading.current_thread().name)
    # Build the queue and hand out the crawl tasks.
    get_url_queue = Queue()
    for _ in range(5):
        get_url_queue.put("http://www.httpbin.org")
    # Initialize the thread pool with 3 threads.
    threading_pool = ThreadPoolManagement(
        task_thread=TestMode,
        task_queue=get_url_queue,
        number=3,
    )
    # Start the pool's threads.
    threading_pool.start_threading()
    # Wait for the crawl queue to empty (sleep briefly to avoid a busy-wait).
    while not get_url_queue.empty():
        time.sleep(0.1)
    # Wait for the pool's threads to exit.
    threading_pool.join_threading()
    print("Main thread (-%s-) finished" % threading.current_thread().name)


if __name__ == '__main__':
    main()
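A side note on the shutdown in main(): polling empty() in a loop works, but the standard library's Queue.join() can do the waiting for you. A minimal sketch of that alternative, assuming each worker calls task_done() once per item it takes off the queue:

# Sketch: replacing the polling loop with Queue.join(). Each worker marks
# an item as processed with task_done(); tasks.join() blocks until every
# queued item has been marked.
from queue import Queue, Empty
import threading

def worker(tasks: Queue) -> None:
    while True:
        try:
            url = tasks.get_nowait()
        except Empty:
            break
        print(f"handled {url}")  # stand-in for parse(url)
        tasks.task_done()        # mark the item as processed

tasks = Queue()
for _ in range(5):
    tasks.put("http://www.httpbin.org")
threads = [threading.Thread(target=worker, args=(tasks,)) for _ in range(3)]
for t in threads:
    t.start()
tasks.join()   # blocks until task_done() has been called for every item
for t in threads:
    t.join()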
2. Multiple threads, with tasks split by type (a producer-consumer model).

#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Created on 2019:10-13-->16:19:36
@author: 1263270345@qq.com / Alex
"""
from queue import Queue, Empty
import threading
import time

import requests

# Set to True by the main thread to tell the consumer threads to stop.
stop_flag = False
# Queue that the producer threads feed and the consumer threads drain.
data_queue = Queue()
class ThreadPoolManagement(object):
    """Thread pool manager."""

    def __init__(self, task_thread, task_queue, number=5):
        """
        :param task_thread: task-thread class to instantiate
        :param task_queue: task queue shared by the threads
        :param number: number of threads in the pool
        """
        self.number = number
        self.thread_list = []
        self.__create_threading__(task_thread, task_queue)

    def __create_threading__(self, task_thread, task_queue):
        for i in range(self.number):
            self.thread_list.append(task_thread(task_queue))

    def start_threading(self) -> None:
        for i in self.thread_list:
            i.start()

    def join_threading(self) -> None:
        for i in self.thread_list:
            i.join()
class TaskThreading(threading.Thread):
    """Task thread."""

    def __init__(self, task_queue: Queue):
        super().__init__()
        self.task = task_queue

    def run(self) -> None:
        print("Worker thread (-%s-) started" % threading.current_thread().name)
        while True:
            # get_nowait() avoids the race between empty() and get() when
            # several threads drain the same queue.
            try:
                url = self.task.get_nowait()
            except Empty:
                break
            self.parse(url)
        print("Worker thread (-%s-) finished" % threading.current_thread().name)

    def parse(self, url) -> None:
        pass
class TestMode(TaskThreading):
    def parse(self, url) -> None:
        """Override this with your crawler code."""
        print(f"{threading.current_thread().name} is waiting for {url} to respond")
        response = requests.get(url=url)
        print(f"{threading.current_thread().name} got a response: {response}")
        # Hand the result over to the consumer threads.
        data_queue.put(url)
class Test2Mode(TaskThreading):
    def run(self) -> None:
        """
        Without overriding run(), the thread would find the queue empty
        before the producers had a chance to add anything and would exit
        the moment it started, so instead it polls until told to stop.
        """
        while not stop_flag:
            # Catch queue.Empty specifically; a bare `except Exception`
            # would silently swallow real errors from parse().
            try:
                url = self.task.get(timeout=0.1)
            except Empty:
                continue
            self.parse(url)
            self.task.task_done()

    def parse(self, url) -> None:
        """Override this with your crawler code."""
        print(f"{threading.current_thread().name} is waiting for {url} to respond")
        response = requests.get(url=url)
        print(f"{threading.current_thread().name} got a response: {response}")
def main():
    # Every process starts with one thread, the main thread, which can spawn
    # new worker threads. current_thread() returns the current thread instance.
    print("Main thread (-%s-) started" % threading.current_thread().name)
    # Build the queue and hand out the crawl tasks.
    get_url_queue = Queue()
    for _ in range(5):
        get_url_queue.put("http://www.httpbin.org")
    # Initialize the first crawler pool (producers), 3 threads.
    threading_pool = ThreadPoolManagement(
        task_thread=TestMode,
        task_queue=get_url_queue,
        number=3,
    )
    # Initialize the second crawler pool (consumers), 3 threads.
    threading_poolz = ThreadPoolManagement(
        task_thread=Test2Mode,
        task_queue=data_queue,
        number=3,
    )
    # Start both pools.
    threading_pool.start_threading()
    threading_poolz.start_threading()
    # Wait for the first crawler's queue to empty.
    while not get_url_queue.empty():
        time.sleep(0.1)
    # Wait for the first pool's threads to exit.
    threading_pool.join_threading()
    # Wait for the second crawler's queue to empty.
    while not data_queue.empty():
        time.sleep(0.1)
    # Signal the second pool's threads to stop, then wait for them.
    global stop_flag
    stop_flag = True
    threading_poolz.join_threading()
    print("Main thread (-%s-) finished" % threading.current_thread().name)


if __name__ == '__main__':
    main()
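As a design note, the global stop_flag works, but a common alternative is a sentinel ("poison pill"): put one None per consumer at the end of the queue, and each consumer exits when it pulls one off. A minimal sketch of that approach, independent of the classes above:

# Sketch: stopping consumers with a sentinel instead of a global flag.
# Each consumer exits when it takes None off the queue, so no shared
# mutable state is needed.
from queue import Queue
import threading

SENTINEL = None
NUM_CONSUMERS = 3

def consumer(tasks: Queue) -> None:
    while True:
        item = tasks.get()           # blocks until an item arrives
        if item is SENTINEL:
            break                    # one sentinel stops one consumer
        print(f"consumed {item}")    # stand-in for parse(item)

tasks = Queue()
for url in ["http://www.httpbin.org"] * 5:
    tasks.put(url)
for _ in range(NUM_CONSUMERS):
    tasks.put(SENTINEL)              # one poison pill per consumer
threads = [threading.Thread(target=consumer, args=(tasks,))
           for _ in range(NUM_CONSUMERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()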