@FishC As the title says. The code is meant to crawl news from several platforms every day and write the results to a PDF file named with the current date and time.
The code is as follows:
import requests
import sqlite3
import time
import random
import jieba
import os
from bs4 import BeautifulSoup
from datetime import datetime
from wordcloud import WordCloud
from collections import Counter
import matplotlib.pyplot as plt
from weasyprint import HTML
from simhash import Simhash

# ==================== Configuration ====================
DATABASE = 'news.db'
FONT_PATH = 'msyh.ttc'
STOPWORDS_FILE = 'stopwords.txt'
REQUEST_INTERVAL = (1, 3)
REPORT_DIR = r'H:\daynews'

TARGETS = {
    '联合早报-中国': {
        'url': 'https://www.zaobao.com/realtime/china',
        'selector': '.article-list .title',
        'method': 'get'
    },
    '澎湃新闻-热点': {
        'url': 'https://www.thepaper.cn/',
        'selector': '.news_li h2 a',
        'method': 'get'
    },
    '微博热搜': {
        'url': 'https://s.weibo.com/top/summary',
        'selector': 'tr td.td-02 a',  # update the selector by inspecting the element in dev tools
        'method': 'get',
        'headers': {
            'Cookie': 'SUB=你的实际cookie'  # for long-term scraping, supply a real cookie
        }
    },
    '百度热榜': {
        'url': 'https://top.baidu.com/board?tab=realtime',
        'selector': '.container-bg_lQ801 .content_1YWBm',  # updated selector path
        'method': 'get',
        'headers': {
            'Referer': 'https://top.baidu.com/',
            'Cookie': 'BAIDUID=你的实际Cookie值;'  # obtain from your browser
        }
    },
    '头条热榜': {
        'url': 'https://www.toutiao.com/hot-event/hot-board/',
        'method': 'api',
        'params': {'origin': 'toutiao_pc'},
        'headers': {
            'Referer': 'https://www.toutiao.com/'
        }
    }
# ==================== Core functionality ====================
class NewsCrawler:
    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br'
        }

    def safe_request(self, url, method='get', **kwargs):
        try:
            time.sleep(random.uniform(*REQUEST_INTERVAL))
            response = requests.request(
                method,
                url,
                headers=self.headers,
                timeout=10,
                **kwargs
            )
            response.raise_for_status()
            response.encoding = response.apparent_encoding
            return response
        except Exception as e:
            print(f'请求失败: {url} | 错误: {str(e)}')
            return None

    def parse_web(self, target):
        response = self.safe_request(target['url'], target['method'],
                                     headers={**self.headers, **target.get('headers', {})})
        if not response:
            return []

        soup = BeautifulSoup(response.text, 'lxml')

        # special handling for the Baidu hot list
        if 'baidu' in target['url']:
            main_titles = [a.text.strip() for a in soup.select(target['selector'])]

            sub_titles = [div.text.strip() for div in soup.select('.hot-desc_1m_jR')]

            return list(set(main_titles + sub_titles))[:15]
        else:
            return [a.text.strip() for a in soup.select(target['selector'])][:10]

    def parse_api(self, target):
        try:
            response = self.safe_request(
                target['url'],
                target.get('method', 'get'),
                params=target.get('params'),
                headers=target.get('headers', {})
            )
            if not response:
                return []
            data = response.json()
            # dynamic field-mapping table
            field_mapping = {
                'zhihu': ('data', 'target', 'title'),
                'toutiao': ('data', None, 'Title'),
                'weibo': ('data', 'hotgov', 'word')
            }
            platform = next((k for k in field_mapping if k in target['url']), 'default')
            root_key, mid_key, title_key = field_mapping.get(
                platform,
                ('data', None, 'title')
            )
            items = data.get(root_key, [])
            results = []
            for item in items[:10]:
                try:
                    if mid_key:
                        title = item[mid_key][title_key]
                    else:
                        title = item[title_key]
                    results.append(title.strip())
                except (KeyError, TypeError):
                    continue
            return results
        except Exception as e:
            print(f"API解析失败:{target['url']} | 错误:{str(e)}")
            return []

    def crawl_all(self):
        all_titles = {}
        seen_titles = set()
        for name, target in TARGETS.items():
            print(f'正在爬取: {name}')

            # fetch the raw title list
            if target['method'] == 'api':
                raw_titles = self.parse_api(target)
            else:
                raw_titles = self.parse_web(target)

            unique_titles = [
                title for title in raw_titles
                if title not in seen_titles
            ]

            seen_titles.update(unique_titles)
            if unique_titles:
                all_titles[name] = unique_titles
                self.save_to_db(name, unique_titles)

        return all_titles

    def save_to_db(self, source, titles):
        conn = sqlite3.connect(DATABASE)
        c = conn.cursor()

        c.execute('''CREATE TABLE IF NOT EXISTS title_fingerprints
                     (id INTEGER PRIMARY KEY,
                      simhash INTEGER UNIQUE)''')
        c.execute('''CREATE TABLE IF NOT EXISTS headlines
                     (id INTEGER PRIMARY KEY,
                      source TEXT,
                      title TEXT,
                      date TEXT)''')

        date = datetime.now().strftime("%Y-%m-%d")
        for title in titles:
            c.execute("INSERT INTO headlines (source, title, date) VALUES (?, ?, ?)",
                      (source, title, date))

            simhash = Simhash(title).value
            try:
                c.execute("INSERT INTO title_fingerprints (simhash) VALUES (?)",
                          (simhash,))
            except sqlite3.IntegrityError:
                pass

        conn.commit()
        conn.close()

    def safe_request(self, url, method='get', **kwargs):
        time.sleep(random.uniform(*REQUEST_INTERVAL))

        self.headers['User-Agent'] = random.choice([
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36...',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1...'
        ])

        try:
            response = requests.request(
                method,
                url,
                headers=self.headers,
                timeout=10,
                proxies={'http': 'socks5://127.0.0.1:1080'},  # add this when using a proxy pool
                **kwargs
            )
            response.raise_for_status()
            return response
        except Exception as e:
            print(f'请求失败: {url} | 错误: {str(e)}')
            return None

# ReportGenerator class definition
class ReportGenerator:
    def __init__(self):
        with open(STOPWORDS_FILE, 'r', encoding='utf-8') as f:
            self.stopwords = set(f.read().splitlines())
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M")
        self.simhash_cache = self.load_historical_hashes()

    def get_today_titles(self):
        conn = sqlite3.connect(DATABASE)
        c = conn.cursor()
        c.execute("SELECT title FROM headlines WHERE date = ?",
                  (datetime.now().strftime("%Y-%m-%d"),))
        raw_titles = [row[0] for row in c.fetchall()]

        return self.deduplicate(raw_titles)

    def analyze_keywords(self, titles):
        words = []
        for title in titles:
            words += [word for word in jieba.cut(title)
                      if word not in self.stopwords and len(word) > 1]

        return Counter(words).most_common(20)

    def generate_wordcloud(self):
        titles = self.get_today_titles()
        keywords = self.analyze_keywords(titles)

        word_freq = {word: freq for word, freq in keywords}
        wc = WordCloud(
            font_path=FONT_PATH,
            width=800,
            height=600,
            background_color='white'
        ).generate_from_frequencies(word_freq)

        img_path = f'temp_wc_{self.timestamp}.png'
        plt.figure(figsize=(12, 8))
        plt.imshow(wc)
        plt.axis("off")
        plt.savefig(img_path, bbox_inches='tight')
        plt.close()
        return img_path, keywords

    def create_pdf_report(self):
        img_path, keywords = self.generate_wordcloud()
        html_content = f'''
        <html>
        <head>
            <meta charset="utf-8">
            <title>新闻热点分析报告</title>
            <style>
                body {{ font-family: Arial; margin: 2cm; }}
                h1 {{ color: #2c3e50; border-bottom: 2px solid #3498db; }}
                .timestamp {{ color: #7f8c8d; }}
                .keywords {{ columns: 2; margin: 20px 0; }}
                img {{ width: 80%; margin: 20px auto; display: block; }}
            </style>
        </head>
        <body>
            <h1>新闻热点分析报告</h1>
            <div class="timestamp">生成时间:{datetime.now().strftime("%Y-%m-%d %H:%M")}</div>

            <h2>今日TOP20关键词</h2>
            <div class="keywords">
                {''.join([f'<div>{word[0]} ({word[1]})</div>' for word in keywords])}
            </div>

            <h2>词云图</h2>
            <img src="{img_path}">
        </body>
        </html>
        '''

        os.makedirs(REPORT_DIR, exist_ok=True)
        pdf_filename = f'热点分析报告_{self.timestamp}.pdf'
        pdf_path = os.path.join(REPORT_DIR, pdf_filename)

        HTML(string=html_content).write_pdf(pdf_path)
        if os.path.exists(img_path):
            os.remove(img_path)

        return pdf_path

    def load_historical_hashes(self):
        conn = sqlite3.connect(DATABASE)
        c = conn.cursor()
        c.execute("SELECT simhash FROM title_fingerprints")
        simhashes = {row[0] for row in c.fetchall()}
        conn.close()
        return simhashes

    def deduplicate(self, titles):
        unique = list(set(titles))
        unique = self.remove_edit_duplicates(unique)
        return self.remove_semantic_duplicates(unique)

    def remove_edit_duplicates(self, titles):
        filtered = []
        for title in titles:
            if not any(self.edit_distance(title, t) < 3 for t in filtered):
                filtered.append(title)
        return filtered

    def remove_semantic_duplicates(self, titles):
        filtered = []
        for title in titles:
            current_hash = Simhash(title).value
            if not any(self.hamming_distance(current_hash, h) < 6 for h in self.simhash_cache):
                filtered.append(title)
                self.simhash_cache.add(current_hash)
        return filtered

    @staticmethod
    def edit_distance(s1, s2):
        m, n = len(s1), len(s2)
        dp = [[0] * (n + 1) for _ in range(m + 1)]
        for i in range(m + 1): dp[i][0] = i
        for j in range(n + 1): dp[0][j] = j

        for i in range(1, m + 1):
            for j in range(1, n + 1):
                cost = 0 if s1[i-1] == s2[j-1] else 1
                dp[i][j] = min(dp[i-1][j] + 1, dp[i][j-1] + 1, dp[i-1][j-1] + cost)

        return dp[m][n]

    @staticmethod
    def hamming_distance(h1, h2):
        return bin(h1 ^ h2).count('1')

# ==================== Main program ====================
if __name__ == '__main__':
    try:
        crawler = NewsCrawler()
        crawler.crawl_all()

        reporter = ReportGenerator()
        pdf_path = reporter.create_pdf_report()
        print(f'报告已生成: {os.path.abspath(pdf_path)}')

    except Exception as e:
        print(f'程序运行出错: {str(e)}')
Right now, compiling the file reports a syntax error around class NewsCrawler. Please take a look and tell me what exactly is wrong.
The syntax error comes from the TARGETS configuration: the dictionary is never closed. The '头条热榜' entry itself ends with its own closing brace, but the outer TARGETS = { ... } dictionary is missing its final }. The end of the configuration should look like this:

    '头条热榜': {
        'url': 'https://www.toutiao.com/hot-event/hot-board/',
        'method': 'api',
        'params': {'origin': 'toutiao_pc'},
        'headers': {
            'Referer': 'https://www.toutiao.com/'
        }
    }
}

# ==================== Core functionality ====================
class NewsCrawler:

whereas in the code you posted it ends like this:

    '头条热榜': {
        'url': 'https://www.toutiao.com/hot-event/hot-board/',
        'method': 'api',
        'params': {'origin': 'toutiao_pc'},
        'headers': {
            'Referer': 'https://www.toutiao.com/'
        }
    }
# ==================== Core functionality ====================
class NewsCrawler:

Because the dictionary literal is still open when the parser reaches class NewsCrawler:, Python reports the syntax error there even though the actual mistake is the missing closing brace a few lines earlier. A single missing closing brace or parenthesis is enough to cause this.
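
If you want to catch this kind of mistake before the crawler actually runs, you can syntax-check the file without executing it. Below is a minimal sketch using the standard-library py_compile module; the filename news_report.py is only a placeholder, so substitute your actual script name. On Python 3.10 or newer the message usually points straight at the unclosed brace (something like "'{' was never closed"), while older versions only flag the first line they fail to parse.

import py_compile

try:
    # Compile the script without executing it; doraise=True turns
    # compilation problems into a PyCompileError we can catch.
    py_compile.compile('news_report.py', doraise=True)  # placeholder filename
    print('No syntax errors found')
except py_compile.PyCompileError as err:
    # For the missing brace, the message points at or near the TARGETS block.
    print(err.msg)

The same check is available from the command line as: python -m py_compile news_report.py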