|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
单线程版本
- #单线程版本
- import requests,re,json
- from pathlib import Path
- from ffmpy3 import FFmpeg
- from tqdm import tqdm
- headers={"Cookie":"SESSDATA=这里填自己的",
- "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.55 Safari/537.36 Edg/96.0.1054.41",
- "Referer":"https://www.bilibili.com/"}
def _with_page(video_url: str, page: int) -> str:
    """Return *video_url* with its ``p`` query parameter set to *page*."""
    base, hash_mark, fragment = video_url.partition('#')
    if re.search(r'[?&]p=', base):
        # Replace only the value of the real ``p`` parameter, keeping any
        # parameters that follow it.
        base = re.sub(r'([?&])p=[^&#]*', rf'\g<1>p={page}', base, count=1)
    else:
        # No ``p`` parameter yet: append one instead of silently doing nothing.
        base += ('&' if '?' in base else '?') + f'p={page}'
    return base + hash_mark + fragment


def get_video_info(video_url: str):
    """Collect the DASH download links for every part of a video page.

    The page HTML is scraped with regular expressions: the part counter
    (``<span class="cur-page">(1/12)</span>``) gives the number of parts, and
    for each part the embedded ``window.__playinfo__`` and
    ``window.__INITIAL_STATE__`` JSON blobs give the stream URLs and the
    part title.

    Args:
        video_url (str): Link to the video page, with or without a ``p=``
            query parameter.

    Returns:
        dict: Maps each part title to a dict with the keys
        ``video_download_url`` and ``audio_download_url`` (the ``baseUrl`` of
        the first video / audio DASH entry). Parts sharing a title overwrite
        each other because the title is the dictionary key.

    Raises:
        IndexError: If a page does not embed the expected ``<script>`` blobs.
    """
    first_html = requests.get(video_url, headers=headers).text
    # e.g. <span class="cur-page">(1/12)</span>  -> "1/12"
    pages = re.findall(r'<span class="cur-page">\((.*?)\)</span>', first_html)
    total_page = 1
    if pages:
        counter = str(pages[0]).split('/')
        cur_page = counter[0]
        total_page = int(counter[1])
        print(f'当前为多p视频总页数为{total_page},当前页数为{cur_page}')
    else:
        print('当前为单p视频')

    video_info = {}
    for p in range(1, total_page + 1):
        page_html = requests.get(_with_page(video_url, p), headers=headers).text
        # Stream (download) information.
        video_down_re = re.findall(
            r'<script>window\.__playinfo__=(.*?)</script>', page_html)
        video_down_json = json.loads(video_down_re[0])
        # Basic video information; the dot is escaped so it only matches a
        # literal ".".
        video_base_re = re.findall(
            r'<script>window\.__INITIAL_STATE__=(.*?);\(function\(\)', page_html)
        video_base_json = json.loads(video_base_re[0])
        video_title = video_base_json['videoData']['pages'][p - 1]['part']
        dash = video_down_json['data']['dash']
        video_info[video_title] = dict(
            video_download_url=dash['video'][0]['baseUrl'],
            audio_download_url=dash['audio'][0]['baseUrl'],
        )
    return video_info
def video_download(video_title: str, video_download_url: str, audio_download_url: str):
    """Download the audio stream of one part to ``<video_title>.mp3``.

    Only the audio track is fetched. The video download and the FFmpeg merge
    are disabled (outlined in the comment at the end of the function), so
    ``video_download_url`` is accepted for interface compatibility but no
    request is made to it.

    Args:
        video_title (str): Part title, used as the output file name stem.
        video_download_url (str): Video stream link (currently unused).
        audio_download_url (str): Audio stream link.

    Raises:
        requests.HTTPError: If the audio request returns an error status.
    """
    chunk_size = 1024  # bytes written per iteration

    # stream=True fetches the body chunk by chunk so the progress bar tracks
    # the real transfer; the context manager releases the connection even if
    # writing fails.
    with requests.get(url=audio_download_url, headers=headers, stream=True) as res:
        res.raise_for_status()
        # Content-Length may be absent; tqdm accepts total=None in that case.
        content_length = res.headers.get('Content-Length')
        content_size = int(content_length) if content_length is not None else None
        with open(f'{video_title}.mp3', 'wb') as stream, tqdm(
            total=content_size,
            desc=f'{video_title}.mp3',
            ncols=100,
            unit='byte',
            unit_scale=True,
        ) as pbar:
            for data in res.iter_content(chunk_size=chunk_size):
                stream.write(data)
                pbar.update(len(data))

    # Disabled steps, kept for reference:
    #   1. download video_download_url to f'{video_title}.mp4' the same way
    #   2. merge the two files:
    #        ff = FFmpeg(inputs={f'{video_title}.mp4': None, f'{video_title}.mp3': None},
    #                    outputs={f'{video_title}.mkv': '-codec copy'})
    #        ff.run()
    #   3. delete the intermediate files:
    #        Path(f'{video_title}.mp3').unlink()
    #        Path(f'{video_title}.mp4').unlink()
-
if __name__ == '__main__':
    import time

    # Resolve every part of the video first, then time only the downloads.
    url = 'https://www.bilibili.com/video/BV1f3411B7HK?p=1'
    video_info = get_video_info(video_url=url)

    start_time = time.time()
    for title, links in video_info.items():
        video_download(
            title,
            links['video_download_url'],
            links['audio_download_url'],
        )

    print(f'time={time.time()-start_time}')
-
复制代码
异步协程版本
- # 单线程异步协程版本
import asyncio
import json
import re
from pathlib import Path

import aiofiles
import aiohttp
import requests
from ffmpy3 import FFmpeg
from tqdm.asyncio import tqdm
- headers = {"Cookie": "SESSDATA=这里填自己的",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.55 Safari/537.36 Edg/96.0.1054.41",
- "Referer": "https://www.bilibili.com/"}
async def get_video_pages(video_url: str) -> int:
    """Return the total number of parts ("p") of a video page.

    The count is read from the part counter in the page HTML, e.g.
    ``<span class="cur-page">(1/12)</span>``. When the counter is absent the
    video is treated as a single-part video.

    :param video_url: link to the video page
    :return: total number of parts (1 for a single-part video)
    """
    # Fetch with aiohttp instead of the blocking requests.get so the event
    # loop is not stalled while the page is downloaded.
    async with aiohttp.ClientSession() as session:
        async with session.get(url=video_url, headers=headers) as res:
            video_html = await res.text()

    # e.g. <span class="cur-page">(1/12)</span>  -> "1/12"
    pages = re.findall(r'<span class="cur-page">\((.*?)\)</span>', video_html)
    if not pages:
        print('当前为单p视频')
        return 1

    counter = str(pages[0]).split('/')
    cur_page = counter[0]
    total_page = int(counter[1])
    print(f'当前为多p视频总页数为{total_page},当前页数为{cur_page}')
    return total_page
async def video_download(video_url: str, page: int):
    """Download the audio stream of one part to ``<part title>.mp3``.

    The page is fetched and its embedded ``window.__playinfo__`` and
    ``window.__INITIAL_STATE__`` JSON blobs are parsed for the audio stream
    URL and the part title. Only the audio track is saved; the video download
    and the FFmpeg merge are disabled (outlined in the comment at the end of
    the function).

    :param video_url: link to the page of this part
    :param page: 1-based part number, used to look up the part title
    :return: None
    :raises aiohttp.ClientResponseError: if either request returns an error status
    :raises IndexError: if the page does not embed the expected script blobs
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url=video_url, headers=headers) as res:
            res.raise_for_status()
            video_html = await res.text()

        # Stream (download) information.
        video_down_re = re.findall(
            r'<script>window\.__playinfo__=(.*?)</script>', video_html)
        video_down_json = json.loads(video_down_re[0])
        # Basic video information; the dot is escaped so it only matches a
        # literal ".".
        video_base_re = re.findall(
            r'<script>window\.__INITIAL_STATE__=(.*?);\(function\(\)', video_html)
        video_base_json = json.loads(video_base_re[0])

        video_title = video_base_json['videoData']['pages'][page - 1]['part']
        audio_download_url = video_down_json['data']['dash']['audio'][0]['baseUrl']

        async with session.get(url=audio_download_url, headers=headers) as res_audio:
            res_audio.raise_for_status()
            # Write chunk by chunk so a large track is never held in memory
            # as a whole.
            async with aiofiles.open(f'{video_title}.mp3', 'wb') as stream:
                async for chunk in res_audio.content.iter_chunked(64 * 1024):
                    await stream.write(chunk)

    # Disabled steps, kept for reference:
    #   1. video link: video_down_json['data']['dash']['video'][0]['baseUrl'];
    #      download it to f'{video_title}.mp4' the same way as the audio
    #   2. merge the two files:
    #        ff = FFmpeg(inputs={f'{video_title}.mp4': None, f'{video_title}.mp3': None},
    #                    outputs={f'{video_title}.mkv': '-codec copy'})
    #        ff.run()
    #   3. delete the intermediate files:
    #        Path(f'{video_title}.mp3').unlink()
    #        Path(f'{video_title}.mp4').unlink()
def _page_url(video_url: str, page: int) -> str:
    """Return *video_url* with its ``p`` query parameter set to *page*."""
    base, hash_mark, fragment = video_url.partition('#')
    if re.search(r'[?&]p=', base):
        # Replace only the value of the real ``p`` parameter, keeping any
        # parameters that follow it.
        base = re.sub(r'([?&])p=[^&#]*', rf'\g<1>p={page}', base, count=1)
    else:
        # No ``p`` parameter yet: append one instead of silently doing nothing.
        base += ('&' if '?' in base else '?') + f'p={page}'
    return base + hash_mark + fragment


async def main(url: str = 'https://www.bilibili.com/video/BV1f3411B7HK?p=1'):
    """Download the audio of every part of a video concurrently.

    The number of parts is determined first; then one ``video_download``
    coroutine per part is run via ``asyncio.as_completed`` while a progress
    bar counts finished parts. The elapsed download time is printed at the
    end.

    :param url: link to any part of the video (defaults to the sample video)
    :return: None
    """
    import time

    total_pages = await get_video_pages(video_url=url)
    tasks = [
        video_download(_page_url(url, p), p)
        for p in range(1, total_pages + 1)
    ]

    start = time.time()
    # The bar advances once per finished part, so it is labelled in pages
    # rather than bytes.
    with tqdm(asyncio.as_completed(tasks), total=len(tasks), unit='page') as pbar:
        for finished in pbar:
            await finished
    print(f'time={time.time() - start}')
if __name__ == '__main__':
    # Run main() to completion on a freshly created event loop that is also
    # installed as the current loop.
    # NOTE(review): the loop is never closed afterwards; this may be a
    # deliberate workaround for shutdown errors, so confirm before switching
    # to asyncio.run().
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    event_loop.run_until_complete(main())
复制代码 |
|