#!/usr/bin/python3
# coding=utf-8
import sys
import argparse
import asyncio
import re
import ssl
import csv

import aiohttp
import IPy

# Globally disable certificate verification so hosts with self-signed
# or invalid certificates can still be probed
ssl._create_default_https_context = ssl._create_unverified_context
class WebTitle:
    def __init__(self, urls, coroutine_count=20):
        # Deduplicate the target list up front
        self.urls = list(set(urls))
        self.coroutine_count = coroutine_count
        self.result = {}

    def init_queue(self):
        # Fill an asyncio queue that the worker coroutines will drain
        queue = asyncio.Queue()
        for url in self.urls:
            queue.put_nowait(url)
        return queue

    def get_title_from_html(self, html):
        # IGNORECASE matches <TITLE> tags too, DOTALL lets titles span lines
        title = 'no title found'
        title_pattern = r'<title>(\s*?.*?\s*?)</title>'
        result = re.findall(title_pattern, html, re.IGNORECASE | re.DOTALL)
        if result:
            title = result[0].strip()
        return title
    async def get_title(self, queue):
        while True:
            url = await queue.get()
            print('get title for {}'.format(url))
            try:
                async with aiohttp.ClientSession() as session:
                    # ssl=False skips certificate verification for HTTPS targets
                    async with session.get(url, timeout=aiohttp.ClientTimeout(total=3), ssl=False) as resp:
                        html = await resp.text()
                        title = self.get_title_from_html(html)
                        print('{}: {}'.format(url, title))
                        self.result[url] = title
            except Exception as e:
                print('{} has error: {}'.format(url, str(e)))
            queue.task_done()
    async def start_task(self):
        queue = self.init_queue()
        tasks = []
        for _ in range(self.coroutine_count):
            task = asyncio.create_task(self.get_title(queue))
            tasks.append(task)
        # Block until every queued URL has been processed, then stop the workers
        await queue.join()
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    def start(self):
        asyncio.run(self.start_task())
    def write_result(self, outfile):
        urls = self.result.keys()
        # newline='' keeps the csv module from writing blank rows on Windows
        with open(outfile, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['url', 'title'])
            for url in urls:
                writer.writerow([url, self.result[url]])
        # Also dump the bare list of responsive urls to a plain text file
        with open(outfile + '_alive.txt', 'w') as f:
            for url in urls:
                f.write(url + '\n')
        print('alive urls written to {}'.format(outfile + '_alive.txt'))
        print('titles written to {}'.format(outfile))
def parse_args():
    parser = argparse.ArgumentParser(description='A tool that gets titles for domains, urls or ips')
    parser.add_argument('-d', '--domain', metavar='domain.txt', dest='domain_file', type=str, help='file with domains to get titles for')
    parser.add_argument('-u', '--url', metavar='url.txt', dest='url_file', type=str, help='file with urls to get titles for')
    parser.add_argument('-i', '--ip', metavar='ip.txt', dest='ip_file', type=str, help='file with ips or cidr ranges to get titles for')
    parser.add_argument('-t', '--coroutine', metavar='20', dest='coroutine_count', type=int, default=20, help='number of coroutines to use')
    parser.add_argument('-o', '--outfile', metavar='result.csv', dest='outfile', type=str, default='result.csv', help='file to write results to')
    args = parser.parse_args()
    # At least one input source is required
    if args.url_file is None and args.domain_file is None and args.ip_file is None:
        parser.print_help()
        sys.exit()
    return args
def main():
    try:
        args = parse_args()
        urls = []
        if args.domain_file:
            with open(args.domain_file) as f:
                for domain in f:
                    domain = domain.strip()
                    if domain:
                        # Try both schemes since we do not know which one is served
                        urls.append('http://' + domain)
                        urls.append('https://' + domain)
        if args.url_file:
            with open(args.url_file) as f:
                for url in f:
                    url = url.strip()
                    if url:
                        urls.append(url)
        if args.ip_file:
            with open(args.ip_file) as f:
                for ip in f:
                    ip = ip.strip()
                    if ip:
                        # make_net=1 accepts entries like 192.168.1.0/24 and
                        # expands them to every address in the range
                        cidr_ip = IPy.IP(ip, make_net=1)
                        for i in cidr_ip:
                            urls.append('http://' + str(i))
                            urls.append('https://' + str(i))
        web_title = WebTitle(urls, args.coroutine_count)
        web_title.start()
        web_title.write_result(args.outfile)
    except Exception as e:
        print(e)
def banner():
    print('''==================''')

if __name__ == '__main__':
    banner()
    main()
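
If the script is saved as, say, webtitle.py (a hypothetical filename), it can be run from the command line, e.g. python3 webtitle.py -d domain.txt -t 50 -o result.csv, or the WebTitle class can be driven directly from Python. A minimal sketch, assuming that filename and example targets:

# a minimal sketch, assuming the script above was saved as webtitle.py
from webtitle import WebTitle

# example targets for illustration only
urls = ['http://example.com', 'https://example.com']

wt = WebTitle(urls, coroutine_count=5)
wt.start()                     # fetches every url and collects the titles
wt.write_result('result.csv')  # also writes result.csv_alive.txt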