兰竹皋 发表于 2020-11-19 21:47:42

B站,番剧下载爬虫(VIP版)

本帖最后由 兰竹皋 于 2020-11-19 21:49 编辑


现在b站番剧版权要求越来越严,越来越多的番剧不能直接下载了。
而我个人对于一些喜欢的番剧有收藏爱好,于是就写了这个小爬虫——通过番剧的ep号下载b站番剧(对于VIP账户而言)。
当然,为了方便而言我也就没有打包成GUI了。
下面展示一些爬取过程:
首先获取番剧的ep号,如图:


通常我们点入番剧后得到的并不是ep号,
此时只需要点击一下右方的任意一其他集,如图:

页面上方地址便出现了ep号。
当然,还有一些特殊番剧,只有一集,如:

无法通过上述方法获得,只能通过抓包(playurl开头的包),如图:



而同样,通常而言,一般番剧的ep号是连续数字,如图:

此时,执行程序时,只需要输入:




但同样,也总存在这样一些莫名其妙的番剧,如图:


番号并不连续。
此时,执行程序时,输入:

即可。

接着,输入下载画质,如图(分别对应()):


最后,开始下载了:

但要注意,下载前命理窗口必须要足够大,且下载过程中别改变其大小,不然会出现排序混乱的情况。
(由于技术原因,上方命令窗口中显示的中文字符有些会缺失一半,如果有知道怎么回事的高手,还请告知一二,谢谢啦 (^_^) )


下方是源代码:
import requests
from lxml import etree
import curses
import time
from multiprocessing.pool import ThreadPool
import os
import demjson

requests.packages.urllib3.disable_warnings()


def getVideoName(home_url):
    headers = {
      'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3970.5 Safari/537.36',
      'Referer': home_url,
    }

    re = requests.get(home_url, headers=headers)
    re.raise_for_status()
    re.encoding = 'utf-8'
    re_text = re.text

    tree = etree.HTML(re_text)
    name = tree.xpath('//div[@class="media-wrapper"]/h1/text()')
    title = tree.xpath('//meta[@name="keywords"]/@content')
    return name, title


def getBliVideo(home_url, each_episode, each_episode_i, session = requests.session()):
    headers_info = {
      'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3970.5 Safari/537.36',
      'Referer': home_url,
      'cookie': "*******************"      #注意,此处改成自己的cookie就行了
    }
    info_url = 'https://api.bilibili.com/pgc/player/web/playurl'
    # qn : 16, 32, 64, 80, 112
    params = {
      'qn': qn,
      'ep_id': each_episode,
    }

    video_info_re = session.get(url=info_url, headers=headers_info, params=params, verify=False)
    video_info_re.raise_for_status()

    video_info_dict = video_info_re.json()
    video_size = video_info_dict['result']['durl']['size']
    # video_length = video_info_dict['result']['durl']['length'] / 1000
    video_url = video_info_dict['result']['durl']['backup_url']
    video_name, video_title = getVideoName(home_url)

    video_title = os.path.join(os.getcwd(), video_title)
    if os.path.exists(video_title):
      pass
    else:
      os.mkdir(video_title)

    BliBliDownload(home_url=home_url, video_url=video_url, video_size=video_size, video_title=video_title, video_name=video_name, each_episode_i=each_episode_i)


def BliBliDownload(home_url, video_url, video_size, video_title, video_name, each_episode_i, session=requests.session()):
    def get_url(url, headers, timeout=5):
      try:
            return session.get(url=url, headers=headers, timeout=timeout, verify=False)
      except requests.exceptions.Timeout:
            return get_url(url, headers, timeout)

    headers = {
      'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3970.5 Safari/537.36',
      'Referer': home_url,
    }
    # options_re = session.options(url=video_url, headers=headers)
    # print(options_re)
    # options_re.raise_for_status()

    # 进度条显示
    total=int(video_size/1024/1024); ncols=50; nrow=(each_episode_i+1)*2-1; desc='Processing'; unit='Mb'
    def get_bar(l, n, ncol):
      r = ncol - l - 1
      bar = ''.join(['#' for i in range(l)])
      if l != ncol:
            bar += f'{n}'
      bar += ''.join([' ' for i in range(r)])
      return '|'+bar+'|'

    stdscr.addstr(nrow, 0, f'正 在 下 载-- {" ".join(video_name)}:')
    processing = f'{0:>3d}%'
    bar = get_bar(0, 0, ncols)
    mod = f'{desc} {each_episode_i+1:>2d}:{bar} {processing:>4s}'
    stdscr.addstr(nrow+1, 1, mod)
    stdscr.refresh()
   
    start_time = time.time()

    with open(video_title+'/'+video_name + '_Video.mp4', 'wb') as f:
      # 每次下载 1M 数据
      begin = 0
      end = 1024 * 1024 - 1
      flag = 0
      for i in range(total):
            sub_start_time = time.time()

            # 内容处理
            headers.update({'Range': f'bytes={begin}-{end}'})
            re = get_url(url=video_url, headers=headers, timeout=5)
            if re.status_code != 416:
                begin = end + 1
                end = end + 1024 * 1024
            else:
                headers.update({'Range': f'bytes={end + 1}-'})
                re = get_url(url=video_url, headers=headers, timeout=5)
            f.write(re.content)
            f.flush()

            sub_stop_time = time.time()
            sub_delta_time = sub_stop_time - sub_start_time
            delta_time = sub_stop_time - start_time
            remain_time = (total - i - 1)*sub_delta_time
            
            temp = (i+1)/total
            processing = f'{int(temp*100):>3d}%'
            l = int(temp*ncols)
            n = int(temp*ncols*10)%10
            bar = get_bar(l, n, ncols)
            mod = f'{desc} {each_episode_i+1:>2d}:{bar} {processing:>4s} ({i+1} of {total}) [{int(delta_time//60):0>2d}:{int(delta_time%60):0>2d}<{int(remain_time//60):0>2d}:{int(remain_time%60):0>2d}, {1/sub_delta_time:.2f}{unit}/s]'
            stdscr.addstr(nrow+1, 1, mod)
            stdscr.refresh()

    stdscr.addstr(nrow+1,0, f'{" ":>200s}')   # 清楚原行数据
    bar='|Download completed|'
    mod = f'{desc} {each_episode_i+1:>2d}:{bar} {processing:>4s} ({i+1} of {total}) [{int(delta_time//60):0>2d}:{int(delta_time%60):0>2d}, {total/delta_time:.2f}{unit}/s]'
    stdscr.addstr(nrow+1, 1, mod)
    stdscr.refresh()

def main(ep_id_list):
    stdscr.addstr(0, 0, ' '.join('开始下载:'))
    stdscr.refresh()
    pool = ThreadPool(4)
    each_episode_i = 0
    for each_episode in ep_id_list:
      home_url = f'https://www.bilibili.com/bangumi/play/ep{each_episode}'
      pool.apply_async(getBliVideo, (home_url, each_episode, each_episode_i))
      each_episode_i += 1
      # print(getVideoName(home_url))
    pool.close()
    pool.join()
    stdscr.addstr(2*ep_id_iter_n+1, 0, ' '.join('所有视频已下载完成 (^_^)...'))
    stdscr.addstr(2*ep_id_iter_n+3, 0, 'Press any key to continue...')
    stdscr.refresh()


if __name__ == '__main__':
    requests.packages.urllib3.disable_warnings()

    ep_id = input('请输入视频epid(第一集), 或 epid列表[](不包括ep):')
    try:
      flag = 0
      if ',' in ep_id.strip():
            ep_id_list = demjson.decode(ep_id)
            ep_id_iter_n = len(ep_id_list)
            flag = 1
      else:
            ep_id_ns = input('请输入要下载的范围([从第几集,到第几集]), 如 ; 或 下载几集, 如 5; 或下载第几集, 如:')

            ep_id_ns = demjson.decode(ep_id_ns)
            if isinstance(ep_id_ns, int):
                ep_id_iter_start, ep_id_iter_n = int(ep_id), int(ep_id_ns)
                flag = 1
            elif isinstance(ep_id_ns, list):
                if len(ep_id_ns) == 2:
                  ep_id_iter_start, ep_id_iter_n = int(ep_id) + int(ep_id_ns) - 1, int(ep_id_ns) - int(ep_id_ns) + 1
                  flag = 1
                elif len(ep_id_ns) == 1:
                  ep_id_iter_start, ep_id_iter_n = int(ep_id) + int(ep_id_ns) - 1, 1
                  flag = 1
            ep_id_iter_end = ep_id_iter_start + ep_id_iter_n - 1
            ep_id_list = range(ep_id_iter_start, ep_id_iter_end+1)
    except:
      raise ValueError('选集有错误哦~~~')
    if flag == 0:
      raise ValueError('选集有错误哦~~~')

    qn = input('请输入下载视频画质(112,80,64,32,16):')

    def get_ch_and_continue(stdscr):
      stdscr.nodelay(0)
      ch = stdscr.getch()
      stdscr.nodelay(1)
    try:
      # 创建 curses
      stdscr = curses.initscr()
      curses.cbreak()
      curses.noecho()
      stdscr.keypad(1)

      main(ep_id_list)
      
      get_ch_and_continue(stdscr)
      # 关闭 curses
      curses.nocbreak()
      curses.echo()
      stdscr.keypad(0)
      curses.endwin()
    except Exception as e:
      curses.nocbreak()
      curses.echo()
      stdscr.keypad(0)
      curses.endwin()
      print(e)

其中,cookie在自己登陆账号后,从F12中,network,任意一个包中,查看headers,复制粘贴过来就行了。
页: [1]
查看完整版本: B站,番剧下载爬虫(VIP版)