哈哈哈哈,分享一下我的爬虫小分队.
本帖最后由 神奇的yxq 于 2017-10-20 12:21 编辑学完小甲鱼的爬虫教程,迫不及待的写了几个爬虫~
爬虫功能:爬取电影资源
py版本:3.6.2
放源码:
1、主爬虫代码:
#爬虫名称:movies爬
#爬虫功能:爬取电影资源,生成一个html文本
#爬虫版本:1.0
#作者:yangxq
#时间:2017.09.10
import sys
sys.path.append('E:\LearnCode\python\MyMod')
import re
import random
import gethtml
import time
import savehtml
#主函数
def main():
target = 'http://www.mp4ba.net/'
#需要爬取的页数
pages = 10
#逐页爬取电影
for i in range(1,pages + 1):
#拼接每页的地址
pageurl = target + 'forum-mp4ba-' + str(i) +'.html'
#获取第i页的电影列表
code,html = gethtml.gethtml(pageurl)
html = html.decode(code)
print('\n正在访问第 %d/10页 : %s\n' % (i,pageurl) + '-' * 50)
movies_name,movies_url = getmovies(html)
num = len(movies_name)
info = []
#获取每个电影的磁力链接
for j in range(num):
print('\n第%d页 %d/%d | 正在爬取 %s 的详情页...' % (i,j + 1,num,movies_name))
code,html = gethtml.gethtml(movies_url)
html = html.decode(code)
movies_summary,magnet= getmagnet(html)
print('\n%s 的资源已经爬取!正在赶往下一个网页...' % movies_name + '\n' * 2 + '-' * 30 )
temp = []
#电影信息打包处理
temp.append(movies_summary)
for each in magnet:
temp.append(magnet)
info.append(temp)
#按页保存到html文件
savehtml.savehtml(movies_name,movies_url,info)
#获取页面电影链接
def getmovies(html):
movies_name = []
movies_url = []
start = 10
#循环查找电影名称和链接
while True:
#页面上电影名称开始特征'xst' 结束特征'<'
start = html.find('xst',start)
#当查找不到xst时退出循环
if start == -1:
break
end = html.find('<',start + 5)
print('\n已找到 : %s' % html)
movies_name.append(html)
#电影详情网址链接在'xst'之前第一个'href'之后 结束特征'"'
begin = html.find('href=',start - 100)
stop = html.find('"',begin + 6)
movies_url.append(html)
#进入下一个查找段
start += 100
#返回主页上的电影的网址列表
num = len(movies_name)
print('\n本页查找完毕! 共发现电影 %s 部\n' %num + '-' * 50)
return (movies_name,movies_url)
#获取磁力链接
def getmagnet(html):
magnet = []
#查找简介 特征'简介' 结束特征'<'
start = html.find('◎简 介')
end = html.find('<div',start + 10)
movies_summary = html
#删除错误截取的html代码
div = movies_summary.find('div')
movies_summary = movies_summary
print('\n简介:\n%s' % movies_summary)
#循环查找磁链
end = 0
while True:
#磁链开始特征'magnet:?' 结束标志'<'
start = html.find('magnet:?',end)
#当查找不到'magnet:?'时退出循环
if start == -1:
if len(magnet) == 0:
magnet.append('http://www.mp4ba.net/')
break
end = html.find('>',start)
print('\n磁力:%s' % html)
magnet.append(html)
#进入下一个查找段
#返回磁力链接列表
return (movies_summary,magnet)
if __name__ == '__main__':
print('\n爬虫开始工作...')
main()
因为需要写入到html文件,所有单独写了一个功能为保存为html的单独代码
2、保存功能的代码:
import time
def savehtml(movies_name,movies_url,info):
htmlinit = '''<!doctype html><html lang="zh-cn" ><head><meta charset="GBK"><title>MoviesList</title></head><body><h3 align="center" id="1">欢迎使用yangxq的电影爬虫</h3><h6 align="center"><stong>Yangxq</stong></h6><hr><hr>'''
localtime = time.localtime()
htmlname = 'MoviesList' + '_' + \
str(localtime) + '_' + \
str(localtime) + '_' + \
str(localtime) +'.html'
with open(htmlname,'a+') as f:
#将文件指针指向第一个字符
f.seek(0,0)
html = f.read()
#第一次写入初始化
if '<!doctype' not in html:
f.seek(0,0)
f.write(htmlinit)
#不是第一次写入则删除尾部结束代码
if '</p></body></html>' in html:
tel = html.find('</p></body></html>')
f.seek(tel,0)
#按格式写入文件
num = len(movies_name)
for i in range(num):
if movies_name in html:
continue
#第一行显示电影名称
lines1 = '''<p><a target="_blank" href="'''+ movies_url + '''"><big><stong>''' + movies_name + '</stong></big></a><br>' + '-' * 30 + '<br>'
#第二行显示电影简介
lines2 = '<i><small>' + info + '</small></i><br>' + '-' * 30 + '<br>'
#第三行显示下载地址
magnetnum = len(info)
lines3 = ''
for j in range(magnetnum):
lines3 = lines3 + '''<a href="'''+ info + '''">下载地址''' + str(j + 1) + '''</a> '''
lines = lines1 + lines2 + lines3 + '<hr>'
f.write(lines)
#结束代码
lines4 = '</p></body></html>'
f.write(lines4)
print('\n保存完毕!')
if __name__ == '__main__':
movies_name=['电影一','电影二']
info = [['我是电影一的简介',['我是电影一的第一个磁力','我是电影一的第二个磁力']],['我是电影二的简介',['我是电影二的第一个磁力','我是电影二的第二个磁力']]]
savehtml(movies_name,info)
为了提高通用性,把访问代码单独出来了。
3、提供访问功能的代码
import random
import urllib.request
import time
import os
def gethtml(url,data = None):
#建立地址表和协议表,获取代理ip
ipadds = []
iptype = []
#加上协议
if 'http://' not in url:
url = 'http://' + url
#从文件获取ip
timename = time.localtime()
ipname = 'iplist_' + str(timename) + '_' +\
str(timename) + '_' +\
str(timename) + '.txt'
#不是当天最新的iplist则运行ipget更新
if ipname not in os.listdir():
try:
import ipget
ipget.main()
with open(ipname,'r') as ip:
iplist = ip.read().split('\n')
for each in iplist:
iptype.append(each)
ipadds.append(each)
except ModuleNotFoundError:
ipadds = ['125.93.148.3:9000','123.7.38.31:9999','220.249.185.178:9999']
iptype = ['HTTP','HTTP','HTTP']
else:
with open(ipname,'r') as ip:
iplist = ip.read().split('\n')
for each in iplist:
iptype.append(each)
ipadds.append(each)
#代理和伪装
r = len(ipadds)
i = int(random.uniform(0,r))
if __name__ == '__main__':
print('\n本次访问使用 %s : %s 代理...' % (iptype,ipadds))
proxy_support = urllib.request.ProxyHandler({iptype:ipadds})
opener = urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
req = urllib.request.Request(url,data)
req.add_header('User-Agent','Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36')
#req.add_header('Host','www.mmjpg.com')
#req.add_header('Accept','text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8')
#req.add_header('Accept-Encoding','gzip, deflate, sdch')
#req.add_header('Accept-Language','zh-CN,zh;q=0.8')
#req.add_header('Cache-Control','max-age=0')
#req.add_header('Referer','http://www.mmjpg.com/' + str(i))
#访问
response = urllib.request.urlopen(req)
html = response.read()
#不要太过分,休息1~5秒
code = codetest(html)
time.sleep(int(random.uniform(1,6)))
return (code,html)
#网页编码测试
def codetest(html):
try:
html = html.decode('UTF-8')
return 'UTF-8'
except UnicodeDecodeError:
try:
html = html.decode('GBK')
return 'GBK'
except UnicodeDecodeError:
try:
html = html.decode('GB18030')
return 'GB18030'
except UnicodeDecodeError:
return 'unknow'
if __name__ == '__main__':
print('请输入测试网址: ',end = '')
url = input()
code,html = gethtml(url)
print('\n该网页编码是: %s' % code)
if code != 'unknow':
with open(url + '.html','w') as f:
html = html.decode(code)
f.write(html)
print('\n文件写入完毕!')
防被封,每天第一次运行上面的gethtml时调用这个获取最新的ip
4、ip爬虫
#爬虫名称:ipget
#爬虫功能:从ip代理网站爬取代理ip并时间保存
#爬虫作者:yangxq
#时间:2017.09.09
import urllib.request
import random
import time
import re
#主函数
def main():
url = 'http://www.xicidaili.com/'
code,html = gethtml(url)
html = html.decode(code)
iplist = ipfind(html)
ipsave(iplist)
#查找ip
def ipfind(html):
iplist = {}
ip = []
#正则匹配ip,端口和协议
ipadds = re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',html)
ipport = re.findall(r'<td>\d{1,5}</td>',html)
ipport = re.findall(r'\d{1,5}',str(ipport))
iptype = re.findall(r'<td>HTTPS</td>|<td>HTTP</td>',html)
iptype = re.findall(r'HTTPS|HTTP',str(iptype))
#以协议数量为标准得到可用ip数量,去除ss协议的ip
ipnum = len(iptype)
#拼接ip和端口,并导入字典,ip为键,协议为值
for i in range(ipnum):
ipadd = ipadds + ':' + ipport
ip.append(ipadd)
iplist] = iptype
ipnum = len(iplist)
if __name__ == '__main__':
print('\n已去掉重复ip地址,最终获得ip地址 %d 个' % ipnum)
return iplist
def gethtml(url,data = None):
ipadds = ['125.93.148.3:9000','123.7.38.31:9999','220.249.185.178:9999']
iptype = ['HTTP','HTTP','HTTP']
#代理和伪装
r = len(ipadds)
i = int(random.uniform(0,r))
if __name__ == '__main__':
print('\n本次访问使用 %s : %s 代理...' % (iptype,ipadds))
proxy_support = urllib.request.ProxyHandler({iptype:ipadds})
opener = urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
req = urllib.request.Request(url,data)
req.add_header('User-Agent','Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36')
#req.add_header('Host','www.mmjpg.com')
#req.add_header('Accept','text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8')
#req.add_header('Accept-Encoding','gzip, deflate, sdch')
#req.add_header('Accept-Language','zh-CN,zh;q=0.8')
#req.add_header('Cache-Control','max-age=0')
#req.add_header('Referer','http://www.mmjpg.com/' + str(i))
#访问
response = urllib.request.urlopen(req)
html = response.read()
#不要太过分,休息1~5秒
code = codetest(html)
time.sleep(int(random.uniform(1,6)))
return (code,html)
def codetest(html):
try:
html = html.decode('UTF-8')
return 'UTF-8'
except UnicodeDecodeError:
try:
html = html.decode('GBK')
return 'GBK'
except UnicodeDecodeError:
try:
html = html.decode('GB18030')
return 'GB18030'
except UnicodeDecodeError:
return 'unknow'
#储存
def ipsave(iplist):
timename = time.localtime()
ipname = 'iplist_' + str(timename) + '_' +\
str(timename) + '_' +\
str(timename) + '.txt'
#保存
with open(ipname,'w') as f:
for each in iplist:
f.writelines(iplist + ' : ' + each)
f.write('\n')
if __name__ == '__main__':
print('\n所有ip保存完毕!')
if __name__ == '__main__':
main()
新加一个模块,自己写的,以后写爬虫就获取网页这一步就可以省很多事.
internet 模块:
import urllib.request, urllib.parse, urllib.error
import http.cookiejar
import os, os.path
import threading
import random
import queue
import chardet
import time
import mylogging
def urlParse(url):
if "www" not in url:
url = "www." + url
if "http" not in url:
url = "http://" + url
return url
class Browse:
def __init__(self, name="浏览器"):
self.name = name
self.cookie = False
# self.proxy = False
# self.history = []# 历史记录
self.header = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Encoding": "utf-8",
"Accept-Language": "zh-CN,zh;q=0.8",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36",
"Connection": "keep-alive"}
# log日记初始化
self._log_ = mylogging.Logging(logPath="Download", name="浏览器")
def get(self, url):
"""
get方法的函数
:param url: 获取网页的地址
:return:
"""
url = urlParse(url)
req = urllib.request.Request(url, headers=self.header)
reTest = 0
loop = True
while loop:
if reTest < 3:
try:
response = urllib.request.urlopen(req)
except urllib.error.HTTPError as e:
code = reason = "none"
if hasattr(e, "code"):
code = e.code
if hasattr(e, "reason"):
reason = e.reason
self._log_.put((self.name, "HTTPError %s %s %s" % (url, code, reason)))
return "HTTPError"
except urllib.error.URLError as e:
code = reason = "none"
if hasattr(e, "code"):
code = e.code
if hasattr(e, "reason"):
reason = e.reason
self._log_.put((self.name, "URLError %s %s %s" % (url, code, reason)))
return "URLError"
except Exception:
reTest += 1
time.sleep(1)
continue
else:
content = response.read()
encoding = chardet.detect(content)["encoding"]
html = content.decode(encoding)
self._log_.put((self.name, "Get Html Success! %s" % url))
return html
else:
self._log_.put((self.name, "OtherError %s %s %s" % url))
return "OtherError"
def post(self, url, data):
"""
post方法的函数
:param url: 网页地址
:param data: 表单字典
:return:
"""
if self.cookie:
cjar = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cjar))
urllib.request.install_opener(opener)
# url = urlParse(url)
postdata = urllib.parse.urlencode(data).encode("utf-8")
req = urllib.request.Request(url, data=postdata, headers=self.header)
reTest = 0
loop = True
while loop:
if reTest < 3:
try:
response = urllib.request.urlopen(req)
except urllib.error.HTTPError as e:
code = reason = "none"
if hasattr(e, "code"):
code = e.code
if hasattr(e, "reason"):
reason = e.reason
self._log_.put((self.name, "HTTPError %s %s %s" % (url, code, reason)))
return "HTTPError"
except urllib.error.URLError as e:
code = reason = "none"
if hasattr(e, "code"):
code = e.code
if hasattr(e, "reason"):
reason = e.reason
self._log_.put((self.name, "URLError %s %s %s" % (url, code, reason)))
return "URLError"
except Exception:
reTest += 1
time.sleep(1)
continue
else:
content = response.read()
encoding = chardet.detect(content)["encoding"]
html = content.decode(encoding)
self._log_.put((self.name, "Post Html Success! %s" % url))
return html
else:
self._log_.put((self.name, "OtherError %s %s %s" % url))
return "OtherError"
class Download(threading.Thread):
def __init__(self, srcQueue=queue.Queue(), name="下载器"):
threading.Thread.__init__(self)
self.name = name# 进程名称
self.srcQueue = srcQueue# 下载队列
self.sleeptime = 0# 下载等待时间
self.Daemon = True# 设置守护线程
self.downloadPath = "Download"# 批量下载路径
self.srcName = True# 下载命名方式 自动还是手动?
self.srcType = "jpg"# 下载文件的格式
self.downloadShow = True# 是否显示下载进度条
# log立即初始化
self._log_ = mylogging.Logging(logPath="Download", name="下载器")
def __reporthook__(self, downloaded, perDataSize, allData):
"""
用于显示下载进度条
:param downloaded:已经下载的数据包
:param perDataSize:每个数据包的大小
:param allData:文件的总大小
:return:
"""
if self.downloadShow:
downloadedSize = downloaded * perDataSize
percent = downloadedSize / allData
if percent > 1:
percent = 1
donePart = int(percent * 50)
undonePart = 50 - donePart
line = ">" * donePart + "_" * undonePart
downloadedSize = downloadedSize / 1024 / 1024#下载的大小
alldataSize = allData / 1024 / 1024#总大小
print("下载进度[%s%3.2d%%] %.2fMB/%.2fMB" % (line, percent * 100, downloadedSize, alldataSize))
else:
pass
def __getName__(self, count=0):
"""
获取下载名称
:param count:
:return:
"""
dirList = os.listdir(self.downloadPath)
while True:
if self.srcName:# 下载命名选项
name = str(count) + "." + self.srcType
count += 1
if not os.path.exists(self.downloadPath + "\\" + name):
break
else:
name = input("请重命名资源下载名称:")
break
return name
def run(self):
"""
线程主程序
:return:
"""
if not os.path.exists(self.downloadPath):# 创建下载文件夹
os.mkdir(self.downloadPath)
count = 1
srcUrl = self.srcQueue.get()
name = self.__getName__(count)
reTest = 0
loop = True
while loop:
if reTest < 3:
try:
path = self.downloadPath + "\\" + name# 拼接存放路径及名称
print("正在下载:%s " % (name, srcUrl[:20], srcUrl[-20:]))
urllib.request.urlretrieve(srcUrl, path, self.__reporthook__)
print("下载完成:%s " % (name, srcUrl[:20], srcUrl[-20:]))
except urllib.error.HTTPError as e:
code = reason = "none"
if hasattr(e, "code"):
code = e.code
if hasattr(e, "reason"):
reason = e.reason
self._log_.put((self.name, "HTTPError %s %s %s" % (srcUrl, code, reason)))
except urllib.error.URLError as e:
code = reason = "none"
if hasattr(e, "code"):
code = e.code
if hasattr(e, "reason"):
reason = e.reason
self._log_.put((self.name, "URLError %s %s %s" % (srcUrl, code, reason)))
except Exception:
reTest += 1
time.sleep(1)
continue
else:
self._log_.put((self.name, "%s Download Success! %s" % (path, srcUrl)))
else:
self._log_.put((self.name, "otherError %s" % srcUrl))
reTest = 0
srcUrl = self.srcQueue.get()
name = self.__getName__(count)
time.sleep(random.choice(range(self.sleeptime)))
tips:mylogging是我自己的错误立即模块.就不放上来了,其实就是一个put函数获取信息 再写入到相应的文本文件中 楼主,你这个复制下来是不是就可以用了,用了第三方的模块 没有呀。 {:10_277:} 741712547 发表于 2017-9-30 16:21
楼主,你这个复制下来是不是就可以用了,用了第三方的模块 没有呀。
没有可以用
页:
[1]