import json
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from urllib.parse import urljoin
import jieba
from spellchecker import SpellChecker
import jieba.analyse
from bs4 import BeautifulSoup
lock = threading.Lock()
queue = Queue()
def load_stopwords(file_path):
    with open(file_path, 'r', encoding='utf-8-sig') as file:
        stop_words = [line.strip() for line in file]
    return stop_words
stop_words = load_stopwords('A:/搜索引擎系统/停用词表.txt')
def get_all_hrefs(text: str, current_url):
    all_href = []
    # Parse the page source with BeautifulSoup
    soup = BeautifulSoup(text, 'lxml')
    # Collect the href attribute of every <a> tag
    all_a_tags = soup.find_all('a')
    for a_tag in all_a_tags:
        href = a_tag.get('href')
        if href and not href.startswith('#') and not href.startswith('javascript:'):
            absolute_url = urljoin(current_url, href)
            # Replace https with http to avoid duplicate URLs
            new_url = absolute_url.replace("https", "http")
            all_href.append(new_url)
    # Return the collected links
    return all_href
def is_chinese(word):
    for char in word:
        if '\u4e00' <= char <= '\u9fff':  # Unicode range for common CJK characters
            return True
    return False
def remove_gibberish_words(words):
    spell = SpellChecker()
    filtered_words = []
    for word in words:
        if not is_chinese(word):  # non-Chinese tokens get a spell check
            if spell.known([word]):
                filtered_words.append(word)
        else:
            filtered_words.append(word)  # Chinese tokens are kept as-is
    return filtered_words
def Separate_words(content):
    # Strip punctuation with a regular expression before segmentation
    content = re.sub(r"[^\w\s]+", "", content)
    content = list(jieba.cut(content, cut_all=False, use_paddle=True))
    content = remove_gibberish_words(content)
    content = [element for element in content if element.strip() != '']
    content = [word for word in content if word.lower() not in stop_words]
    return content
def to_text(num, current_url, title, content, links):
    with lock:
        data = {
            "id": num,
            "url": current_url,
            "title": title,
            "content": content,
            "links": links
        }
        with open(f'A:/搜索引擎系统/Test/web{page}.txt', 'w', encoding='utf-8-sig') as file:
            json.dump(data, file, ensure_ascii=False)
def get_title_and_content(current_url, html):
    soup = BeautifulSoup(html, 'html.parser')
    title = ''
    if soup.find('title') is not None:
        title = soup.find('title').get_text()
    content = soup.get_text()
    content = content.replace('\n', '')
    links = get_all_hrefs(html, current_url)
    return title, content, links
def do_work(page):
    try:
        with open(f'A:/搜索引擎系统/all_information/web{page}.txt', 'r', encoding='utf-8-sig') as file:
            text = file.read()
        num = text.split('\n')[0]
        current_url = text.split('\n')[1]
        html = '\n'.join(text.split('\n')[2:])
        title, content, links = get_title_and_content(current_url, html)
        content = Separate_words(content)
        to_text(num, current_url, title, content, links)
        print(num)
    except FileNotFoundError:
        # Input file does not exist
        print("文件不存在,请检查文件路径!")
if __name__ == '__main__':
    page = 1
    target = 100
    for num in range(1, target + 1):
        queue.put(num)
    start_time = time.time()
    with ThreadPoolExecutor(36) as pool:
        while not queue.empty():
            page = queue.get()
            pool.submit(do_work, page)
            time.sleep(0.1)
        pool.shutdown(cancel_futures=True)
    end_time = time.time()
    print(end_time - start_time)
I use multiple threads to process the HTML data I collected and write the results to files, but some of the output files end up missing.
Reading through the code you posted: the write in to_text is already wrapped in with lock:, so concurrent writes to an open file are not the real problem. What does cause files to go missing is the output file name. to_text builds it from the global variable page, which the main thread keeps reassigning while it drains the queue, so by the time a worker thread actually performs the write, page may already hold a later value. Several records then land in the same file, overwriting one another, while the files for the other ids are never created.
To fix this, build the file name inside to_text from the per-record identifier that is passed in (num here), so each task writes to its own file regardless of what the main thread is doing. Keeping the write inside the lock block is still fine as a safety net against any other shared-file access.
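To see the failure mode in isolation, here is a minimal sketch (not part of your crawler, just an illustration) of what happens when worker threads build a file name from a global that the main thread keeps reassigning:

import threading

page = 0
written = set()

def worker(num):
    # Illustration only: the task's own id is num, but the name is built
    # from the shared global page, which may have been reassigned by the
    # main thread before this line runs.
    written.add(f"web{page}.txt")

threads = []
for n in range(1, 11):
    page = n                      # main thread moves on to the next task
    t = threading.Thread(target=worker, args=(n,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

# Often fewer than 10 distinct names are produced: some records were written
# under the same name, so the other files appear to be "missing".
print(len(written), "distinct file names for 10 tasks")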
Here is the modified code:
import json
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from urllib.parse import urljoin
import jieba
from spellchecker import SpellChecker
import jieba.analyse
from bs4 import BeautifulSoup
lock = threading.Lock()
queue = Queue()
def load_stopwords(file_path):
    with open(file_path, 'r', encoding='utf-8-sig') as file:
        stop_words = [line.strip() for line in file]
    return stop_words
# ... other function definitions unchanged ...
def to_text(num, current_url, title, content, links):
    with lock:
        data = {"id": num, "url": current_url, "title": title, "content": content, "links": links}
        # The file name now comes from num, the id passed in for this record
        with open(f'A:/搜索引擎系统/Test/web{num}.txt', 'w', encoding='utf-8-sig') as file:
            json.dump(data, file, ensure_ascii=False)
# ... other function definitions unchanged ...
def do_work(page):
    try:
        with open(f'A:/搜索引擎系统/all_information/web{page}.txt', 'r', encoding='utf-8-sig') as file:
            text = file.read()
        num = text.split('\n')[0]
        current_url = text.split('\n')[1]
        html = '\n'.join(text.split('\n')[2:])
        title, content, links = get_title_and_content(current_url, html)
        content = Separate_words(content)
        to_text(num, current_url, title, content, links)
        print(num)
    except FileNotFoundError:
        print("文件不存在,请检查文件路径!")
if __name__ == '__main__':
    # Number of worker threads
    num_threads = 36
    page = 1
    target = 100
    for num in range(1, target + 1):
        queue.put(num)
    start_time = time.time()
    with ThreadPoolExecutor(num_threads) as pool:
        while not queue.empty():
            page = queue.get()
            pool.submit(do_work, page)
            time.sleep(0.1)
        pool.shutdown(cancel_futures=True)
    end_time = time.time()
    print(end_time - start_time)
In the modified code, to_text names the output file with num, the per-record id that is passed in, instead of the shared page variable, and the write still happens while holding lock so only one thread writes at a time.
Hopefully these changes resolve the missing-file problem. If it still occurs, please post more details and I can suggest further steps.
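To confirm that nothing is missing after a run, a quick throwaway check like the one below lists any output file that was never created; it assumes the record ids run from 1 to the same target of 100 and uses the output directory from the code above:

import os

target = 100
# Assumption: output files are named web{id}.txt with ids 1..target
missing = [n for n in range(1, target + 1)
           if not os.path.exists(f'A:/搜索引擎系统/Test/web{n}.txt')]
print('missing output files:', missing)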
If this solves your problem, please mark this reply as the best answer.