import requests
from lxml import etree
import curses
import time
from multiprocessing.pool import ThreadPool
import os
import demjson
# Silence urllib3's InsecureRequestWarning globally — every request in this
# script is issued with verify=False.
requests.packages.urllib3.disable_warnings()
def getVideoName(home_url):
    """Scrape an episode page and return its display names.

    Args:
        home_url: URL of the episode page (also sent as the Referer).

    Returns:
        A ``(name, title)`` tuple: the ``<h1>`` heading inside
        ``div.media-wrapper`` and the ``keywords`` meta content.

    Raises:
        requests.HTTPError: if the page request fails.
        IndexError: if either xpath matches nothing.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3970.5 Safari/537.36',
        'Referer': home_url,
    }
    resp = requests.get(home_url, headers=headers)
    resp.raise_for_status()
    resp.encoding = 'utf-8'
    tree = etree.HTML(resp.text)
    names = tree.xpath('//div[@class="media-wrapper"]/h1/text()')
    titles = tree.xpath('//meta[@name="keywords"]/@content')
    return names[0], titles[0]
def getBliVideo(home_url, each_episode, each_episode_i, session=None):
    """Query Bilibili's playurl API for one episode and start its download.

    Args:
        home_url: episode page URL (sent as the Referer).
        each_episode: the ep_id passed to the playurl API.
        each_episode_i: zero-based episode index (selects the progress row).
        session: optional ``requests.Session``. BUG FIX: the original used
            ``session=requests.session()`` as a default, so one Session
            object was silently shared by every ThreadPool worker; a fresh
            session is now created per call when none is supplied.

    Reads the module-level global ``qn`` (video quality code) set in
    ``__main__``.
    """
    if session is None:
        session = requests.session()
    headers_info = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3970.5 Safari/537.36',
        'Referer': home_url,
        'cookie': "*******************"  # NOTE: replace with your own logged-in cookie
    }
    info_url = 'https://api.bilibili.com/pgc/player/web/playurl'
    # qn quality codes: 16, 32, 64, 80, 112
    params = {
        'qn': qn,  # module-level global set in __main__
        'ep_id': each_episode,
    }
    video_info_re = session.get(url=info_url, headers=headers_info, params=params, verify=False)
    video_info_re.raise_for_status()
    video_info_dict = video_info_re.json()
    video_size = video_info_dict['result']['durl'][0]['size']
    video_url = video_info_dict['result']['durl'][0]['backup_url'][0]
    video_name, video_title = getVideoName(home_url)
    video_title = os.path.join(os.getcwd(), video_title)
    # BUG FIX: exists()-then-mkdir() raced between pool threads downloading
    # episodes of the same series; makedirs(exist_ok=True) is atomic enough.
    os.makedirs(video_title, exist_ok=True)
    BliBliDownload(home_url=home_url, video_url=video_url, video_size=video_size, video_title=video_title, video_name=video_name, each_episode_i=each_episode_i)
def BliBliDownload(home_url, video_url, video_size, video_title, video_name, each_episode_i, session=None):
    """Download one video stream in 1 MiB Range-request chunks, drawing a
    curses progress bar on this episode's row of the global ``stdscr``.

    Args:
        home_url: episode page URL (sent as the Referer).
        video_url: direct stream URL from the playurl API.
        video_size: total stream size in bytes.
        video_title: destination directory path.
        video_name: base file name (``<name>_Video.mp4`` is written).
        each_episode_i: zero-based episode index (selects the screen row).
        session: optional ``requests.Session``. BUG FIX: the original shared
            one Session as a mutable default across all pool threads.

    Fixes over the original:
      * ``get_url`` retried timeouts by unbounded recursion — now a loop.
      * ``total = int(size/1024/1024)`` truncated, dropping the trailing
        partial chunk and producing ``total == 0`` (a NameError on ``i``
        after the loop) for files under 1 MiB — now ceil-divided, min 1.
      * zero-length timing intervals no longer divide by zero.
    """
    if session is None:
        session = requests.session()

    def get_url(url, headers, timeout=5):
        # Retry forever on timeouts, iteratively (the original recursed,
        # which can exhaust the stack on a flaky connection).
        while True:
            try:
                return session.get(url=url, headers=headers, timeout=timeout, verify=False)
            except requests.exceptions.Timeout:
                continue

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3970.5 Safari/537.36',
        'Referer': home_url,
    }
    # Progress-bar geometry / labels.
    CHUNK = 1024 * 1024
    total = max(1, -(-video_size // CHUNK))  # ceil division, never 0
    ncols = 50
    nrow = (each_episode_i + 1) * 2 - 1  # two screen rows per episode
    desc = 'Processing'
    unit = 'Mb'

    def get_bar(l, n, ncol):
        # Render "|#####3    |": l filled cells, one tenths digit, blanks.
        r = ncol - l - 1
        bar = '#' * l
        if l != ncol:
            bar += f'{n}'
        bar += ' ' * r
        return '|' + bar + '|'

    stdscr.addstr(nrow, 0, f'正 在 下 载 -- {" ".join(video_name)}:')
    processing = f'{0:>3d}%'
    bar = get_bar(0, 0, ncols)
    mod = f'{desc} {each_episode_i+1:>2d}: {bar} {processing:>4s}'
    stdscr.addstr(nrow + 1, 1, mod)
    stdscr.refresh()

    start_time = time.time()
    with open(video_title + '/' + video_name + '_Video.mp4', 'wb') as f:
        # Fetch 1 MiB per Range request.
        begin = 0
        end = CHUNK - 1
        for i in range(total):
            sub_start_time = time.time()
            headers.update({'Range': f'bytes={begin}-{end}'})
            resp = get_url(url=video_url, headers=headers, timeout=5)
            if resp.status_code != 416:
                begin = end + 1
                end = end + CHUNK
            else:
                # 416: we asked past EOF — re-fetch the open-ended tail.
                headers.update({'Range': f'bytes={end + 1}-'})
                resp = get_url(url=video_url, headers=headers, timeout=5)
            f.write(resp.content)
            f.flush()
            sub_stop_time = time.time()
            # Guard both intervals against a zero-length clock tick.
            sub_delta_time = max(sub_stop_time - sub_start_time, 1e-9)
            delta_time = max(sub_stop_time - start_time, 1e-9)
            remain_time = (total - i - 1) * sub_delta_time
            temp = (i + 1) / total
            processing = f'{int(temp*100):>3d}%'
            l = int(temp * ncols)
            n = int(temp * ncols * 10) % 10
            bar = get_bar(l, n, ncols)
            mod = f'{desc} {each_episode_i+1:>2d}: {bar} {processing:>4s} ({i+1} of {total}) [{int(delta_time//60):0>2d}:{int(delta_time%60):0>2d}<{int(remain_time//60):0>2d}:{int(remain_time%60):0>2d}, {1/sub_delta_time:.2f}{unit}/s]'
            stdscr.addstr(nrow + 1, 1, mod)
            stdscr.refresh()
    stdscr.addstr(nrow + 1, 0, f'{" ":>200s}')  # clear the progress line
    bar = '|Download completed|'
    mod = f'{desc} {each_episode_i+1:>2d}: {bar} {processing:>4s} ({i+1} of {total}) [{int(delta_time//60):0>2d}:{int(delta_time%60):0>2d}, {total/delta_time:.2f}{unit}/s]'
    stdscr.addstr(nrow + 1, 1, mod)
    stdscr.refresh()
def main(ep_id_list):
    """Fan episode downloads out over a 4-worker thread pool.

    Args:
        ep_id_list: iterable of Bilibili ep_ids to download.

    Writes status lines to the module-global curses screen ``stdscr``
    (final messages are placed below the ``ep_id_iter_n`` progress rows).
    """
    stdscr.addstr(0, 0, ' '.join('开始下载:'))
    stdscr.refresh()
    pool = ThreadPool(4)
    for each_episode_i, each_episode in enumerate(ep_id_list):
        home_url = f'https://www.bilibili.com/bangumi/play/ep{each_episode}'
        pool.apply_async(getBliVideo, (home_url, each_episode, each_episode_i))
    pool.close()
    pool.join()
    stdscr.addstr(2*ep_id_iter_n+1, 0, ' '.join('所有视频已下载完成 (^_^)...'))
    stdscr.addstr(2*ep_id_iter_n+3, 0, 'Press any key to continue...')
    stdscr.refresh()
if __name__ == '__main__':
    requests.packages.urllib3.disable_warnings()
    ep_id = input('请输入视频epid(第一集), 或 epid列表[](不包括ep):')
    try:
        flag = 0
        if ',' in ep_id.strip():
            # Explicit list of ep_ids, e.g. "[123,124,130]".
            ep_id_list = demjson.decode(ep_id)
            ep_id_iter_n = len(ep_id_list)
            flag = 1
        else:
            ep_id_ns = input('请输入要下载的范围([从第几集,到第几集]), 如 [4,9]; 或 下载几集, 如 5; 或下载第几集, 如[5,]:')
            ep_id_ns = demjson.decode(ep_id_ns)
            if isinstance(ep_id_ns, int):
                # "download N episodes" starting at ep_id.
                ep_id_iter_start, ep_id_iter_n = int(ep_id), int(ep_id_ns)
                flag = 1
            elif isinstance(ep_id_ns, list):
                if len(ep_id_ns) == 2:
                    # "[from, to]" — 1-based, inclusive range.
                    ep_id_iter_start, ep_id_iter_n = int(ep_id) + int(ep_id_ns[0]) - 1, int(ep_id_ns[1]) - int(ep_id_ns[0]) + 1
                    flag = 1
                elif len(ep_id_ns) == 1:
                    # Single episode "[k,]". BUG FIX: the original indexed
                    # ep_id_ns[1], which always raised IndexError here.
                    ep_id_iter_start, ep_id_iter_n = int(ep_id) + int(ep_id_ns[0]) - 1, 1
                    flag = 1
            ep_id_iter_end = ep_id_iter_start + ep_id_iter_n - 1
            ep_id_list = range(ep_id_iter_start, ep_id_iter_end + 1)
    except Exception as exc:
        # Narrowed from a bare `except:`; chain the cause for debugging.
        raise ValueError('选集有错误哦~~~') from exc
    if flag == 0:
        raise ValueError('选集有错误哦~~~')
    qn = input('请输入下载视频画质(112,80,64,32,16):')

    def get_ch_and_continue(stdscr):
        # Block until any key is pressed, then restore non-blocking input.
        stdscr.nodelay(0)
        ch = stdscr.getch()
        stdscr.nodelay(1)

    # Set up curses; the finally block guarantees the terminal is restored
    # whether the download succeeds or raises (the original duplicated the
    # teardown in try and except, and could hit an unbound `stdscr`).
    stdscr = curses.initscr()
    err = None
    try:
        curses.cbreak()
        curses.noecho()
        stdscr.keypad(1)
        main(ep_id_list)
        get_ch_and_continue(stdscr)
    except Exception as e:
        err = e
    finally:
        curses.nocbreak()
        curses.echo()
        stdscr.keypad(0)
        curses.endwin()
    # Print the error only after endwin(), so it isn't swallowed by curses.
    if err is not None:
        print(err)
# Note: to obtain the cookie, log in to your account in a browser, open
# F12 -> Network, pick any request, view its headers, and copy the cookie
# value into the headers above.