鱼C论坛

 找回密码
 立即注册
查看: 1821|回复: 0

[技术交流] 如何更简单的使用线程,来进行数据采集或者图片下载

[复制链接]
发表于 2019-10-14 17:55:55 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
简单的来说,提高代码的复用性吧。可能写的不是很好,有大神的话,欢迎指点交流。

这里说两个简单的Demo。在复用的时候,只要继承TaskThreading,继而重写parse函数,作为爬虫“入口”,即可进行多线程采集

一,只开多个线程。不进行任务类型区分。
from queue import Queue
import threading
import requests


class ThreadPoolManagement(object):
    """线程池管理"""

    def __init__(self, task_thread, task_queue, number=5):
        """
        :param task_thread: 任务线程
        :param task_queue: 任务队列
        :param number: 线程池数量
        """
        self.number = number
        self.thread_list = []
        self.__create_threading__(task_thread, task_queue)

    def __create_threading__(self, task_thread, task_queue):
        for i in range(self.number):
            self.thread_list.append(task_thread(task_queue))

    def start_threading(self) -> None:
        for i in self.thread_list:
            i.start()

    def join_threading(self) -> None:
        for i in self.thread_list:
            i.join()


class TaskThreading(threading.Thread):
    """任务线程"""

    def __init__(self, task_queue: Queue):
        super().__init__()
        self.task = task_queue

    def run(self) -> None:
        print("子线程(-%s-)启动" % (threading.current_thread().name))
        while True:
            if self.task.empty():
                break
            else:
                url = self.task.get()
                self.parse(url)
        print("子线程(-%s-)结束" % (threading.current_thread().name))

    def parse(self, url) -> None:
        pass

class TestMode(TaskThreading):

    def parse(self, url) -> None:
        """在这里重写你的爬虫代码"""
        response = requests.get(url=url)
        print(f"{threading.current_thread().name}正在模拟等待{url}链接响应,时长5秒")
        print(f"{threading.current_thread().name}响应成功,状态{response}")

def main():
    # 任何进程默认就会启动一个线程,称为主线程,主线程可以启动新的子线程
    # current_thread()返回当前线程的实例
    print("主线程(-%s-)启动" % (threading.current_thread().name))

    import queue
    # 建好队列,给定爬虫任务
    get_url_queue = queue.Queue()
    for page in range(5):
        get_url_queue.put(f"http://www.httpbin.org")

    # 初始化线程池,数量为3
    threading_pool = ThreadPoolManagement(
        task_thread=TestMode,
        task_queue=get_url_queue,
        number=3
    )

    # 启动线程池的线程
    threading_pool.start_threading()

    # 检测第一个爬虫队列任务是否为空
    while not get_url_queue.empty():
        pass

    # 关闭第一个爬虫线程池的线程
    threading_pool.join_threading()

    print("主线程(-%s-)结束" % (threading.current_thread().name))


if __name__ == '__main__':
    main()

二,开多个线程。进行任务类型区分(生产消费者模型线程)。
# -*- coding: utf-8 -*-
# !/usr/bin/python3
"""
Created on 2019:10-13-->16:19:36

@author: 1263270345@qq.com / Alex

"""
from queue import Queue
import threading
import requests

stop_flag = False
data_queue = Queue()

class ThreadPoolManagement(object):
    """线程池管理"""

    def __init__(self, task_thread, task_queue, number=5):
        """
        :param task_thread: 任务线程
        :param task_queue: 任务队列
        :param number: 线程池数量
        """
        self.number = number
        self.thread_list = []
        self.__create_threading__(task_thread, task_queue)

    def __create_threading__(self, task_thread, task_queue):
        for i in range(self.number):
            self.thread_list.append(task_thread(task_queue))

    def start_threading(self) -> None:
        for i in self.thread_list:
            i.start()

    def join_threading(self) -> None:
        for i in self.thread_list:
            i.join()


class TaskThreading(threading.Thread):
    """任务线程"""

    def __init__(self, task_queue: Queue):
        super().__init__()
        self.task = task_queue

    def run(self) -> None:
        print("子线程(-%s-)启动" % (threading.current_thread().name))
        while True:
            if self.task.empty():
                break
            else:
                url = self.task.get()
                self.parse(url)
        print("子线程(-%s-)结束" % (threading.current_thread().name))

    def parse(self, url) -> None:
        pass


class TestMode(TaskThreading):

    def parse(self, url) -> None:
        """在这里重写你的爬虫代码"""
        response = requests.get(url=url)
        print(f"{threading.current_thread().name}正在模拟等待{url}链接响应,时长5秒")
        print(f"{threading.current_thread().name}响应成功,状态{response}")
        data_queue.put(url)


class Test2Mode(TaskThreading):

    def run(self) -> None:
        """
        如果不进行run重写,则会因为队列里面没有来的及添加任务,
        导致线程,开启即刻关闭,所以进行堵塞
        """
        while not stop_flag:
            try:
                url = self.task.get(False)
                if not url:
                    continue
                self.parse(url)
                self.task.task_done()
            except Exception as e:
                pass

    def parse(self, url) -> None:
        """在这里重写你的爬虫代码"""
        response = requests.get(url=url)
        print(f"{threading.current_thread().name}正在模拟等待{url}链接响应,时长5秒")
        print(f"{threading.current_thread().name}响应成功,状态{response}")


def main():
    # 任何进程默认就会启动一个线程,称为主线程,主线程可以启动新的子线程
    # current_thread()返回当前线程的实例
    print("主线程(-%s-)启动" % (threading.current_thread().name))

    import queue
    # 建好队列,给定爬虫任务
    get_url_queue = queue.Queue()
    for page in range(5):
        get_url_queue.put(f"http://www.httpbin.org")

    # 初始化第一个爬虫,线程池数量为3
    threading_pool = ThreadPoolManagement(
        task_thread=TestMode,
        task_queue=get_url_queue,
        number=3
    )

    # 初始化第二个爬虫,线程池数量为3
    threading_poolz = ThreadPoolManagement(
        task_thread=Test2Mode,
        task_queue=data_queue,
        number=3
    )

    # 启动两个线程池的线程
    threading_pool.start_threading()
    threading_poolz.start_threading()

    # 检测第一个爬虫队列任务是否为空
    while not get_url_queue.empty():
        pass

    # 关闭第一个爬虫线程池的线程
    threading_pool.join_threading()

    # 检测第二个爬虫队列任是否为空
    while not data_queue.empty():
        pass

    # 关闭第二个线程池的线程
    global stop_flag
    stop_flag = True
    threading_poolz.join_threading()

    print("主线程(-%s-)结束" % (threading.current_thread().name))


if __name__ == '__main__':
    main()




本帖被以下淘专辑推荐:

想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2024-11-24 13:10

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表