|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
搜索是百度搜索,找到 爱奇艺、腾讯、优酷、PPTV、芒果 这些网站的链接
解析主要是通过在线解析网址,然后对解析后的网页分析,获得m3u8视频的url,解密下载
其中如果网页请求失败,我就不断的重新发起请求,所以下载最后阶段会有点慢
主要用的是'https://jx.618g.com/?url=',这个解析地址的电影视频都可以下载
但有时候会跳转到'https://jx.147g.cc/?url=',这个地址有robots协议就只能在线观看
Python版本 -> 3.8.7
废话不多说,直接上代码import os
import requests
import base64
from lxml import etree
from Crypto.Cipher import AES
import asyncio
import aiohttp
import shutil
import time
class FilmDownloader:
def __init__(self):
'''初始化'''
# 爱奇艺、腾讯、优酷、PPTV、芒果
'''查找电影的url'''
self.searchUrl = 'https://www.baidu.com/s?wd='
# 解析url
self.parseUrl = 'https://jx.618g.com/?url='
self.parseUrl_147 = 'https://jx.147g.cc/?url='
# 下载地址头
self.downloadHead = 'https://video.dious.cc'
# User-Agent
self.userAgent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36'
# 下载临时文件目录
self.downDir = os.getcwd() + '\\temp\\'
self.webList = ['爱奇艺', '腾讯视频', '优酷', 'PP视频', '芒果TV']
# 电影名称
self.name = ''
# 在线观看url
self.onlineUrl = ''
# 完整的电影播放网址
self.finalUrl = ''
# 搜索结果列表
self.searchResultList = []
# 电影所有的url的地址
self.allListUrl = ''
# 解密视频所需要的秘钥的URL
self.keyUrl = ''
# 解密视频的秘钥
self.key = ''
# 电影所有url列表
self.allList = []
# AES解密 - 初始化加密器
self.aes = AES.new(b'0000000000000000', AES.MODE_CBC)
# 临时文件总数
self.total = 0
# 已经下载的文件数量
self.cur = 0
self.indexUrl = ''
# 下载目录不存在,则创建目录
if not os.path.exists(self.downDir) or not os.path.isdir(self.downDir):
os.mkdir(self.downDir)
# 清空下载目录
if len(os.listdir(self.downDir)) != 0:
shutil.rmtree(self.downDir)
time.sleep(0.2)
os.mkdir(self.downDir)
def SearchFilm(self, name: str) -> bool:
'''电影搜索'''
if len(name) == 0:
print('电影名不能为空...')
return False
self.name = name
head = {
'Host': 'www.baidu.com',
'User-Agent': self.userAgent
}
print('正在搜索 {} 资源...'.format(name))
time.sleep(0.2)
res = requests.get(self.searchUrl + name, headers=head)
res.encoding = 'utf-8'
html = etree.HTML(res.text)
condition = '//a[@target="_blank"][@data-visited="off"][@class="dis-line-block c-gap-right dis-no-line c-blocka"]'
self.searchResultList = html.xpath(condition)
# 爱奇艺、腾讯、优酷、PPTV、芒果
for item in self.searchResultList:
flag = item.text == '爱奇艺' or item.text == '腾讯视频' or item.text == '优酷' or item.text == 'PP视频' or item.text == '芒果TV'
if flag:
self.finalUrl = item.attrib['href']
break
if len(self.finalUrl) == 0:
self.searchResultList = html.xpath('//div//h3//a')
tempList = []
isExist = False
for Item in self.searchResultList:
child = Item.getchildren()
for each in child:
tempList.append(each.tail)
tempList.append(each.text)
for each in self.webList:
if each in str(tempList):
isExist = True
break
if isExist:
self.finalUrl = Item.attrib['href']
break
if len(self.finalUrl) == 0:
return False
return True
def ParseFilmAndGetURL(self) -> bool:
'''解析视频,并获得下载地址'''
if len(self.finalUrl) == 0:
return False
head = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'User-Agent': self.userAgent
}
time.sleep(0.2)
res = requests.get(self.finalUrl, headers=head)
res.encoding = 'utf-8'
self.finalUrl = self.parseUrl + res.url
self.onlineUrl = self.finalUrl
return True
def getIndexUrl(self) -> bool:
'''获取视频所有片段的url'''
if len(self.finalUrl) == 0:
return False
head = {
'User-Agent': self.userAgent
}
time.sleep(0.2)
response = requests.get(self.finalUrl, headers=head)
response.encoding = 'utf-8'
html = etree.HTML(response.text)
nodes = html.xpath('//iframe[@id="player"]')
if nodes is None or len(nodes) == 0:
return False
for item in nodes:
self.indexUrl = item.attrib['src']
if self.indexUrl.find('m3u8') != -1:
index = self.indexUrl.find('url=') + 4
self.indexUrl = self.indexUrl[index:]
elif self.indexUrl.find('jx.147g.cc') != -1:
print('由于robots协议,本视频无法下载...')
return True
if len(self.indexUrl) != 0:
# print('url: ' + indexUrl)
break
time.sleep(0.2)
response = requests.get(self.indexUrl, headers=head)
response.encoding = 'urf-8'
result = response.text.split('\n')
for line in result:
if line.find('#') == -1:
self.allListUrl = self.downloadHead + line
break
try:
response = requests.get(self.allListUrl, headers=head)
except Exception:
print('{} 没有下载资源...'.format(self.name))
return False
response.encoding = 'utf-8'
tempList = response.text.split('\n')
n = 0
for line in tempList:
if line.find('KEY') != -1 and line.find('URI') != -1:
self.keyUrl = line[line.find('"') + 1:line.rfind('"')]
keyRes = requests.get(self.keyUrl, headers=head)
keyRes.encoding = 'utf-8'
self.key = keyRes.text
self.aes = AES.new(self.key.encode('utf-8'), AES.MODE_CBC)
elif line.find('http') != -1:
self.allList.append({
'index': n,
'url': line
})
n += 1
self.total = len(self.allList)
return True
async def crawler(self, index, url):
head = {
'Connection': 'keep-alive',
'Host': 'ts1.lslkkyj.com',
'User-Agent': self.userAgent
}
content = b''
try:
async with aiohttp.ClientSession() as session:
await asyncio.sleep(5)
async with session.get(url, headers=head) as response:
text = await response.read()
await asyncio.sleep(1)
content = self.aes.decrypt(text) # 解密
filename = self.downDir + '{:0>5d}.mp4'.format(index)
file = open(filename, 'wb')
file.write(content)
file.close()
self.cur += 1
percent = float(self.cur) / float(self.total) * 100
print('\r{} 下载中... {:.2f} % {:d} / {:d}'.format(self.name, percent, self.cur, self.total), end='')
except Exception:
# print('{:d} 下载错误;url:{}'.format(index, url))
await asyncio.sleep(5)
await self.crawler(index, url)
def DownloadFilm(self) -> bool:
'''协程'''
if len(self.allList) == 0:
return False
print('搜索完成!正在下载...')
loop = asyncio.get_event_loop()
tasks = [self.crawler(item['index'], item['url']) for item in self.allList]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
print('\n下载完成,正在合并文件...')
command = 'copy /b ' + self.downDir + '* ' + os.getcwd() + '\\' + self.name + '.mp4'
os.system(command)
shutil.rmtree(self.downDir)
time.sleep(0.2)
os.mkdir(self.downDir)
print('视频下载完成...')
return True
def FindFilmAndDownload(self, name: str) -> bool:
'''查找视频并下载或在线观看'''
if not self.SearchFilm(name):
print('没有搜索到 {} 资源...'.format(name))
return False
elif not self.ParseFilmAndGetURL():
print('{} 资源解析失败...'.format(name))
return False
elif not self.getIndexUrl():
print('获取 {} 下载资源失败...'.format(name))
return False
chioce = input('是否在线观看?在线观看则不下载视频!y/n\n无法下载的视频不选择在线观看则退出程序...\n')
if chioce == 'y':
os.system('start ' + self.onlineUrl)
return True
if len(self.allList) == 0:
return False
if not self.DownloadFilm():
print('{} 下载失败...'.format(name))
if __name__ == "__main__":
print('-' * 50)
# film = '唐伯虎点秋香'
film = input('请输入电影名...\n')
task = FilmDownloader()
task.FindFilmAndDownload(film)
|
评分
-
查看全部评分
|