鱼C论坛

 找回密码
 立即注册
查看: 2065|回复: 0

[技术交流] 如何更简单的使用线程,来进行数据采集或者图片下载

[复制链接]
发表于 2019-10-14 17:55:55 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
简单的来说,提高代码的复用性吧。可能写的不是很好,有大神的话,欢迎指点交流。

这里说两个简单的Demo。在复用的时候,只要继承TaskThreading,继而重写parse函数,作为爬虫“入口”,即可进行多线程采集

一,只开多个线程。不进行任务类型区分。

  1. from queue import Queue
  2. import threading
  3. import requests


  4. class ThreadPoolManagement(object):
  5.     """线程池管理"""

  6.     def __init__(self, task_thread, task_queue, number=5):
  7.         """
  8.         :param task_thread: 任务线程
  9.         :param task_queue: 任务队列
  10.         :param number: 线程池数量
  11.         """
  12.         self.number = number
  13.         self.thread_list = []
  14.         self.__create_threading__(task_thread, task_queue)

  15.     def __create_threading__(self, task_thread, task_queue):
  16.         for i in range(self.number):
  17.             self.thread_list.append(task_thread(task_queue))

  18.     def start_threading(self) -> None:
  19.         for i in self.thread_list:
  20.             i.start()

  21.     def join_threading(self) -> None:
  22.         for i in self.thread_list:
  23.             i.join()


  24. class TaskThreading(threading.Thread):
  25.     """任务线程"""

  26.     def __init__(self, task_queue: Queue):
  27.         super().__init__()
  28.         self.task = task_queue

  29.     def run(self) -> None:
  30.         print("子线程(-%s-)启动" % (threading.current_thread().name))
  31.         while True:
  32.             if self.task.empty():
  33.                 break
  34.             else:
  35.                 url = self.task.get()
  36.                 self.parse(url)
  37.         print("子线程(-%s-)结束" % (threading.current_thread().name))

  38.     def parse(self, url) -> None:
  39.         pass

  40. class TestMode(TaskThreading):

  41.     def parse(self, url) -> None:
  42.         """在这里重写你的爬虫代码"""
  43.         response = requests.get(url=url)
  44.         print(f"{threading.current_thread().name}正在模拟等待{url}链接响应,时长5秒")
  45.         print(f"{threading.current_thread().name}响应成功,状态{response}")

  46. def main():
  47.     # 任何进程默认就会启动一个线程,称为主线程,主线程可以启动新的子线程
  48.     # current_thread()返回当前线程的实例
  49.     print("主线程(-%s-)启动" % (threading.current_thread().name))

  50.     import queue
  51.     # 建好队列,给定爬虫任务
  52.     get_url_queue = queue.Queue()
  53.     for page in range(5):
  54.         get_url_queue.put(f"http://www.httpbin.org")

  55.     # 初始化线程池,数量为3
  56.     threading_pool = ThreadPoolManagement(
  57.         task_thread=TestMode,
  58.         task_queue=get_url_queue,
  59.         number=3
  60.     )

  61.     # 启动线程池的线程
  62.     threading_pool.start_threading()

  63.     # 检测第一个爬虫队列任务是否为空
  64.     while not get_url_queue.empty():
  65.         pass

  66.     # 关闭第一个爬虫线程池的线程
  67.     threading_pool.join_threading()

  68.     print("主线程(-%s-)结束" % (threading.current_thread().name))


  69. if __name__ == '__main__':
  70.     main()
复制代码


二,开多个线程。进行任务类型区分(生产消费者模型线程)。
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/python3
  3. """
  4. Created on 2019:10-13-->16:19:36

  5. @author: 1263270345@qq.com / Alex

  6. """
  7. from queue import Queue
  8. import threading
  9. import requests

  10. stop_flag = False
  11. data_queue = Queue()

  12. class ThreadPoolManagement(object):
  13.     """线程池管理"""

  14.     def __init__(self, task_thread, task_queue, number=5):
  15.         """
  16.         :param task_thread: 任务线程
  17.         :param task_queue: 任务队列
  18.         :param number: 线程池数量
  19.         """
  20.         self.number = number
  21.         self.thread_list = []
  22.         self.__create_threading__(task_thread, task_queue)

  23.     def __create_threading__(self, task_thread, task_queue):
  24.         for i in range(self.number):
  25.             self.thread_list.append(task_thread(task_queue))

  26.     def start_threading(self) -> None:
  27.         for i in self.thread_list:
  28.             i.start()

  29.     def join_threading(self) -> None:
  30.         for i in self.thread_list:
  31.             i.join()


  32. class TaskThreading(threading.Thread):
  33.     """任务线程"""

  34.     def __init__(self, task_queue: Queue):
  35.         super().__init__()
  36.         self.task = task_queue

  37.     def run(self) -> None:
  38.         print("子线程(-%s-)启动" % (threading.current_thread().name))
  39.         while True:
  40.             if self.task.empty():
  41.                 break
  42.             else:
  43.                 url = self.task.get()
  44.                 self.parse(url)
  45.         print("子线程(-%s-)结束" % (threading.current_thread().name))

  46.     def parse(self, url) -> None:
  47.         pass


  48. class TestMode(TaskThreading):

  49.     def parse(self, url) -> None:
  50.         """在这里重写你的爬虫代码"""
  51.         response = requests.get(url=url)
  52.         print(f"{threading.current_thread().name}正在模拟等待{url}链接响应,时长5秒")
  53.         print(f"{threading.current_thread().name}响应成功,状态{response}")
  54.         data_queue.put(url)


  55. class Test2Mode(TaskThreading):

  56.     def run(self) -> None:
  57.         """
  58.         如果不进行run重写,则会因为队列里面没有来的及添加任务,
  59.         导致线程,开启即刻关闭,所以进行堵塞
  60.         """
  61.         while not stop_flag:
  62.             try:
  63.                 url = self.task.get(False)
  64.                 if not url:
  65.                     continue
  66.                 self.parse(url)
  67.                 self.task.task_done()
  68.             except Exception as e:
  69.                 pass

  70.     def parse(self, url) -> None:
  71.         """在这里重写你的爬虫代码"""
  72.         response = requests.get(url=url)
  73.         print(f"{threading.current_thread().name}正在模拟等待{url}链接响应,时长5秒")
  74.         print(f"{threading.current_thread().name}响应成功,状态{response}")


  75. def main():
  76.     # 任何进程默认就会启动一个线程,称为主线程,主线程可以启动新的子线程
  77.     # current_thread()返回当前线程的实例
  78.     print("主线程(-%s-)启动" % (threading.current_thread().name))

  79.     import queue
  80.     # 建好队列,给定爬虫任务
  81.     get_url_queue = queue.Queue()
  82.     for page in range(5):
  83.         get_url_queue.put(f"http://www.httpbin.org")

  84.     # 初始化第一个爬虫,线程池数量为3
  85.     threading_pool = ThreadPoolManagement(
  86.         task_thread=TestMode,
  87.         task_queue=get_url_queue,
  88.         number=3
  89.     )

  90.     # 初始化第二个爬虫,线程池数量为3
  91.     threading_poolz = ThreadPoolManagement(
  92.         task_thread=Test2Mode,
  93.         task_queue=data_queue,
  94.         number=3
  95.     )

  96.     # 启动两个线程池的线程
  97.     threading_pool.start_threading()
  98.     threading_poolz.start_threading()

  99.     # 检测第一个爬虫队列任务是否为空
  100.     while not get_url_queue.empty():
  101.         pass

  102.     # 关闭第一个爬虫线程池的线程
  103.     threading_pool.join_threading()

  104.     # 检测第二个爬虫队列任是否为空
  105.     while not data_queue.empty():
  106.         pass

  107.     # 关闭第二个线程池的线程
  108.     global stop_flag
  109.     stop_flag = True
  110.     threading_poolz.join_threading()

  111.     print("主线程(-%s-)结束" % (threading.current_thread().name))


  112. if __name__ == '__main__':
  113.     main()
复制代码





本帖被以下淘专辑推荐:

小甲鱼最新课程 -> https://ilovefishc.com
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2025-5-26 06:08

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表