As the title says. Biquge seems to run on quite a few mirror servers; mine is "https://www.22biqu.com/". I wrote this to pull books down for offline reading on my phone: one TXT per chapter, merged into a single file when needed (a merge sketch follows the script), then copied over to the phone.
- """
- 笔趣阁爬书小程序
- python>=3.8
- 依赖: pip install playwright tqdm
- playwright install # 仅需第一次
- Author:shadowmage
- """
- import os
- import re
- import time
- from urllib.parse import urljoin
- from playwright.sync_api import sync_playwright
- SAVE_DIR = "output"
# ---------- Helpers ----------
def sanitize_filename(name):
    return re.sub(r'[\\/*?:"<>|]', "_", name)

def get_last_page(base_url):
    """Read the last <option> of the index-page selector to get the page count."""
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        ctx = browser.new_context(ignore_https_errors=True)
        page = ctx.new_page()
        page.goto(base_url, timeout=60000)
        last_option = page.locator("#indexselect option:last-child").get_attribute("value")
        browser.close()
    # option values look like /<book-id>/<page>/, so the second-to-last
    # path segment is the page number
    return int(last_option.split("/")[-2])
def get_chapter_links(base_url, last_page):
    links = []
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        ctx = browser.new_context(ignore_https_errors=True)
        page = ctx.new_page()
        for i in range(1, last_page + 1):
            url = f"{base_url.rstrip('/')}/{i}/"
            print(f"Fetching index page: {url}")
            page.goto(url, timeout=60000)
            # only take chapters from the second section-box
            section = page.locator(".section-box").nth(1)
            section.wait_for()
            lis = section.locator("li").all()
            for li in lis:
                a = li.locator("a")
                href = a.get_attribute("href")
                title = a.inner_text()
                full_url = urljoin("https://www.22biqu.com", href)
                links.append((title, full_url))
        browser.close()
    return links
def get_chapter_content(url):
    # launches a fresh browser per chapter: slow, but simple and robust
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        ctx = browser.new_context(ignore_https_errors=True)
        page = ctx.new_page()
        page.goto(url, timeout=60000)
        page.wait_for_selector("#content")
        title = page.locator("h1.title").inner_text()
        content = page.locator("#content").inner_text()
        browser.close()
    return title, content
# ---------- Saving TXT ----------
def get_output_dir(base_url: str) -> str:
    """
    Return a dedicated output directory for this novel:
    1. Open the index page and grab the book title
    2. Turn it into a legal folder name
    3. Make sure the directory exists and return its path
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        ctx = browser.new_context(ignore_https_errors=True)
        page = ctx.new_page()
        page.goto(base_url, timeout=60000)
        # this site puts the book title in an h1; adjust the selector if needed
        book_name = page.locator("h1:not(.logo)").inner_text().strip()
        browser.close()
    safe_name = re.sub(r'[\\/:*?"<>|]', "", book_name) or "book"
    out_dir = os.path.join(SAVE_DIR, safe_name)
    os.makedirs(out_dir, exist_ok=True)
    return out_dir
# ---------- Retry logic ----------
def repair_mode(out_dir):
    # takes the book's output directory so retried chapters land next to
    # the rest (the original hardcoded a "novel_output" folder that the
    # main flow never creates)
    if not os.path.exists("failed.txt"):
        return []  # no failure record, back to the main flow
    with open("failed.txt", "r", encoding="utf-8") as f:
        tasks = [l.strip().split("\t") for l in f if l.strip()]
    if not tasks:
        return []
    print(f"Found failed.txt, {len(tasks)} chapters to retry.")
    still_failed = []
    for n, (idx, title, url) in enumerate(tasks, 1):
        try:
            print(f"[{n}/{len(tasks)}] Retrying: {title}")
            chapter_title, content = get_chapter_content(url)
            # keep the original index prefix so merged chapters stay in order
            filename = f"{int(idx):03}_{sanitize_filename(chapter_title)}.txt"
            with open(os.path.join(out_dir, filename), "w", encoding="utf-8") as f:
                f.write(f"{chapter_title}\n\n{content}")
            time.sleep(15)
        except Exception as e:
            print(f"Still failing: {title}, {e}")
            still_failed.append((idx, title, url))
    # rewrite the failure record
    if still_failed:
        with open("failed.txt", "w", encoding="utf-8") as f:
            for idx, title, url in still_failed:
                f.write(f"{idx}\t{title}\t{url}\n")
        print(f"{len(still_failed)} chapters still failed; run this script again to retry them.")
        return still_failed
    else:
        os.remove("failed.txt")
        print("Retry pass complete, failed.txt removed.")
        return []
# ---------- Main flow ----------
def main():
    base_url = input("Enter the novel's index page URL: ").strip()
    if not base_url.startswith("http"):
        print("Please enter a full URL starting with http or https")
        return
    base_url = base_url.rstrip("/") + "/"
    out_dir = get_output_dir(base_url)
    # 1. Retry pass first (if failed.txt exists)
    failed = repair_mode(out_dir)
    if failed:
        print("Please clear the failed chapters before running a full download.")
        return
    # 2. Normal flow: fetch the full table of contents
    last_page = get_last_page(base_url)
    print(f"Table of contents has {last_page} pages")
    chapters = get_chapter_links(base_url, last_page)
    print(f"Found {len(chapters)} chapters")
    failed = []
    for idx, (title, url) in enumerate(chapters, 1):
        try:
            print(f"[{idx}/{len(chapters)}] Downloading: {title}")
            chapter_title, content = get_chapter_content(url)
            filename = f"{idx:03}_{sanitize_filename(chapter_title)}.txt"
            with open(os.path.join(out_dir, filename), "w", encoding="utf-8") as f:
                f.write(f"{chapter_title}\n\n{content}")
            time.sleep(15)  # throttle so the site doesn't block us
        except Exception as e:
            print(f"Download failed: {title}, {e}")
            failed.append((idx, title, url))
    # 3. Persist the failure record (index included, so the retry pass
    #    keeps the chapter ordering)
    if failed:
        with open("failed.txt", "w", encoding="utf-8") as f:
            for idx, title, url in failed:
                f.write(f"{idx}\t{title}\t{url}\n")
        print(f"{len(failed)} chapters failed; written to failed.txt. The next run will retry them automatically.")
    else:
        print("All chapters downloaded!")

if __name__ == "__main__":
    main()
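
For the merge step mentioned at the top, here is a minimal sketch (merge.py is a hypothetical filename, not part of the scraper). It assumes the output/<book name>/NNN_title.txt layout that the script above produces and relies on the zero-padded index prefix for ordering:

# merge.py -- minimal sketch: concatenate the per-chapter TXT files
# produced by the scraper into one file for offline reading.
# Assumes the output/<book name>/NNN_title.txt layout from the script above.
import os
import sys

def merge_book(book_dir, out_file):
    # the zero-padded index prefix makes lexicographic order == chapter order
    parts = sorted(f for f in os.listdir(book_dir) if f.endswith(".txt"))
    with open(out_file, "w", encoding="utf-8") as out:
        for name in parts:
            with open(os.path.join(book_dir, name), encoding="utf-8") as f:
                out.write(f.read().strip() + "\n\n")
    print(f"Merged {len(parts)} chapters into {out_file}")

if __name__ == "__main__":
    # usage: python merge.py output/BookName BookName.txt
    merge_book(sys.argv[1], sys.argv[2])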