|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
本帖最后由 兰竹皋 于 2020-11-19 21:49 编辑
现在b站番剧版权要求越来越严,越来越多的番剧不能直接下载了。
而我个人对于一些喜欢的番剧有收藏爱好,于是就写了这个小爬虫——通过番剧的ep号下载b站番剧(对于VIP账户而言)。
当然,为了方便而言我也就没有打包成GUI了。
下面展示一些爬取过程:
首先获取番剧的ep号,如图:
通常我们点入番剧后得到的并不是ep号,
此时只需要点击一下右方的任意一其他集,如图:
页面上方地址便出现了ep号。
当然,还有一些特殊番剧,只有一集,如:
无法通过上述方法获得,只能通过抓包(playurl开头的包),如图:
而同样,通常而言,一般番剧的ep号是连续数字,如图:
此时,执行程序时,只需要输入:
或
但同样,也总存在这样一些莫名其妙的番剧,如图:
番号并不连续。
此时,执行程序时,输入:
即可。
接着,输入下载画质,如图(分别对应(
)):
最后,开始下载了:
但要注意,下载前命理窗口必须要足够大,且下载过程中别改变其大小,不然会出现排序混乱的情况。
(由于技术原因,上方命令窗口中显示的中文字符有些会缺失一半,如果有知道怎么回事的高手,还请告知一二,谢谢啦 (^_^) )
下方是源代码:
- import requests
- from lxml import etree
- import curses
- import time
- from multiprocessing.pool import ThreadPool
- import os
- import demjson
- requests.packages.urllib3.disable_warnings()
- def getVideoName(home_url):
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3970.5 Safari/537.36',
- 'Referer': home_url,
- }
- re = requests.get(home_url, headers=headers)
- re.raise_for_status()
- re.encoding = 'utf-8'
- re_text = re.text
- tree = etree.HTML(re_text)
- name = tree.xpath('//div[@class="media-wrapper"]/h1/text()')
- title = tree.xpath('//meta[@name="keywords"]/@content')
- return name[0], title[0]
- def getBliVideo(home_url, each_episode, each_episode_i, session = requests.session()):
- headers_info = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3970.5 Safari/537.36',
- 'Referer': home_url,
- 'cookie': "*******************" #注意,此处改成自己的cookie就行了
- }
- info_url = 'https://api.bilibili.com/pgc/player/web/playurl'
- # qn : 16, 32, 64, 80, 112
- params = {
- 'qn': qn,
- 'ep_id': each_episode,
- }
- video_info_re = session.get(url=info_url, headers=headers_info, params=params, verify=False)
- video_info_re.raise_for_status()
- video_info_dict = video_info_re.json()
- video_size = video_info_dict['result']['durl'][0]['size']
- # video_length = video_info_dict['result']['durl'][0]['length'] / 1000
- video_url = video_info_dict['result']['durl'][0]['backup_url'][0]
- video_name, video_title = getVideoName(home_url)
- video_title = os.path.join(os.getcwd(), video_title)
- if os.path.exists(video_title):
- pass
- else:
- os.mkdir(video_title)
- BliBliDownload(home_url=home_url, video_url=video_url, video_size=video_size, video_title=video_title, video_name=video_name, each_episode_i=each_episode_i)
- def BliBliDownload(home_url, video_url, video_size, video_title, video_name, each_episode_i, session=requests.session()):
- def get_url(url, headers, timeout=5):
- try:
- return session.get(url=url, headers=headers, timeout=timeout, verify=False)
- except requests.exceptions.Timeout:
- return get_url(url, headers, timeout)
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3970.5 Safari/537.36',
- 'Referer': home_url,
- }
- # options_re = session.options(url=video_url, headers=headers)
- # print(options_re)
- # options_re.raise_for_status()
- # 进度条显示
- total=int(video_size/1024/1024); ncols=50; nrow=(each_episode_i+1)*2-1; desc='Processing'; unit='Mb'
- def get_bar(l, n, ncol):
- r = ncol - l - 1
- bar = ''.join(['#' for i in range(l)])
- if l != ncol:
- bar += f'{n}'
- bar += ''.join([' ' for i in range(r)])
- return '|'+bar+'|'
- stdscr.addstr(nrow, 0, f'正 在 下 载 -- {" ".join(video_name)}:')
- processing = f'{0:>3d}%'
- bar = get_bar(0, 0, ncols)
- mod = f'{desc} {each_episode_i+1:>2d}: {bar} {processing:>4s}'
- stdscr.addstr(nrow+1, 1, mod)
- stdscr.refresh()
-
- start_time = time.time()
- with open(video_title+'/'+video_name + '_Video.mp4', 'wb') as f:
- # 每次下载 1M 数据
- begin = 0
- end = 1024 * 1024 - 1
- flag = 0
- for i in range(total):
- sub_start_time = time.time()
- # 内容处理
- headers.update({'Range': f'bytes={begin}-{end}'})
- re = get_url(url=video_url, headers=headers, timeout=5)
- if re.status_code != 416:
- begin = end + 1
- end = end + 1024 * 1024
- else:
- headers.update({'Range': f'bytes={end + 1}-'})
- re = get_url(url=video_url, headers=headers, timeout=5)
- f.write(re.content)
- f.flush()
- sub_stop_time = time.time()
- sub_delta_time = sub_stop_time - sub_start_time
- delta_time = sub_stop_time - start_time
- remain_time = (total - i - 1)*sub_delta_time
-
- temp = (i+1)/total
- processing = f'{int(temp*100):>3d}%'
- l = int(temp*ncols)
- n = int(temp*ncols*10)%10
- bar = get_bar(l, n, ncols)
- mod = f'{desc} {each_episode_i+1:>2d}: {bar} {processing:>4s} ({i+1} of {total}) [{int(delta_time//60):0>2d}:{int(delta_time%60):0>2d}<{int(remain_time//60):0>2d}:{int(remain_time%60):0>2d}, {1/sub_delta_time:.2f}{unit}/s]'
- stdscr.addstr(nrow+1, 1, mod)
- stdscr.refresh()
- stdscr.addstr(nrow+1,0, f'{" ":>200s}') # 清楚原行数据
- bar='|Download completed|'
- mod = f'{desc} {each_episode_i+1:>2d}: {bar} {processing:>4s} ({i+1} of {total}) [{int(delta_time//60):0>2d}:{int(delta_time%60):0>2d}, {total/delta_time:.2f}{unit}/s]'
- stdscr.addstr(nrow+1, 1, mod)
- stdscr.refresh()
- def main(ep_id_list):
- stdscr.addstr(0, 0, ' '.join('开始下载:'))
- stdscr.refresh()
- pool = ThreadPool(4)
- each_episode_i = 0
- for each_episode in ep_id_list:
- home_url = f'https://www.bilibili.com/bangumi/play/ep{each_episode}'
- pool.apply_async(getBliVideo, (home_url, each_episode, each_episode_i))
- each_episode_i += 1
- # print(getVideoName(home_url))
- pool.close()
- pool.join()
- stdscr.addstr(2*ep_id_iter_n+1, 0, ' '.join('所有视频已下载完成 (^_^)...'))
- stdscr.addstr(2*ep_id_iter_n+3, 0, 'Press any key to continue...')
- stdscr.refresh()
- if __name__ == '__main__':
- requests.packages.urllib3.disable_warnings()
- ep_id = input('请输入视频epid(第一集), 或 epid列表[](不包括ep):')
- try:
- flag = 0
- if ',' in ep_id.strip():
- ep_id_list = demjson.decode(ep_id)
- ep_id_iter_n = len(ep_id_list)
- flag = 1
- else:
- ep_id_ns = input('请输入要下载的范围([从第几集,到第几集]), 如 [4,9]; 或 下载几集, 如 5; 或下载第几集, 如[5,]:')
- ep_id_ns = demjson.decode(ep_id_ns)
- if isinstance(ep_id_ns, int):
- ep_id_iter_start, ep_id_iter_n = int(ep_id), int(ep_id_ns)
- flag = 1
- elif isinstance(ep_id_ns, list):
- if len(ep_id_ns) == 2:
- ep_id_iter_start, ep_id_iter_n = int(ep_id) + int(ep_id_ns[0]) - 1, int(ep_id_ns[1]) - int(ep_id_ns[0]) + 1
- flag = 1
- elif len(ep_id_ns) == 1:
- ep_id_iter_start, ep_id_iter_n = int(ep_id) + int(ep_id_ns[1]) - 1, 1
- flag = 1
- ep_id_iter_end = ep_id_iter_start + ep_id_iter_n - 1
- ep_id_list = range(ep_id_iter_start, ep_id_iter_end+1)
- except:
- raise ValueError('选集有错误哦~~~')
- if flag == 0:
- raise ValueError('选集有错误哦~~~')
- qn = input('请输入下载视频画质(112,80,64,32,16):')
- def get_ch_and_continue(stdscr):
- stdscr.nodelay(0)
- ch = stdscr.getch()
- stdscr.nodelay(1)
- try:
- # 创建 curses
- stdscr = curses.initscr()
- curses.cbreak()
- curses.noecho()
- stdscr.keypad(1)
- main(ep_id_list)
-
- get_ch_and_continue(stdscr)
- # 关闭 curses
- curses.nocbreak()
- curses.echo()
- stdscr.keypad(0)
- curses.endwin()
- except Exception as e:
- curses.nocbreak()
- curses.echo()
- stdscr.keypad(0)
- curses.endwin()
- print(e)
复制代码
其中,cookie在自己登陆账号后,从F12中,network,任意一个包中,查看headers,复制粘贴过来就行了。
|
|