koma610630 发表于 2023-5-11 16:06:38

关于多线程下载 暂停 跟恢复问题

写个一个多线程下载,
但是无法暂停跟恢复,

老哥们帮忙处理下这个问题。


import json
import os
import requests
import threading
import tkinter as tk
from tkinter import filedialog,messagebox
from concurrent.futures import ThreadPoolExecutor
import logging

# 创建日志目录
# os.makedirs('logs', exist_ok=True)
# logging.basicConfig(filename='logs/download.log', level=logging.ERROR)

# BASE_DIR = 'BASE_DIR'# 下载文件的基本目录,你需要根据需要进行修改

# 定义下载函数
def download_file(download_url, directory, lock, event, label, count):
   

    try:
      # 获取下载文件的文件名
      filename = os.path.basename(download_url)
      print('正在下载文件:', filename )
      base_url = url_entry.get()
      # 创建目录
      os.makedirs(directory, exist_ok=True)
      # 下载文件
      response = requests.get(base_url+"/webres/"+download_url, timeout=10)
      with lock:
            with open(os.path.join(directory, filename), 'wb') as f:
                f.write(response.content)
      print(f'文件 {download_url} 下载结束')
      
      label.set(f'正在下载文件:{filename}+下载文件数量:{count}')# 更新窗口中的标签
      


    except Exception as e:
      print(f'下载文件 {base_url+download_url} 出错: {e}')
      logging.error(f'下载文件 {download_url} 出错')
    finally:
      event.set()# 下载结束后设置 event

# 定义选择文件函数
def choose_file():
    file_path = filedialog.askopenfilename()
    file_path_var.set(file_path)
    # 读取本地json文件
    with open(file_path, 'r') as f:
      data = json.load(f)
    # 打印下载地址数量
    count_1 = len(data)
   

    print('下载地址数量:', len(data),count_1)

# 定义下载函数
def download(max_threads):
    # 检查用户是否已经选择了json文件
    if not file_path_var.get():
      messagebox.showinfo('提示', '请先选择json文件')
      return

    # 读取本地json文件


#创建日志目录
    os.makedirs(url_entry.get()+'/logs', exist_ok=True)
    logging.basicConfig(filename=url_entry.get()+'/logs/download.log', level=logging.ERROR)

    with open(file_path_var.get(), 'r') as f:
      data = json.load(f)
    # 创建锁对象
    lock = threading.Lock()
    # 创建线程池
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
      # 创建 event 对象
      event = threading.Event()
      event.set()# 初始状态为 True
      # 创建 tkinter.StringVar() 对象
      label_var = tk.StringVar()
      label_var.set('等待下载...')
      label = tk.Label(window, textvariable=label_var)
      label.pack()

      # 遍历下载地址并下载文件
      count = 0
      for download_url in data:
            # 获取下载文件的目录
            count += 1
            directory = os.path.join(url_entry.get(), *download_url.split('/'))
            # 提交任务到线程池
            executor.submit(download_file, download_url, directory, lock, event, label_var,count)
            # 检查 event 对象的状态,如果为 False,则暂停下载
            while not event.is_set():
                pass

# 定义暂停下载函数
def pause_download():
    event.clear()

# 定义恢复下载函数
def resume_download():
    event.set()

# 创建窗口
window = tk.Tk()
window.title('批量下载')
window.geometry('400x300')

# 创建选择文件按钮
file_path_var = tk.StringVar()
file_path_label = tk.Label(window, textvariable=file_path_var)
file_path_label.pack()
choose_file_button = tk.Button(window, text='选择文件', command=choose_file)
choose_file_button.pack()

# 创建线程数量标签和文本框
thread_label = tk.Label(window, text='线程数量:')
thread_label.pack()
thread_entry = tk.Entry(window)
thread_entry.insert(0, '10')
thread_entry.pack()

# 创建下载地址标签和文本框
url_label = tk.Label(window, text='下载地址:')
url_label.pack()

url_entry = tk.Entry(window)
url_entry.pack()
url_entry.insert(0, '6290')



# 创建下载按钮
download_button = tk.Button(window, text='下载', command=lambda: threading.Thread(target=download, args=(int(thread_entry.get()),)).start())
download_button.pack()

# 创建暂停按钮和恢复按钮
pause_button = tk.Button(window, text='暂停', command=pause_download)
pause_button.pack()
resume_button = tk.Button(window, text='恢复', command=resume_download)
resume_button.pack()
download_button.place(x=130, y=160)
pause_button.place(x=180, y=160)
resume_button.place(x=230, y=160)

# 运行窗口
window.mainloop()

isdkz 发表于 2023-5-11 16:07:03

本帖最后由 isdkz 于 2023-5-11 16:15 编辑

在你的代码中,暂停和恢复的实现有些问题。你的代码是使用 threading.Event() 来控制下载线程的暂停和恢复。你在定义 download_file 函数时将 event 作为参数传入,然后在循环下载时通过 event.is_set() 来判断是否应该暂停下载,但是 event.is_set() 返回 True 时,你的程序会一直下载,直到下载完所有的文件,即使 event 的状态被改变。

然而, requests.get() 并没有提供暂停和恢复的功能,所以这里我们需要使用一个更复杂的方法来实现这个功能。

我们需要对每个下载任务进行细分,将每个下载任务分为多个小块,然后在下载每个小块之前检查 event 的状态,如果 event.is_set() 返回 False ,那么我们就暂停下载。当 event.is_set() 返回 True 时,我们就恢复下载。这样我们就可以实现下载的暂停和恢复。

由于你的程序是使用GUI来控制下载,所以我们还需要把 event 对象作为全局变量,这样我们的暂停和恢复按钮就可以控制 event 的状态了。

我这里给出了一个简化的代码示例来说明这个方法:

import requests
import threading
import os
import time

# 创建 event 对象
event = threading.Event()
event.set()# 初始状态为 True

def download_file(url, filename):
    # 分块下载的大小
    chunk_size = 1024# 1 KB
    response = requests.get(url, stream=True)
    # 分块下载文件
    with open(filename, 'wb') as f:
      for chunk in response.iter_content(chunk_size):
            # 检查 event 对象的状态,如果为 False,则暂停下载
            while not event.is_set():
                time.sleep(1)
            f.write(chunk)

def pause_download():
    event.clear()

def resume_download():
    event.set()

# 开始下载
threading.Thread(target=download_file, args=('https://example.com/file.zip', 'file.zip')).start()

# 暂停下载
time.sleep(5)
pause_download()

# 恢复下载
time.sleep(5)
resume_download()


在这个示例中,我们首先创建了一个 event 对象,并且将其初始状态设置为 True 。然后我们定义了一个 download_file 函数来下载文件。在这个函数中,我们将文件分成1KB的块,然后在下载每一块之前都会检查 event 的状态。如果 event.is_set() 返回 False ,那么我们就暂停下载,直到 event.is_set() 返回 True 为止。

我们还定义了 pause_download 和 resume_download 函数来改变 event 的状态。

然后我们启动一个新线程来下载文件。在下载的过程中,我们可以调用 pause_download 和 resume_download 函数来暂停和恢复下载。

koma610630 发表于 2023-5-11 18:06:03

isdkz 发表于 2023-5-11 16:07
在你的代码中,暂停和恢复的实现有些问题。你的代码是使用 threading.Event() 来控制下载线程的暂停和恢复 ...

整个完整的代码呢。

sfqxx 发表于 2023-5-11 18:07:02

很抱歉,您提供的代码没有实现暂停和恢复功能,因此无法在其基础上进行修改。需要实现暂停和恢复功能,可以考虑在下载函数中添加一个变量来记录当前程序状态(下载/暂停),然后根据这个状态来判断是否下载文件并改变事件对象的状态。例如:

1. 在 `download` 函数中创建一个变量 `downloading` 来标记当前是否正在下载文件:


def download(max_threads):
    # 检查用户是否已经选择了json文件
    if not file_path_var.get():
      messagebox.showinfo('提示', '请先选择json文件')
      return

    # 读取本地json文件


    os.makedirs(url_entry.get()+'/logs', exist_ok=True)
    logging.basicConfig(filename=url_entry.get()+'/logs/download.log', level=logging.ERROR)

    with open(file_path_var.get(), 'r') as f:
      data = json.load(f)
      
    lock = threading.Lock()
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
      event = threading.Event()
      event.set()

-->   downloading = True            # 新增:用于表示是否在下载文件
      label_var = tk.StringVar()
      label_var.set('等待下载...')
      label = tk.Label(window, textvariable=label_var)
      label.pack()

      count = 0
      for download_url in data:
            directory = os.path.join(url_entry.get(), *download_url.split('/'))
            count += 1


    -->      if downloading:      # 新增:判断是否处于下载状态
                executor.submit(download_file, download_url, directory, lock, event, label_var,count)
                while not event.is_set():
                  if not downloading:# 新增:暂停状态时,将事件等待
                        event.wait()
    -->      else:
                while not downloading:   # 新增:等待下载状态启动
                  event.wait()

      label_var.set('下载完成')

2. 在 `pause_download` 函数中,将 `downloading` 的值设为 False,并将事件对象清除:


def pause_download():
    global downloading

    downloading =False    # 新增:将downloading的值设为False
    event.clear()    # 新增:清除事件对象
    messagebox.showinfo('提示', '下载已暂停')

3. 在 `resume_download` 函数中,将 `downloading` 的值设为 True,然后设置事件对象:


def resume_download():
    global downloading
   
    if not downloading:
      downloading = True    # 新增:将downloading的值设为True
      event.set()    # 新增:设置事件对象
      messagebox.showinfo('提示', '下载已恢复')
这样实现了控制下载过程的暂停和恢复功能,当点击暂停按钮时,程序会停止向线程池添加新的任务,并且将当前正在执行的任务运行完即可退出。如果希望立即停止所有运行任务,则需要在循环等待事件时添加超时检查,并且将下载函数的子线程改为可以被中断的方式运行。

koma610630 发表于 2023-5-11 18:32:28

sfqxx 发表于 2023-5-11 18:07
很抱歉,您提供的代码没有实现暂停和恢复功能,因此无法在其基础上进行修改。需要实现暂停和恢复功能,可以 ...

"event" is not definedPylance
..报错

sfqxx 发表于 2023-5-11 18:42:38

koma610630 发表于 2023-5-11 18:32
"event" is not definedPylance
..报错

import logging
import threading
from concurrent.futures import ThreadPoolExecutor
import json
import os
import tkinter as tk
from tkinter import filedialog, messagebox

def download(max_threads):
    # 检查用户是否已经选择了json文件
    if not file_path_var.get():
      messagebox.showinfo('提示', '请先选择json文件')
      return

    # 读取本地json文件


    os.makedirs(url_entry.get()+'/logs', exist_ok=True)
    logging.basicConfig(filename=url_entry.get()+'/logs/download.log', level=logging.ERROR)

    with open(file_path_var. get(), 'r') as f:
      data = json.load(f)

    lock = threading.Lock()
   
    # 创建事件对象
    event = threading.Event()
    event.set()

    # 定义下载标志
    downloading = True

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
      
      label_var = tk.StringVar()
      label_var.set('等待下载...')
      label = tk.Label(window, textvariable=label_var)
      label.pack()

      count = 0
      for download_url in data:
            directory = os.path.join(url_entry.get(), *download_url.split('/'))
            count += 1

            if downloading:   
                executor.submit(download_file, download_url, directory, lock, event, label_var,count)
               
                # 在等待时,检查下载是否被暂停
                while not event.is_set():
                  if not downloading:
                        event.wait()
                        
            else:
                while not downloading:   
                  event.wait()

      label_var.set('下载完成')
                  
# 定义暂停和恢复函数      
def pause_download():
    global downloading, event

    downloading =False   
    event.clear()   
    messagebox.showinfo('提示', '下载已暂停')
   
def resume_download():
    global downloading, event
   
    if not downloading:
      downloading = True   
      event.set()import logging
import threading
from concurrent.futures import ThreadPoolExecutor
import json
import os
import tkinter as tk
from tkinter import filedialog, messagebox

def download(max_threads):
    # 检查用户是否已经选择了json文件
    if not file_path_var.get():
      messagebox.showinfo('提示', '请先选择json文件')
      return

    # 读取本地json文件


    os.makedirs(url_entry.get()+'/logs', exist_ok=True)
    logging.basicConfig(filename=url_entry.get()+'/logs/download.log', level=logging.ERROR)

    with open(file_path_var. get(), 'r') as f:
      data = json.load(f)

    lock = threading.Lock()
   
    # 创建事件对象
    event = threading.Event()
    event.set()

    # 定义下载标志
    downloading = True

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
      
      label_var = tk.StringVar()
      label_var.set('等待下载...')
      label = tk.Label(window, textvariable=label_var)
      label.pack()

      count = 0
      for download_url in data:
            directory = os.path.join(url_entry.get(), *download_url.split('/'))
            count += 1

            if downloading:   
                executor.submit(download_file, download_url, directory, lock, event, label_var,count)
               
                # 在等待时,检查下载是否被暂停
                while not event.is_set():
                  if not downloading:
                        event.wait()
                        
            else:
                while not downloading:   
                  event.wait()

      label_var.set('下载完成')
                  
# 定义暂停和恢复函数      
def pause_download():
    global downloading, event

    downloading =False   
    event.clear()   
    messagebox.showinfo('提示', '下载已暂停')
   
def resume_download():
    global downloading, event
   
    if not downloading:
      downloading = True   
      event.set()
这样试试?{:10_330:}

koma610630 发表于 2023-5-11 19:02:51

sfqxx 发表于 2023-5-11 18:42
这样试试?

多谢了,我去看看
页: [1]
查看完整版本: 关于多线程下载 暂停 跟恢复问题