鱼C论坛

 找回密码
 立即注册
查看: 2809|回复: 0

[作品展示] B站,番剧下载爬虫(VIP版)

[复制链接]
发表于 2020-11-19 21:47:42 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
本帖最后由 兰竹皋 于 2020-11-19 21:49 编辑


现在b站番剧版权要求越来越严,越来越多的番剧不能直接下载了。
而我个人对于一些喜欢的番剧有收藏爱好,于是就写了这个小爬虫——通过番剧的ep号下载b站番剧(对于VIP账户而言)。
当然,为了方便而言我也就没有打包成GUI了。
下面展示一些爬取过程:
首先获取番剧的ep号,如图:
2.png
3.png
通常我们点入番剧后得到的并不是ep号,
此时只需要点击一下右方的任意一其他集,如图:
4.png
页面上方地址便出现了ep号。
当然,还有一些特殊番剧,只有一集,如:
16.png
无法通过上述方法获得,只能通过抓包(playurl开头的包),如图:
17.png


而同样,通常而言,一般番剧的ep号是连续数字,如图:
5.png
此时,执行程序时,只需要输入:
6.png

7.png

但同样,也总存在这样一些莫名其妙的番剧,如图:
8.png
9.png
番号并不连续。
此时,执行程序时,输入:
10.png
即可。

接着,输入下载画质,如图(分别对应( 12.png )):
13.png

最后,开始下载了:
14.png
但要注意,下载前命理窗口必须要足够大,且下载过程中别改变其大小,不然会出现排序混乱的情况。
(由于技术原因,上方命令窗口中显示的中文字符有些会缺失一半,如果有知道怎么回事的高手,还请告知一二,谢谢啦 (^_^) )


下方是源代码:
import requests
from lxml import etree
import curses
import time
from multiprocessing.pool import ThreadPool
import os
import demjson

requests.packages.urllib3.disable_warnings()


def getVideoName(home_url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3970.5 Safari/537.36',
        'Referer': home_url,
    }

    re = requests.get(home_url, headers=headers)
    re.raise_for_status()
    re.encoding = 'utf-8'
    re_text = re.text

    tree = etree.HTML(re_text)
    name = tree.xpath('//div[@class="media-wrapper"]/h1/text()')
    title = tree.xpath('//meta[@name="keywords"]/@content')
    return name[0], title[0]


def getBliVideo(home_url, each_episode, each_episode_i, session = requests.session()):
    headers_info = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3970.5 Safari/537.36',
        'Referer': home_url,
        'cookie': "*******************"      #注意,此处改成自己的cookie就行了
    }
    info_url = 'https://api.bilibili.com/pgc/player/web/playurl'
    # qn : 16, 32, 64, 80, 112
    params = {
        'qn': qn,
        'ep_id': each_episode,
    }

    video_info_re = session.get(url=info_url, headers=headers_info, params=params, verify=False)
    video_info_re.raise_for_status()

    video_info_dict = video_info_re.json()
    video_size = video_info_dict['result']['durl'][0]['size']
    # video_length = video_info_dict['result']['durl'][0]['length'] / 1000
    video_url = video_info_dict['result']['durl'][0]['backup_url'][0]
    video_name, video_title = getVideoName(home_url)

    video_title = os.path.join(os.getcwd(), video_title)
    if os.path.exists(video_title):
        pass
    else:
        os.mkdir(video_title)

    BliBliDownload(home_url=home_url, video_url=video_url, video_size=video_size, video_title=video_title, video_name=video_name, each_episode_i=each_episode_i)


def BliBliDownload(home_url, video_url, video_size, video_title, video_name, each_episode_i, session=requests.session()):
    def get_url(url, headers, timeout=5):
        try:
            return session.get(url=url, headers=headers, timeout=timeout, verify=False)
        except requests.exceptions.Timeout:
            return get_url(url, headers, timeout)

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3970.5 Safari/537.36',
        'Referer': home_url,
    }
    # options_re = session.options(url=video_url, headers=headers)
    # print(options_re)
    # options_re.raise_for_status()

    # 进度条显示
    total=int(video_size/1024/1024); ncols=50; nrow=(each_episode_i+1)*2-1; desc='Processing'; unit='Mb'
    def get_bar(l, n, ncol):
        r = ncol - l - 1
        bar = ''.join(['#' for i in range(l)])
        if l != ncol:
            bar += f'{n}'
        bar += ''.join([' ' for i in range(r)])
        return '|'+bar+'|'

    stdscr.addstr(nrow, 0, f'正 在 下 载  -- {" ".join(video_name)}:')
    processing = f'{0:>3d}%'
    bar = get_bar(0, 0, ncols)
    mod = f'{desc} {each_episode_i+1:>2d}:  {bar} {processing:>4s}'
    stdscr.addstr(nrow+1, 1, mod)
    stdscr.refresh()
    
    start_time = time.time()

    with open(video_title+'/'+video_name + '_Video.mp4', 'wb') as f:
        # 每次下载 1M 数据
        begin = 0
        end = 1024 * 1024 - 1
        flag = 0
        for i in range(total):
            sub_start_time = time.time()

            # 内容处理
            headers.update({'Range': f'bytes={begin}-{end}'})
            re = get_url(url=video_url, headers=headers, timeout=5)
            if re.status_code != 416:
                begin = end + 1
                end = end + 1024 * 1024
            else:
                headers.update({'Range': f'bytes={end + 1}-'})
                re = get_url(url=video_url, headers=headers, timeout=5)
            f.write(re.content)
            f.flush()

            sub_stop_time = time.time()
            sub_delta_time = sub_stop_time - sub_start_time
            delta_time = sub_stop_time - start_time
            remain_time = (total - i - 1)*sub_delta_time
            
            temp = (i+1)/total
            processing = f'{int(temp*100):>3d}%'
            l = int(temp*ncols)
            n = int(temp*ncols*10)%10
            bar = get_bar(l, n, ncols)
            mod = f'{desc} {each_episode_i+1:>2d}:  {bar} {processing:>4s} ({i+1} of {total}) [{int(delta_time//60):0>2d}:{int(delta_time%60):0>2d}<{int(remain_time//60):0>2d}:{int(remain_time%60):0>2d}, {1/sub_delta_time:.2f}{unit}/s]'
            stdscr.addstr(nrow+1, 1, mod)
            stdscr.refresh()

    stdscr.addstr(nrow+1,0, f'{" ":>200s}')     # 清楚原行数据
    bar='|Download completed|'
    mod = f'{desc} {each_episode_i+1:>2d}:  {bar} {processing:>4s} ({i+1} of {total}) [{int(delta_time//60):0>2d}:{int(delta_time%60):0>2d}, {total/delta_time:.2f}{unit}/s]'
    stdscr.addstr(nrow+1, 1, mod)
    stdscr.refresh()

def main(ep_id_list):
    stdscr.addstr(0, 0, ' '.join('开始下载:'))
    stdscr.refresh()
    pool = ThreadPool(4)
    each_episode_i = 0
    for each_episode in ep_id_list:
        home_url = f'https://www.bilibili.com/bangumi/play/ep{each_episode}'
        pool.apply_async(getBliVideo, (home_url, each_episode, each_episode_i))
        each_episode_i += 1
        # print(getVideoName(home_url))
    pool.close()
    pool.join()
    stdscr.addstr(2*ep_id_iter_n+1, 0, ' '.join('所有视频已下载完成 (^_^)...'))
    stdscr.addstr(2*ep_id_iter_n+3, 0, 'Press any key to continue...')
    stdscr.refresh()


if __name__ == '__main__':
    requests.packages.urllib3.disable_warnings()

    ep_id = input('请输入视频epid(第一集), 或 epid列表[](不包括ep):')
    try:
        flag = 0
        if ',' in ep_id.strip():
            ep_id_list = demjson.decode(ep_id)
            ep_id_iter_n = len(ep_id_list)
            flag = 1
        else:
            ep_id_ns = input('请输入要下载的范围([从第几集,到第几集]), 如 [4,9]; 或 下载几集, 如 5; 或下载第几集, 如[5,]:')

            ep_id_ns = demjson.decode(ep_id_ns)
            if isinstance(ep_id_ns, int):
                ep_id_iter_start, ep_id_iter_n = int(ep_id), int(ep_id_ns)
                flag = 1
            elif isinstance(ep_id_ns, list):
                if len(ep_id_ns) == 2:
                    ep_id_iter_start, ep_id_iter_n = int(ep_id) + int(ep_id_ns[0]) - 1, int(ep_id_ns[1]) - int(ep_id_ns[0]) + 1
                    flag = 1
                elif len(ep_id_ns) == 1:
                    ep_id_iter_start, ep_id_iter_n = int(ep_id) + int(ep_id_ns[1]) - 1, 1
                    flag = 1
            ep_id_iter_end = ep_id_iter_start + ep_id_iter_n - 1
            ep_id_list = range(ep_id_iter_start, ep_id_iter_end+1)
    except:
        raise ValueError('选集有错误哦~~~')
    if flag == 0:
        raise ValueError('选集有错误哦~~~')

    qn = input('请输入下载视频画质(112,80,64,32,16):')

    def get_ch_and_continue(stdscr):
        stdscr.nodelay(0)
        ch = stdscr.getch()
        stdscr.nodelay(1)
    try:
        # 创建 curses
        stdscr = curses.initscr()
        curses.cbreak()
        curses.noecho()
        stdscr.keypad(1)

        main(ep_id_list)
        
        get_ch_and_continue(stdscr)
        # 关闭 curses
        curses.nocbreak()
        curses.echo()
        stdscr.keypad(0)
        curses.endwin()
    except Exception as e:
        curses.nocbreak()
        curses.echo()
        stdscr.keypad(0)
        curses.endwin()
        print(e)
其中,cookie在自己登陆账号后,从F12中,network,任意一个包中,查看headers,复制粘贴过来就行了。

本帖被以下淘专辑推荐:

想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2024-9-28 16:17

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表