@FishC As the title says: the script is supposed to crawl news from several platforms every day and write the results to a PDF file named with the current date and time.
Here is the code:
import requests
import sqlite3
import time
import random
import jieba
import os
from bs4 import BeautifulSoup
from datetime import datetime
from wordcloud import WordCloud
from collections import Counter
import matplotlib.pyplot as plt
from weasyprint import HTML
from simhash import Simhash
# ==================== 配置区域 ====================
DATABASE = 'news.db'
FONT_PATH = 'msyh.ttc'
STOPWORDS_FILE = 'stopwords.txt'
REQUEST_INTERVAL = (1, 3)
REPORT_DIR = r'H:\daynews'
TARGETS = {
'联合早报-中国': {
'url': 'https://www.zaobao.com/realtime/china',
'selector': '.article-list .title',
'method': 'get'
},
'澎湃新闻-热点': {
'url': 'https://www.thepaper.cn/',
'selector': '.news_li h2 a',
'method': 'get'
},
'微博热搜': {
'url': 'https://s.weibo.com/top/summary',
'selector': 'tr td.td-02 a', # 使用开发者工具检查元素更新选择器
'method': 'get',
'headers': {
'Cookie': 'SUB=你的实际cookie' # 长期采集建议获取cookie
}
},
'百度热榜': {
'url': 'https://top.baidu.com/board?tab=realtime',
'selector': '.container-bg_lQ801 .content_1YWBm', # 更新选择器路径
'method': 'get',
'headers': {
'Referer': 'https://top.baidu.com/',
'Cookie': 'BAIDUID=你的实际Cookie值;' # 需通过浏览器获取
}
},
'头条热榜': {
'url': 'https://www.toutiao.com/hot-event/hot-board/',
'method': 'api',
'params': {'origin': 'toutiao_pc'},
'headers': {
'Referer': 'https://www.toutiao.com/'
}
}
# ==================== 核心功能 ====================
class NewsCrawler:
def __init__(self):
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Accept-Encoding': 'gzip, deflate, br'
}
def safe_request(self, url, method='get', **kwargs):
try:
time.sleep(random.uniform(*REQUEST_INTERVAL))
response = requests.request(
method,
url,
headers=self.headers,
timeout=10,
**kwargs
)
response.raise_for_status()
response.encoding = response.apparent_encoding
return response
except Exception as e:
print(f'请求失败: {url} | 错误: {str(e)}')
return None
def parse_web(self, target):
response = self.safe_request(target['url'], target['method'],
headers={**self.headers, **target.get('headers', {})})
if not response:
return []
soup = BeautifulSoup(response.text, 'lxml')
# 百度热榜特殊处理
if 'baidu' in target['url']:
main_titles = [a.text.strip() for a in soup.select(target['selector'])]
sub_titles = [div.text.strip() for div in soup.select('.hot-desc_1m_jR')]
return list(set(main_titles + sub_titles))[:15]
else:
return [a.text.strip() for a in soup.select(target['selector'])][:10]
def parse_api(self, target):
try:
response = self.safe_request(
target['url'],
target.get('method', 'get'),
params=target.get('params'),
headers=target.get('headers', {})
)
if not response:
return []
data = response.json()
# 动态字段映射表
field_mapping = {
'zhihu': ('data', 'target', 'title'),
'toutiao': ('data', None, 'Title'),
'weibo': ('data', 'hotgov', 'word')
}
platform = next((k for k in field_mapping if k in target['url']), 'default')
root_key, mid_key, title_key = field_mapping.get(
platform,
('data', None, 'title')
)
items = data.get(root_key, [])
results = []
for item in items[:10]:
try:
if mid_key:
title = item[mid_key][title_key]
else:
title = item[title_key]
results.append(title.strip())
except (KeyError, TypeError):
continue
return results
except Exception as e:
print(f"API解析失败:{target['url']} | 错误:{str(e)}")
return []
def crawl_all(self):
all_titles = {}
seen_titles = set()
for name, target in TARGETS.items():
print(f'正在爬取: {name}')
# 获取原始标题列表
if target['method'] == 'api':
raw_titles = self.parse_api(target)
else:
raw_titles = self.parse_web(target)
unique_titles = [
title for title in raw_titles
if title not in seen_titles
]
seen_titles.update(unique_titles)
if unique_titles:
all_titles[name] = unique_titles
self.save_to_db(name, unique_titles)
return all_titles
def save_to_db(self, source, titles):
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS title_fingerprints
(id INTEGER PRIMARY KEY,
simhash INTEGER UNIQUE)''')
c.execute('''CREATE TABLE IF NOT EXISTS headlines
(id INTEGER PRIMARY KEY,
source TEXT,
title TEXT,
date TEXT)''')
date = datetime.now().strftime("%Y-%m-%d")
for title in titles:
c.execute("INSERT INTO headlines (source, title, date) VALUES (?, ?, ?)",
(source, title, date))
simhash = Simhash(title).value
try:
c.execute("INSERT INTO title_fingerprints (simhash) VALUES (?)",
(simhash,))
except sqlite3.IntegrityError:
pass
conn.commit()
conn.close()
def safe_request(self, url, method='get', **kwargs):
time.sleep(random.uniform(*REQUEST_INTERVAL))
self.headers['User-Agent'] = random.choice([
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36...',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1...'
])
try:
response = requests.request(
method,
url,
headers=self.headers,
timeout=10,
proxies={'http': 'socks5://127.0.0.1:1080'}, # 使用代理池时另加
**kwargs
)
response.raise_for_status()
return response
except Exception as e:
print(f'请求失败: {url} | 错误: {str(e)}')
return None
# ReportGenerator类定义
class ReportGenerator:
def __init__(self):
with open(STOPWORDS_FILE, 'r', encoding='utf-8') as f:
self.stopwords = set(f.read().splitlines())
self.timestamp = datetime.now().strftime("%Y%m%d_%H%M")
self.simhash_cache = self.load_historical_hashes()
def get_today_titles(self):
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute("SELECT title FROM headlines WHERE date = ?",
(datetime.now().strftime("%Y-%m-%d"),))
raw_titles = [row[0] for row in c.fetchall()]
return self.deduplicate(raw_titles)
def analyze_keywords(self, titles):
words = []
for title in titles:
words += [word for word in jieba.cut(title)
if word not in self.stopwords and len(word) > 1]
return Counter(words).most_common(20)
def generate_wordcloud(self):
titles = self.get_today_titles()
keywords = self.analyze_keywords(titles)
word_freq = {word: freq for word, freq in keywords}
wc = WordCloud(
font_path=FONT_PATH,
width=800,
height=600,
background_color='white'
).generate_from_frequencies(word_freq)
img_path = f'temp_wc_{self.timestamp}.png'
plt.figure(figsize=(12, 8))
plt.imshow(wc)
plt.axis("off")
plt.savefig(img_path, bbox_inches='tight')
plt.close()
return img_path, keywords
def create_pdf_report(self):
img_path, keywords = self.generate_wordcloud()
html_content = f'''
<html>
<head>
<meta charset="utf-8">
<title>新闻热点分析报告</title>
<style>
body {{ font-family: Arial; margin: 2cm; }}
h1 {{ color: #2c3e50; border-bottom: 2px solid #3498db; }}
.timestamp {{ color: #7f8c8d; }}
.keywords {{ columns: 2; margin: 20px 0; }}
img {{ width: 80%; margin: 20px auto; display: block; }}
</style>
</head>
<body>
<h1>新闻热点分析报告</h1>
<div class="timestamp">生成时间:{datetime.now().strftime("%Y-%m-%d %H:%M")}</div>
<h2>今日TOP20关键词</h2>
<div class="keywords">
{''.join([f'<div>{word[0]} ({word[1]})</div>' for word in keywords])}
</div>
<h2>词云图</h2>
<img src="{img_path}">
</body>
</html>
'''
os.makedirs(REPORT_DIR, exist_ok=True)
pdf_filename = f'热点分析报告_{self.timestamp}.pdf'
pdf_path = os.path.join(REPORT_DIR, pdf_filename)
HTML(string=html_content).write_pdf(pdf_path)
if os.path.exists(img_path):
os.remove(img_path)
return pdf_path
def load_historical_hashes(self):
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute("SELECT simhash FROM title_fingerprints")
simhashes = {row[0] for row in c.fetchall()}
conn.close()
return simhashes
def deduplicate(self, titles):
unique = list(set(titles))
unique = self.remove_edit_duplicates(unique)
return self.remove_semantic_duplicates(unique)
def remove_edit_duplicates(self, titles):
filtered = []
for title in titles:
if not any(self.edit_distance(title, t) < 3 for t in filtered):
filtered.append(title)
return filtered
def remove_semantic_duplicates(self, titles):
filtered = []
for title in titles:
current_hash = Simhash(title).value
if not any(self.hamming_distance(current_hash, h) < 6 for h in self.simhash_cache):
filtered.append(title)
self.simhash_cache.add(current_hash)
return filtered
@staticmethod
def edit_distance(s1, s2):
m, n = len(s1), len(s2)
dp = [[0]*(n+1) for _ in range(m+1)]
for i in range(m+1): dp[i][0] = i
for j in range(n+1): dp[0][j] = j
for i in range(1, m+1):
for j in range(1, n+1):
cost = 0 if s1[i-1] == s2[j-1] else 1
dp[i][j] = min(dp[i-1][j]+1, dp[i][j-1]+1, dp[i-1][j-1]+cost)
return dp[m][n]
@staticmethod
def hamming_distance(h1, h2):
return bin(h1 ^ h2).count('1')
# ==================== 主程序 ====================
if __name__ == '__main__':
try:
crawler = NewsCrawler()
crawler.crawl_all()
reporter = ReportGenerator()
pdf_path = reporter.create_pdf_report()
print(f'报告已生成: {os.path.abspath(pdf_path)}')
except Exception as e:
print(f'程序运行出错: {str(e)}')
At the moment Python reports a syntax error when it reaches the class NewsCrawler line. Could you take a look at what exactly is wrong?
The syntax error comes from the TARGETS configuration: the outer dictionary opened with TARGETS = { is never closed. Look carefully at the end of that block as you posted it:

'头条热榜': {
    'url': 'https://www.toutiao.com/hot-event/hot-board/',
    'method': 'api',
    'params': {'origin': 'toutiao_pc'},
    'headers': {
        'Referer': 'https://www.toutiao.com/'
    }
}
# ==================== 核心功能 ====================
class NewsCrawler:

The brace after 'Referer' closes the headers dict, and the next one closes the '头条热榜' entry, but there is no final } to terminate TARGETS itself. Python therefore keeps reading the unfinished dictionary literal until it reaches class NewsCrawler:, which is not valid inside a dict, so the error is reported on that line (a recent Python version may instead say "'{' was never closed" and point back at the TARGETS = { line). This is typical of a missing } or ): the error shows up well after the place where the bracket was actually left out.
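The fix is to add one more closing brace after the '头条热榜' entry so that TARGETS is properly terminated. Based on the code you posted, the end of the configuration section should look like this (only the last } is new, everything else is unchanged):

'头条热榜': {
    'url': 'https://www.toutiao.com/hot-event/hot-board/',
    'method': 'api',
    'params': {'origin': 'toutiao_pc'},
    'headers': {
        'Referer': 'https://www.toutiao.com/'
    }
}
}  # this closing brace ends TARGETS and is the one that was missing

# ==================== 核心功能 ====================
class NewsCrawler:
    ...

To catch this kind of error quickly, you can compile the file without running it, for example python -m py_compile yourscript.py (yourscript.py is just a placeholder for whatever your file is actually called); on Python 3.10 or newer the message "'{' was never closed" will typically point straight at the TARGETS = { line.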