|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
搜索使用的是百度搜索,从搜索结果中找到 爱奇艺、腾讯、优酷、PPTV、芒果 这些网站的链接
解析主要是通过在线解析网址,然后对解析后的网页分析,获得m3u8视频的url,解密下载
其中如果网页请求失败,我就不断地重新发起请求,所以下载的最后阶段会有点慢
主要用的是'https://jx.618g.com/?url=',这个解析地址的电影视频都可以下载
但有时候会跳转到'https://jx.147g.cc/?url=',这个地址受robots协议限制,所以只能在线观看,不能下载
Python版本 -> 3.8.7
废话不多说,直接上代码
import asyncio
import base64
import os
import re
import shutil
import time
import webbrowser
from urllib.parse import quote

import aiohttp
import requests
from Crypto.Cipher import AES
from lxml import etree
class FilmDownloader:
    """Search for a film, resolve it through an online parser and download it.

    The pipeline is: ``SearchFilm`` (Baidu search for a link to one of the
    supported video sites) -> ``ParseFilmAndGetURL`` (wrap that link in the
    online parser URL) -> ``getIndexUrl`` (read the HLS playlists, the
    decryption key and the segment URLs) -> ``DownloadFilm`` (fetch, decrypt
    and merge the segments). ``FindFilmAndDownload`` runs the whole chain.
    """

    # Seconds to wait for a blocking HTTP request before giving up.
    REQUEST_TIMEOUT = 15
    # Seconds to wait before retrying a segment that failed to download.
    RETRY_DELAY = 5

    def __init__(self):
        """Initialise endpoints and state, and prepare an empty temp directory."""
        # Search endpoint; the URL-encoded film name is appended to it.
        self.searchUrl = 'https://www.baidu.com/s?wd='
        # Online parser endpoints; the video page URL is appended.
        self.parseUrl = 'https://jx.618g.com/?url='
        self.parseUrl_147 = 'https://jx.147g.cc/?url='
        # Prefix for relative playlist paths.
        self.downloadHead = 'https://video.dious.cc'
        # User-Agent sent with every request.
        self.userAgent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36'
        # Temporary directory for downloaded segments (keeps a trailing
        # separator so file names can simply be appended).
        self.downDir = os.path.join(os.getcwd(), 'temp') + os.sep
        # Site names (iQIYI, Tencent Video, Youku, PP Video, Mango TV) that
        # are accepted as a source in the search results.
        self.webList = ['爱奇艺', '腾讯视频', '优酷', 'PP视频', '芒果TV']
        # Film name as entered by the user.
        self.name = ''
        # URL for watching online through the parser.
        self.onlineUrl = ''
        # Search hit first, later the parser URL wrapping the video page.
        self.finalUrl = ''
        # Raw search-result elements of the last search.
        self.searchResultList = []
        # URL of the playlist that lists all segments.
        self.allListUrl = ''
        # URL of the decryption key.
        self.keyUrl = ''
        # Decryption key as text (kept for backward compatibility).
        self.key = ''
        # Segment descriptors: {'index': int, 'url': str}.
        self.allList = []
        # Kept for backward compatibility only. A single CBC cipher object is
        # stateful and cannot be shared between segments, so a fresh cipher
        # is created per segment in ``_decrypt``.
        self.aes = None
        # Raw key bytes, explicit IV from the playlist (or None) and the
        # playlist's media sequence number of the first segment.
        self._keyBytes = b''
        self._keyIv = None
        self._mediaSequence = 0
        # Number of segments in total / already downloaded.
        self.total = 0
        self.cur = 0
        self.indexUrl = ''
        # Make sure the temp directory exists and starts out empty.
        os.makedirs(self.downDir, exist_ok=True)
        if os.listdir(self.downDir):
            shutil.rmtree(self.downDir)
            # NOTE(review): presumably gives the OS time to finish the
            # deletion before the directory is recreated - confirm.
            time.sleep(0.2)
            os.mkdir(self.downDir)

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    def _get(self, url, headers):
        """Return the response for ``url``, or None when the request fails."""
        time.sleep(0.2)
        try:
            response = requests.get(url, headers=headers,
                                    timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException:
            return None
        response.encoding = 'utf-8'
        return response

    @staticmethod
    def _parseHtml(text):
        """Parse ``text`` as HTML; return None when there is nothing to parse."""
        try:
            return etree.HTML(text)
        except (etree.LxmlError, ValueError):
            return None

    def _variantUrl(self, text):
        """Return the first playlist URI listed in ``text`` ('' if none)."""
        for line in text.splitlines():
            line = line.strip()
            if line and not line.startswith('#'):
                # Relative paths are resolved against the known media host.
                return line if line.startswith('http') else self.downloadHead + line
        return ''

    @staticmethod
    def _parseKeyLine(line):
        """Return ``(uri, iv)`` from an ``#EXT-X-KEY`` line.

        ``uri`` is '' when the attribute is missing; ``iv`` is the 16-byte
        value of the ``IV`` attribute, or None when the line has none.
        """
        uriMatch = re.search(r'URI="([^"]*)"', line)
        # Blank out quoted values so an "IV=" inside the URI is not picked up.
        unquoted = re.sub(r'"[^"]*"', '""', line)
        ivMatch = re.search(r'(?:^|[:,])IV=0[xX]([0-9a-fA-F]{1,32})', unquoted)
        uri = uriMatch.group(1) if uriMatch else ''
        iv = bytes.fromhex(ivMatch.group(1).zfill(32)) if ivMatch else None
        return uri, iv

    def _segmentIv(self, index):
        """Return the CBC initialisation vector for segment ``index``.

        Without an explicit ``IV`` attribute, HLS uses the segment's media
        sequence number as a big-endian 128-bit integer.
        """
        if self._keyIv is not None:
            return self._keyIv
        return (self._mediaSequence + index).to_bytes(16, 'big')

    @staticmethod
    def _stripPadding(data):
        """Remove PKCS7 padding from ``data`` when it is present."""
        if not data:
            return data
        pad = data[-1]
        if 1 <= pad <= 16 and data.endswith(bytes([pad]) * pad):
            return data[:-pad]
        return data

    def _decrypt(self, index, payload):
        """Return the plain bytes of segment ``index``.

        Streams without a key are passed through untouched.
        """
        if not self._keyBytes:
            return payload
        cipher = AES.new(self._keyBytes, AES.MODE_CBC, self._segmentIv(index))
        return self._stripPadding(cipher.decrypt(payload))

    async def _downloadAll(self):
        """Download every segment in ``self.allList`` concurrently."""
        await asyncio.gather(
            *(self.crawler(item['index'], item['url']) for item in self.allList)
        )

    # ------------------------------------------------------------------
    # Public pipeline
    # ------------------------------------------------------------------
    def SearchFilm(self, name: str) -> bool:
        """Search Baidu for ``name`` and remember the first supported hit.

        Returns True when ``self.finalUrl`` was set to a link whose label
        names one of the sites in ``self.webList``; False otherwise.
        """
        if not name:
            print('电影名不能为空...')
            return False
        self.name = name
        # Forget the previous hit so a repeated search cannot reuse it.
        self.finalUrl = ''
        head = {
            'Host': 'www.baidu.com',
            'User-Agent': self.userAgent
        }
        print('正在搜索 {} 资源...'.format(name))
        # Percent-encode the name so characters like '&' or '#' stay part of
        # the query value.
        res = self._get(self.searchUrl + quote(name), head)
        if res is None:
            return False
        html = self._parseHtml(res.text)
        if html is None:
            return False

        # First choice: the dedicated source buttons of the result card.
        condition = '//a[@target="_blank"][@data-visited="off"][@class="dis-line-block c-gap-right dis-no-line c-blocka"]'
        self.searchResultList = html.xpath(condition)
        for item in self.searchResultList:
            href = item.attrib.get('href', '')
            if item.text in self.webList and href:
                self.finalUrl = href
                break

        # Fallback: ordinary result titles that mention a supported site.
        if not self.finalUrl:
            self.searchResultList = html.xpath('//div//h3//a')
            for anchor in self.searchResultList:
                pieces = []
                for child in anchor:
                    pieces.append(child.text or '')
                    pieces.append(child.tail or '')
                mentioned = any(site in piece
                                for site in self.webList for piece in pieces)
                href = anchor.attrib.get('href', '')
                if mentioned and href:
                    self.finalUrl = href
                    break

        return bool(self.finalUrl)

    def ParseFilmAndGetURL(self) -> bool:
        """Follow the search hit and build the online-parser URL for it.

        Returns False when there is no search hit or the request fails.
        """
        if not self.finalUrl:
            return False
        head = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'User-Agent': self.userAgent
        }
        res = self._get(self.finalUrl, head)
        if res is None:
            return False
        # res.url is the address reached after following redirects.
        self.finalUrl = self.parseUrl + res.url
        self.onlineUrl = self.finalUrl
        return True

    def getIndexUrl(self) -> bool:
        """Collect the segment URLs and the decryption key of the film.

        Returns True when the parser page could be read. In that case
        ``self.allList`` holds the segments; it stays empty when the video
        can only be watched online. Returns False on any failure.
        """
        if not self.finalUrl:
            return False
        head = {
            'User-Agent': self.userAgent
        }
        # Start from a clean slate so repeated calls do not accumulate.
        self.indexUrl = ''
        self.allListUrl = ''
        self.allList = []
        self.total = 0
        self.key = ''
        self._keyBytes = b''
        self._keyIv = None
        self._mediaSequence = 0

        response = self._get(self.finalUrl, head)
        if response is None:
            return False
        html = self._parseHtml(response.text)
        if html is None:
            return False
        nodes = html.xpath('//iframe[@id="player"]')
        if not nodes:
            return False
        for item in nodes:
            src = item.attrib.get('src', '')
            self.indexUrl = src
            if 'm3u8' in src:
                # The playlist address is passed as the "url=" parameter.
                marker = src.find('url=')
                if marker != -1:
                    self.indexUrl = src[marker + 4:]
            elif 'jx.147g.cc' in src:
                print('由于robots协议,本视频无法下载...')
                return True
            if self.indexUrl:
                break
        if not self.indexUrl:
            return False

        response = self._get(self.indexUrl, head)
        if response is None:
            return False

        if '#EXTINF' in response.text:
            # The index already is the media playlist with the segments.
            self.allListUrl = self.indexUrl
            playlistText = response.text
        else:
            # Otherwise it points to the playlist that lists the segments.
            self.allListUrl = self._variantUrl(response.text)
            playlist = self._get(self.allListUrl, head) if self.allListUrl else None
            if playlist is None:
                print('{} 没有下载资源...'.format(self.name))
                return False
            playlistText = playlist.text

        n = 0
        for line in playlistText.splitlines():
            line = line.strip()
            if line.startswith('#EXT-X-MEDIA-SEQUENCE:'):
                value = line.split(':', 1)[1].strip()
                if value.isdigit():
                    self._mediaSequence = int(value)
            elif 'KEY' in line and 'URI' in line:
                # If several key lines exist, the last one is used for all
                # segments.
                self.keyUrl, self._keyIv = self._parseKeyLine(line)
                keyRes = self._get(self.keyUrl, head)
                # The key is binary data; only AES key sizes are usable.
                if keyRes is None or len(keyRes.content) not in (16, 24, 32):
                    print('{} 没有下载资源...'.format(self.name))
                    return False
                self.key = keyRes.text
                self._keyBytes = keyRes.content
            elif line.startswith('http'):
                self.allList.append({
                    'index': n,
                    'url': line
                })
                n += 1

        self.total = len(self.allList)
        return True

    async def crawler(self, index, url):
        """Download, decrypt and store one segment, retrying until it works.

        The segment is written to the temp directory under a zero-padded
        name derived from ``index``.
        """
        head = {
            'Connection': 'keep-alive',
            # NOTE(review): the Host header is hard-coded; segments served
            # from another host would be requested with the wrong Host.
            'Host': 'ts1.lslkkyj.com',
            'User-Agent': self.userAgent
        }
        filename = self.downDir + '{:0>5d}.mp4'.format(index)

        # A loop instead of recursion, so long outages cannot exhaust the
        # stack. Retrying without limit is the intended best-effort
        # behaviour.
        while True:
            try:
                async with aiohttp.ClientSession() as session:
                    await asyncio.sleep(5)
                    async with session.get(url, headers=head) as response:
                        payload = await response.read()
                        await asyncio.sleep(1)

                content = self._decrypt(index, payload)
                with open(filename, 'wb') as file:
                    file.write(content)
            except Exception:
                await asyncio.sleep(self.RETRY_DELAY)
                continue
            break

        self.cur += 1
        percent = float(self.cur) / float(self.total) * 100
        print('\r{} 下载中... {:.2f} % {:d} / {:d}'.format(self.name, percent, self.cur, self.total), end='')

    def DownloadFilm(self) -> bool:
        """Download all segments and merge them into ``<name>.mp4``.

        The merged file is written to the current working directory and the
        temp directory is emptied afterwards. Returns False when there is
        nothing to download.
        """
        if not self.allList:
            return False
        print('搜索完成!正在下载...')
        self.total = len(self.allList)
        self.cur = 0
        # asyncio.run creates and closes its own loop, so the method can be
        # called more than once.
        asyncio.run(self._downloadAll())
        print('\n下载完成,正在合并文件...')
        # Concatenate in playlist order without going through a shell, so
        # the film name cannot be interpreted as part of a command.
        target = os.path.join(os.getcwd(), self.name + '.mp4')
        with open(target, 'wb') as merged:
            for item in self.allList:
                part = self.downDir + '{:0>5d}.mp4'.format(item['index'])
                with open(part, 'rb') as chunk:
                    shutil.copyfileobj(chunk, merged)
        shutil.rmtree(self.downDir)
        time.sleep(0.2)
        os.mkdir(self.downDir)
        print('视频下载完成...')
        return True

    def FindFilmAndDownload(self, name: str) -> bool:
        """Find ``name`` and either open it in the browser or download it.

        Returns True when the film was opened online or downloaded, False
        when any step failed or nothing could be downloaded.
        """
        if not self.SearchFilm(name):
            print('没有搜索到 {} 资源...'.format(name))
            return False
        if not self.ParseFilmAndGetURL():
            print('{} 资源解析失败...'.format(name))
            return False
        if not self.getIndexUrl():
            print('获取 {} 下载资源失败...'.format(name))
            return False
        choice = input('是否在线观看?在线观看则不下载视频!y/n\n无法下载的视频不选择在线观看则退出程序...\n')
        if choice.strip().lower() == 'y':
            # Opens the default browser without building a shell command
            # (the URL may contain characters such as '&').
            webbrowser.open(self.onlineUrl)
            return True
        if not self.allList:
            return False
        if not self.DownloadFilm():
            print('{} 下载失败...'.format(name))
            return False
        return True
if __name__ == "__main__":
    # Script entry point: print a separator line, ask for a film title,
    # then build the downloader and run the whole search/download pipeline.
    print('-' * 50)
    title = input('请输入电影名...\n')
    FilmDownloader().FindFilmAndDownload(title)
复制代码 |
评分
-
查看全部评分
|