鱼C论坛

 找回密码
 立即注册
查看: 79|回复: 4

[已解决]编了个爬新闻的爬虫,出问题了

[复制链接]
发表于 3 天前 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
@FishC  如题。代码实现的功能是,每天爬取几个平台的新闻,输出到以日期时间为名的PDF文件里。
代码如下:
import requests
import sqlite3
import time
import random
import jieba
import os
from bs4 import BeautifulSoup
from datetime import datetime
from wordcloud import WordCloud
from collections import Counter
import matplotlib.pyplot as plt
from weasyprint import HTML
from simhash import Simhash

# ==================== 配置区域 ====================
DATABASE = 'news.db'
FONT_PATH = 'msyh.ttc'
STOPWORDS_FILE = 'stopwords.txt'
REQUEST_INTERVAL = (1, 3)
REPORT_DIR = r'H:\daynews'

TARGETS = {
    '联合早报-中国': {
        'url': 'https://www.zaobao.com/realtime/china',
        'selector': '.article-list .title',
        'method': 'get'
    },
    '澎湃新闻-热点': {
        'url': 'https://www.thepaper.cn/',
        'selector': '.news_li h2 a',
        'method': 'get'
    },
    '微博热搜': {
        'url': 'https://s.weibo.com/top/summary',
        'selector': 'tr td.td-02 a',  # 使用开发者工具检查元素更新选择器
        'method': 'get',
        'headers': {
            'Cookie': 'SUB=你的实际cookie'  # 长期采集建议获取cookie
        }
    },
    '百度热榜': {
        'url': 'https://top.baidu.com/board?tab=realtime',
        'selector': '.container-bg_lQ801 .content_1YWBm',  # 更新选择器路径
        'method': 'get',
        'headers': {
            'Referer': 'https://top.baidu.com/',
            'Cookie': 'BAIDUID=你的实际Cookie值;'  # 需通过浏览器获取
        }
    },
    '头条热榜': {
        'url': 'https://www.toutiao.com/hot-event/hot-board/',
        'method': 'api',
        'params': {'origin': 'toutiao_pc'},
        'headers': {
            'Referer': 'https://www.toutiao.com/'
    }
}

# ==================== 核心功能 ====================
class NewsCrawler:
    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br'
        }
        
    def safe_request(self, url, method='get', **kwargs):
        try:
            time.sleep(random.uniform(*REQUEST_INTERVAL))
            response = requests.request(
                method,
                url,
                headers=self.headers,
                timeout=10,
                **kwargs
            )
            response.raise_for_status()
            response.encoding = response.apparent_encoding
            return response
        except Exception as e:
            print(f'请求失败: {url} | 错误: {str(e)}')
            return None

    def parse_web(self, target):
        response = self.safe_request(target['url'], target['method'], 
                                    headers={**self.headers, **target.get('headers', {})})
        if not response:
            return []
        
        soup = BeautifulSoup(response.text, 'lxml')
    
        # 百度热榜特殊处理
        if 'baidu' in target['url']:
            main_titles = [a.text.strip() for a in soup.select(target['selector'])]
        
            sub_titles = [div.text.strip() for div in soup.select('.hot-desc_1m_jR')]
        
            return list(set(main_titles + sub_titles))[:15]
        else:
            return [a.text.strip() for a in soup.select(target['selector'])][:10]
    

    def parse_api(self, target):
        try:
            response = self.safe_request(
                target['url'],
                target.get('method', 'get'),
                params=target.get('params'),
                headers=target.get('headers', {})
            )
            if not response:
                return []

            data = response.json()

            # 动态字段映射表
            field_mapping = {
                'zhihu': ('data', 'target', 'title'),
                'toutiao': ('data', None, 'Title'),
                'weibo': ('data', 'hotgov', 'word')
            }

            platform = next((k for k in field_mapping if k in target['url']), 'default')

            root_key, mid_key, title_key = field_mapping.get(
                platform, 
                ('data', None, 'title')
            )

            items = data.get(root_key, [])
            results = []
            for item in items[:10]:
                try:
                    if mid_key:
                        title = item[mid_key][title_key]
                    else:
                        title = item[title_key]
                    results.append(title.strip())
                except (KeyError, TypeError):
                    continue

            return results

        except Exception as e:
            print(f"API解析失败:{target['url']} | 错误:{str(e)}")
            return []


    def crawl_all(self):
        all_titles = {}
        seen_titles = set()

        for name, target in TARGETS.items():
            print(f'正在爬取: {name}')
            
            # 获取原始标题列表
            if target['method'] == 'api':
                raw_titles = self.parse_api(target)
            else:
                raw_titles = self.parse_web(target)
            
            unique_titles = [
                title for title in raw_titles
                if title not in seen_titles
            ]
            
            seen_titles.update(unique_titles)
            if unique_titles:
                all_titles[name] = unique_titles
                self.save_to_db(name, unique_titles)
                
        return all_titles

    def save_to_db(self, source, titles):
        conn = sqlite3.connect(DATABASE)
        c = conn.cursor()
        
        c.execute('''CREATE TABLE IF NOT EXISTS title_fingerprints
                    (id INTEGER PRIMARY KEY,
                     simhash INTEGER UNIQUE)''')
        c.execute('''CREATE TABLE IF NOT EXISTS headlines
                    (id INTEGER PRIMARY KEY,
                     source TEXT,
                     title TEXT,
                     date TEXT)''')
        
        date = datetime.now().strftime("%Y-%m-%d")
        for title in titles:
            c.execute("INSERT INTO headlines (source, title, date) VALUES (?, ?, ?)",
                     (source, title, date))
            
            simhash = Simhash(title).value
            try:
                c.execute("INSERT INTO title_fingerprints (simhash) VALUES (?)",
                         (simhash,))
            except sqlite3.IntegrityError:
                pass
                
        conn.commit()
        conn.close()

    def safe_request(self, url, method='get', **kwargs):
        time.sleep(random.uniform(*REQUEST_INTERVAL))
    
        self.headers['User-Agent'] = random.choice([
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36...',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1...'
        ])
    
        try:
            response = requests.request(
                method,
                url,
                headers=self.headers,
                timeout=10,
                proxies={'http': 'socks5://127.0.0.1:1080'},  # 使用代理池时另加
                **kwargs
            )
            response.raise_for_status()
            return response
        except Exception as e:
            print(f'请求失败: {url} | 错误: {str(e)}')
            return None


# ReportGenerator类定义
class ReportGenerator:
    def __init__(self):
        with open(STOPWORDS_FILE, 'r', encoding='utf-8') as f:
            self.stopwords = set(f.read().splitlines())
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M")
        self.simhash_cache = self.load_historical_hashes() 
            
    def get_today_titles(self):
        conn = sqlite3.connect(DATABASE)
        c = conn.cursor()
        c.execute("SELECT title FROM headlines WHERE date = ?", 
                 (datetime.now().strftime("%Y-%m-%d"),))
        raw_titles = [row[0] for row in c.fetchall()]
        
        return self.deduplicate(raw_titles)

    def analyze_keywords(self, titles):
        words = []
        for title in titles:
            words += [word for word in jieba.cut(title) 
                     if word not in self.stopwords and len(word) > 1]
                     
        return Counter(words).most_common(20)

    def generate_wordcloud(self):
        titles = self.get_today_titles()
        keywords = self.analyze_keywords(titles)
        
        word_freq = {word: freq for word, freq in keywords}
        wc = WordCloud(
            font_path=FONT_PATH,
            width=800,
            height=600,
            background_color='white'
        ).generate_from_frequencies(word_freq)
        
        img_path = f'temp_wc_{self.timestamp}.png'
        plt.figure(figsize=(12, 8))
        plt.imshow(wc)
        plt.axis("off")
        plt.savefig(img_path, bbox_inches='tight')
        plt.close()
        return img_path, keywords

    def create_pdf_report(self):
        img_path, keywords = self.generate_wordcloud()

        html_content = f'''
        <html>
            <head>
                <meta charset="utf-8">
                <title>新闻热点分析报告</title>
                <style>
                    body {{ font-family: Arial; margin: 2cm; }}
                    h1 {{ color: #2c3e50; border-bottom: 2px solid #3498db; }}
                    .timestamp {{ color: #7f8c8d; }}
                    .keywords {{ columns: 2; margin: 20px 0; }}
                    img {{ width: 80%; margin: 20px auto; display: block; }}
                </style>
            </head>
            <body>
                <h1>新闻热点分析报告</h1>
                <div class="timestamp">生成时间:{datetime.now().strftime("%Y-%m-%d %H:%M")}</div>
                
                <h2>今日TOP20关键词</h2>
                <div class="keywords">
                    {''.join([f'<div>{word[0]} ({word[1]})</div>' for word in keywords])}
                </div>
                
                <h2>词云图</h2>
                <img src="{img_path}">
            </body>
        </html>
        '''
        
        os.makedirs(REPORT_DIR, exist_ok=True)
        pdf_filename = f'热点分析报告_{self.timestamp}.pdf'
        pdf_path = os.path.join(REPORT_DIR, pdf_filename)
        
        HTML(string=html_content).write_pdf(pdf_path)

        if os.path.exists(img_path):
            os.remove(img_path)
            
        return pdf_path

    def load_historical_hashes(self):
        conn = sqlite3.connect(DATABASE)
        c = conn.cursor()
        c.execute("SELECT simhash FROM title_fingerprints") 
        simhashes = {row[0] for row in c.fetchall()}
        conn.close()
        return simhashes

    def deduplicate(self, titles):
        unique = list(set(titles))
        unique = self.remove_edit_duplicates(unique)
        return self.remove_semantic_duplicates(unique)

    def remove_edit_duplicates(self, titles):
        filtered = []
        for title in titles:
            if not any(self.edit_distance(title, t) < 3 for t in filtered):
                filtered.append(title)
        return filtered

    def remove_semantic_duplicates(self, titles):
        filtered = []
        for title in titles:
            current_hash = Simhash(title).value
            if not any(self.hamming_distance(current_hash, h) < 6 for h in self.simhash_cache):
                filtered.append(title)
                self.simhash_cache.add(current_hash)
        return filtered

    @staticmethod
    def edit_distance(s1, s2):
        m, n = len(s1), len(s2)
        dp = [[0]*(n+1) for _ in range(m+1)]
        for i in range(m+1): dp[i][0] = i
        for j in range(n+1): dp[0][j] = j
        
        for i in range(1, m+1):
            for j in range(1, n+1):
                cost = 0 if s1[i-1] == s2[j-1] else 1
                dp[i][j] = min(dp[i-1][j]+1, dp[i][j-1]+1, dp[i-1][j-1]+cost)
                
        return dp[m][n]

    @staticmethod
    def hamming_distance(h1, h2):
        return bin(h1 ^ h2).count('1')


# ==================== 主程序 ====================
if __name__ == '__main__':
    try:
        crawler = NewsCrawler()
        crawler.crawl_all()
        
        reporter = ReportGenerator()
        pdf_path = reporter.create_pdf_report()
        print(f'报告已生成: {os.path.abspath(pdf_path)}')
        
    except Exception as e:
        print(f'程序运行出错: {str(e)}')

目前编译到class NewsCrawler那里,提示有语法错误,请看一下具体是什么问题。
最佳答案
本帖最后由 小甲鱼的二师兄 于 2025-2-3 18:25 编辑

下面代码报语法错误的原因是 TARGETS 配置中“头条热榜”的字典没有正确闭合。仔细看这一段代码:
'头条热榜': {
    'url': 'https://www.toutiao.com/hot-event/hot-board/',
    'method': 'api',
    'params': {'origin': 'toutiao_pc'},
    'headers': {
        'Referer': 'https://www.toutiao.com/'
    }
}
而你提供的代码中写成这样:
'头条热榜': {
    'url': 'https://www.toutiao.com/hot-event/hot-board/',
    'method': 'api',
    'params': {'origin': 'toutiao_pc'},
    'headers': {
        'Referer': 'https://www.toutiao.com/'
}
如果不小心少了一个右大括号或者右括号,则 Python 编译器会提示语法错误。
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复

使用道具 举报

发表于 3 天前 | 显示全部楼层    本楼为最佳答案   
本帖最后由 小甲鱼的二师兄 于 2025-2-3 18:25 编辑

下面代码报语法错误的原因是 TARGETS 配置中“头条热榜”的字典没有正确闭合。仔细看这一段代码:
'头条热榜': {
    'url': 'https://www.toutiao.com/hot-event/hot-board/',
    'method': 'api',
    'params': {'origin': 'toutiao_pc'},
    'headers': {
        'Referer': 'https://www.toutiao.com/'
    }
}
而你提供的代码中写成这样:
'头条热榜': {
    'url': 'https://www.toutiao.com/hot-event/hot-board/',
    'method': 'api',
    'params': {'origin': 'toutiao_pc'},
    'headers': {
        'Referer': 'https://www.toutiao.com/'
}
如果不小心少了一个右大括号或者右括号,则 Python 编译器会提示语法错误。
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

 楼主| 发表于 3 天前 | 显示全部楼层
小甲鱼的二师兄 发表于 2025-2-3 18:21
下面代码报语法错误的原因是 TARGETS 配置中“头条热榜”的字典没有正确闭合。仔细看这一段代码:

谢谢指正。
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

 楼主| 发表于 3 天前 | 显示全部楼层
运行后,又出幺蛾子了,前面爬得还行,爬到百度热榜,错误提示来了:
程序运行出错: Python int too large to convert to SQLite INTEGER

@FishC
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

发表于 3 天前 | 显示全部楼层
shadowmage 发表于 2025-02-03 19:53
运行后,又出幺蛾子了,前面爬得还行,爬到百度热榜, ...


你遇到的错误是因为要存储到SQLite数据库的整数值超出了SQLite的INTEGER类型的范围。SQLite的INTEGER类型是有大小限制的,通常它能够存储的范围为 -2^63 到 2^63-1。

要解决这个问题,你可以尝试以下几种解决方案:

解决方案 1: 使用TEXT类型来存储大整数
将数据库表中存储整数的列改为TEXT类型,以便可以存储更大的数值。

解决方案 2: 使用更合适的数据类型
在Python中处理这些数据时,确保不会超过SQLite的存储限制。如果确实需要存储非常大的整数,可以考虑是否压缩或转换数据格式。

下面是一个示例代码,展示如何在创建表时使用TEXT类型:
import sqlite3

def create_table():
    conn = sqlite3.connect('example.db')
    cursor = conn.cursor()
    # 创建表,使用TEXT类型来存储可能的大整数值
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS hotlist (
            id TEXT PRIMARY KEY,
            title TEXT,
            url TEXT
        )
    ''')
    conn.commit()
    conn.close()

create_table()

如果需要进一步操作或者有其他问题,请随时告诉我!

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2025-2-6 02:01

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表