鱼C论坛

 找回密码
 立即注册
查看: 727|回复: 4

[已解决]多线程存储文件导致文件缺失问题

[复制链接]
发表于 2023-9-23 20:42:42 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
import json
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from urllib.parse import urljoin
import jieba
from spellchecker import SpellChecker
import jieba.analyse
from bs4 import BeautifulSoup

lock = threading.Lock()
queue = Queue()

def load_stopwords(file_path):
    with open(file_path, 'r', encoding='utf-8-sig') as file:
        stop_words = [line.strip() for line in file]
    return stop_words


stop_words = load_stopwords('A:/搜索引擎系统/停用词表.txt')


def get_all_hrefs(text: str, current_url):
    all_href = []
    # 使用BeautifulSoup来解析网页源码
    soup = BeautifulSoup(text, 'lxml')
    # 获取所有a标签下的所有href属性
    all_a_tags = soup.find_all('a')
    for a_tag in all_a_tags:
        href = a_tag.get('href')
        if href and not href.startswith('#') and not href.startswith('javascript:'):
            absolute_url = urljoin(current_url, href)
            # 将https替换为http避免重复
            new_url = absolute_url.replace("https", "http")
            all_href.append(new_url)
    # 将得到的链接返回
    return all_href



def is_chinese(word):
    for char in word:
        if '\u4e00' <= char <= '\u9fff':  # 中文字符的Unicode范围
            return True
    return False


def remove_gibberish_words(words):
    spell = SpellChecker()
    filtered_words = []
    for word in words:
        if not is_chinese(word):  # 检查单词是否为中文
            if spell.known([word]):  # 对于非中文单词进行拼写检查
                filtered_words.append(word)
        else:
            filtered_words.append(word)
    return filtered_words


def Separate_words(content):
    # 使用正则表达式的 sub 方法将标点符号替换为空格
    content = re.sub("[^\w\s]+", "", content)
    content = list(jieba.cut(content, cut_all=False, use_paddle=True))
    content = remove_gibberish_words(content)
    content = [element for element in content if element.strip() != '']
    content = [word for word in content if word.lower() not in stop_words]

    return content


def to_text(num, current_url, title, content, links):
    with lock:
        data = {
            "id": num,
            "url": current_url,
            "title": title,
            "content": content,
            "links": links
        }
        with open(f'A:/搜索引擎系统/Test/web{page}.txt', 'w', encoding='utf-8-sig') as file:
            json.dump(data, file, ensure_ascii=False)


def get_title_and_content(current_url, html):
    soup = BeautifulSoup(html, 'html.parser')

    title = []
    if soup.get('title') != None:
        title = soup.find('title').get_text()

    content = soup.get_text()
    content = content.replace('\n', '')

    links = get_all_hrefs(html, current_url)
    return title, content, links


def do_work(page):
    try:
        with open(f'A:/搜索引擎系统/all_information/web{page}.txt', 'r', encoding='utf-8-sig') as file:

            text = file.read()
            num = text.split('\n')[0]
            current_url = text.split('\n')[1]
            html = str(text.split('\n')[2:])

            title, content, links = get_title_and_content(current_url, html)
            content = Separate_words(content)
            to_text(num, current_url, title, content, links)
            print(num)


    except FileNotFoundError:
        # 文件不存在时的处理
        print("文件不存在,请检查文件路径!")


if __name__ == '__main__':

    page = 1
    target = 100
    for num in range(1, target + 1):
        queue.put(num)

    start_time = time.time()

    with ThreadPoolExecutor(36) as pool:
        while not queue.empty():
            page = queue.get()
            pool.submit(do_work, page)

            time.sleep(0.1)
    pool.shutdown(cancel_futures=True)


    end_time = time.time()
    print(end_time - start_time)




我使用多线程处理得到的html数据并存储,遇到了文件缺失的问题
最佳答案
2023-9-23 20:43:12
通过阅读您提供的代码,我注意到在多线程处理的过程中,您使用了一个共享的队列 queue 来存储待处理的任务,并且每个线程从队列中获取任务并进行处理。然而,在您的代码中,并没有对文件操作进行加锁,这可能导致多个线程同时访问同一个文件,从而导致文件缺失问题。

为了解决这个问题,您可以考虑使用文件锁来确保同一时间只有一个线程可以访问文件。在 to_text 函数中,您可以将文件写入操作放在一个临界区域内,然后使用 lock 对象对该临界区域进行加锁。这样可以确保每次只有一个线程进行文件写入操作,避免多个线程同时写入同一个文件。

下面是修改后的代码示例:

  1. import json
  2. import re
  3. import threading
  4. import time
  5. from concurrent.futures import ThreadPoolExecutor
  6. from queue import Queue
  7. from urllib.parse import urljoin
  8. import jieba
  9. from spellchecker import SpellChecker
  10. import jieba.analyse
  11. from bs4 import BeautifulSoup

  12. lock = threading.Lock()
  13. queue = Queue()

  14. def load_stopwords(file_path):
  15.     with open(file_path, 'r', encoding='utf-8-sig') as file:
  16.         stop_words = [line.strip() for line in file]
  17.     return stop_words

  18. # 其他函数定义...

  19. def to_text(num, current_url, title, content, links):
  20.     with lock:
  21.         data = {"id": num, "url": current_url, "title": title, "content": content, "links": links}
  22.         with open(f'A:/搜索引擎系统/Test/web{num}.txt', 'w', encoding='utf-8-sig') as file:
  23.             json.dump(data, file, ensure_ascii=False)

  24. # 其他函数定义...

  25. def do_work(page):
  26.     try:
  27.         with open(f'A:/搜索引擎系统/all_information/web{page}.txt', 'r', encoding='utf-8-sig') as file:
  28.             text = file.read()
  29.             num = text.split('\n')[0]
  30.             current_url = text.split('\n')[1]
  31.             html = str(text.split('\n')[2:])
  32.             title, content, links = get_title_and_content(current_url, html)
  33.             content = Separate_words(content)
  34.             to_text(num, current_url, title, content, links)
  35.             print(num)
  36.     except FileNotFoundError:
  37.         print("文件不存在,请检查文件路径!")

  38. if __name__ == '__main__':
  39.     # 设置线程数
  40.     num_threads = 36
  41.     page = 1
  42.     target = 100

  43.     for num in range(1, target + 1):
  44.         queue.put(num)

  45.     start_time = time.time()

  46.     with ThreadPoolExecutor(num_threads) as pool:
  47.         while not queue.empty():
  48.             page = queue.get()
  49.             pool.submit(do_work, page)
  50.             time.sleep(0.1)

  51.     pool.shutdown(cancel_futures=True)

  52.     end_time = time.time()
  53.     print(end_time - start_time)
复制代码


在修改后的代码中,我对 to_text 函数进行了修改,在写入文件之前使用 lock 进行加锁,确保每次只有一个线程能够执行写入操作。

希望这些修改能够解决您遇到的文件缺失问题。如果问题仍然存在,请提供更多详细信息,以便我能够给出进一步的建议。
如果问题已经解决,请设置最佳答案
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复

使用道具 举报

发表于 2023-9-23 20:42:48 | 显示全部楼层
可能是由于多个线程同时访问同一个文件,导致文件被覆盖或者同时读取的问题。可以使用文件锁来解决这个问题。在写文件之前,使用 with lock: 语句获取锁,确保同一时刻只有一个线程在访问文件。另外,建议使用 with open 语句来打开文件,可以自动关闭文件,避免文件被占用的情况。
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

发表于 2023-9-23 20:43:12 | 显示全部楼层    本楼为最佳答案   
通过阅读您提供的代码,我注意到在多线程处理的过程中,您使用了一个共享的队列 queue 来存储待处理的任务,并且每个线程从队列中获取任务并进行处理。然而,在您的代码中,并没有对文件操作进行加锁,这可能导致多个线程同时访问同一个文件,从而导致文件缺失问题。

为了解决这个问题,您可以考虑使用文件锁来确保同一时间只有一个线程可以访问文件。在 to_text 函数中,您可以将文件写入操作放在一个临界区域内,然后使用 lock 对象对该临界区域进行加锁。这样可以确保每次只有一个线程进行文件写入操作,避免多个线程同时写入同一个文件。

下面是修改后的代码示例:

  1. import json
  2. import re
  3. import threading
  4. import time
  5. from concurrent.futures import ThreadPoolExecutor
  6. from queue import Queue
  7. from urllib.parse import urljoin
  8. import jieba
  9. from spellchecker import SpellChecker
  10. import jieba.analyse
  11. from bs4 import BeautifulSoup

  12. lock = threading.Lock()
  13. queue = Queue()

  14. def load_stopwords(file_path):
  15.     with open(file_path, 'r', encoding='utf-8-sig') as file:
  16.         stop_words = [line.strip() for line in file]
  17.     return stop_words

  18. # 其他函数定义...

  19. def to_text(num, current_url, title, content, links):
  20.     with lock:
  21.         data = {"id": num, "url": current_url, "title": title, "content": content, "links": links}
  22.         with open(f'A:/搜索引擎系统/Test/web{num}.txt', 'w', encoding='utf-8-sig') as file:
  23.             json.dump(data, file, ensure_ascii=False)

  24. # 其他函数定义...

  25. def do_work(page):
  26.     try:
  27.         with open(f'A:/搜索引擎系统/all_information/web{page}.txt', 'r', encoding='utf-8-sig') as file:
  28.             text = file.read()
  29.             num = text.split('\n')[0]
  30.             current_url = text.split('\n')[1]
  31.             html = str(text.split('\n')[2:])
  32.             title, content, links = get_title_and_content(current_url, html)
  33.             content = Separate_words(content)
  34.             to_text(num, current_url, title, content, links)
  35.             print(num)
  36.     except FileNotFoundError:
  37.         print("文件不存在,请检查文件路径!")

  38. if __name__ == '__main__':
  39.     # 设置线程数
  40.     num_threads = 36
  41.     page = 1
  42.     target = 100

  43.     for num in range(1, target + 1):
  44.         queue.put(num)

  45.     start_time = time.time()

  46.     with ThreadPoolExecutor(num_threads) as pool:
  47.         while not queue.empty():
  48.             page = queue.get()
  49.             pool.submit(do_work, page)
  50.             time.sleep(0.1)

  51.     pool.shutdown(cancel_futures=True)

  52.     end_time = time.time()
  53.     print(end_time - start_time)
复制代码


在修改后的代码中,我对 to_text 函数进行了修改,在写入文件之前使用 lock 进行加锁,确保每次只有一个线程能够执行写入操作。

希望这些修改能够解决您遇到的文件缺失问题。如果问题仍然存在,请提供更多详细信息,以便我能够给出进一步的建议。
如果问题已经解决,请设置最佳答案
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

发表于 2023-9-23 20:43:17 | 显示全部楼层
问题描述:

我使用多线程处理获取到的HTML数据,并进行存储,但是在运行过程中出现了文件缺失的问题。我希望得到解决该问题的方法。

解决方案:

文件缺失的问题可能是由于多线程同时写入文件导致的。在多线程环境下,多个线程同时访问同一个文件,可能会导致文件写入冲突,进而导致文件丢失或损坏。

为了解决这个问题,你可以考虑以下两个方面的改进:

1. 使用文件锁:你已经使用了 threading.Lock() 创建了一个锁对象 lock,但是在写入文件时没有使用该锁进行保护。在 to_text() 函数中,你可以使用锁来保证同一时间只有一个线程可以访问写入文件的代码块。具体做法是,在写入文件之前,调用 lock.acquire() 获取锁,写入文件之后,调用 lock.release() 释放锁。这样可以确保每个线程在写入文件时不会被其他线程打断。

  
  1.    def to_text(num, current_url, title, content, links):
  2.        with lock:
  3.            # 获取锁
  4.            data = {
  5.                "id": num,
  6.                "url": current_url,
  7.                "title": title,
  8.                "content": content,
  9.                "links": links
  10.            }
  11.            with open(f'A:/搜索引擎系统/Test/web{page}.txt', 'w', encoding='utf-8-sig') as file:
  12.                json.dump(data, file, ensure_ascii=False)
  13.            # 释放锁
  14.    
复制代码

2. 合理设置线程池大小:在使用 ThreadPoolExecutor 创建线程池时,可以通过参数 max_workers 来指定线程池的大小。你目前将 max_workers 设置为了 36,这意味着最多同时执行 36 个线程。如果你的机器资源有限,可能会导致同时写入文件的线程过多,进而增加文件冲突的概率。你可以根据系统资源和具体需求来调整线程池的大小,以避免文件缺失问题。

  
  1.    with ThreadPoolExecutor(max_workers=10) as pool:
  2.        # 线程池的其他代码...
  3.    
复制代码

通过以上两个改进,你可以尝试解决文件缺失的问题。希望能帮助到你!如果还有其他问题,请随时追问。

球一个最佳答案谢谢啦!这对我非常重要!
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

 楼主| 发表于 2023-9-23 21:55:47 | 显示全部楼层
陶远航 发表于 2023-9-23 20:43
通过阅读您提供的代码,我注意到在多线程处理的过程中,您使用了一个共享的队列 queue 来存储待处理的任务 ...

好的,谢谢
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2024-5-20 06:48

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表