在电脑中搜索图片
C:\WeChat Files\wxid_48n7yufhk78y22\FileStorage\Video\2022-09\演示.mp4from tkinter import *
from cv2 import imdecode,SIFT_create,FlannBasedMatcher
from webbrowser import open as webopen
from numpy import uint8,fromfile
from threading import Thread
from multiprocessing.dummy import Pool
from multiprocessing import cpu_count
from os import getpid, listdir, mkdir, remove, startfile, getcwd,rename,system
from os.path import isdir, isfile, join, exists,getsize
from tkinter.filedialog import askdirectory
from windnd import hook_dropfiles
from PIL import Image, ImageTk
from tkinter.messagebox import showinfo, askyesno
from shutil import copy
# from skimage import io
from pickle import dump, load
from time import time
# print(getpid())
class APP:
def __init__(self):
self.root = Tk()
self.root.geometry('450x430')
self.root.protocol("WM_DELETE_WINDOW",func = lambda:self.dump_des(1,1))
self.root.title('星猫搜图~')
self.imgnum = 0 #用于判断文件夹内图片个数
self.if_change = 0 #判断图片数量是否变化
self.file = ''
self.des_dict = {} #存储特征值的字典
self.img11 = 0
self.if_dump = 0 #判断是否保存
self.progressVar = StringVar()
self.NUM_ = 0 #用于给打不开的图片重新命名
self.compare_msg= StringVar() #用来显示搜图进度
# self.num111 = 0
self.find_img=0 #已经搜索过的图片
self.all_pid = [] #所有子进程pid
self.me_pid = getpid() #识别本进程pid
# print(self.me_pid)
self.badimg = 0
self.if_pool = True #是否打开多进程
self.t = 0
'''初始话匹配图像要用的东西'''
self.sift = SIFT_create()# 初始化 SIFT 搜索器
self.MIN_MATCH_COUNT = 10 #最小匹配特征个数
self.nlist ={} #图片相似点个数
FLANN_INDEX_KDTREE = 1
index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)
search_params = dict(checks = 50)
self.flann = FlannBasedMatcher(index_params, search_params)#flann算法搜索器
if not exists('星猫图片'):
mkdir('星猫图片')
self.badimglist = []
self.msg1 = StringVar(value=r'C:\Users\陈东豪\Desktop')
self.if_top123 = 0
self.filelist = []
self.if_find = 0#表示是否找到匹配图片
# self.compare_img = 0 #用来计算进度
self.initimg = []
self.pk_file = 'des_dict.pk' #存储特征值的文件名
if not isfile(self.pk_file):
f = open(self.pk_file, 'wb')
dump(self.des_dict, f)
f.close()
b = open(self.pk_file, 'rb')
self.des_dict = load(b)
b.close()
def wait(self):
"""等候时打开页面,显示加载过程"""
self.wait1 = Label((self.root), text='请稍等...')
def clean_file(self,*args):
'''清理掉不存在的图片地址'''
for each in self.des_dict:
if not isfile:
del self.des_dict
self.dump_des()
def main(self):
Label((self.root), text='img文件路径:').grid(row=0, column=0)
# self.hostVar = StringVar(value=r'C:\Users\Administrator\Desktop')
self.hostVar = StringVar()
self.numVar = StringVar(self.root, str(len(self.filelist)) + '张')
self.filename = Entry((self.root), textvariable=(self.hostVar))
self.filename.grid(row=0, column=1)
self.bt = Button((self.root), text='确定', command=self.change_lujing)
self.bt.grid(row=0, column=2)
self.lb4 = Label((self.root), textvariable=(self.numVar))
self.lb4.grid(row=0, column=3)
self.bt2 = Button((self.root), text='打开目标img文件夹', command=(self.open_lujing))
self.bt2.grid(row=1, column=0)
self.bt4 = Button((self.root), text='此次查找详情', command=lambda:self.show_init(self.initimg))
self.bt4.grid(row=1, column=1)
self.bt5 = Button((self.root), text='打开星猫图片', command=(self.open_files))
self.bt5.grid(row=1, column=3)
self.bt6 = Button((self.root), text='查看损坏img', command=(self.see_badimg))
self.bt6.grid(row=1, column=2)
self.balloon = Label(self.root)
self.prog = Label(self.root,textvariable=self.progressVar,font=('微软雅黑', 13, 'bold'))
self.prog.place(x=30,y=260)
self.bt.bind('<Button-3>', self.chose_dir)
self.bt4.bind('<Double-Button-3>',self.change_pool)
self.bt6.bind('<Double-Button-3>',self.clean_des)
self.bt4.bind('<Enter>', lambda event: self.Balloon_show(event,\
msg='双击鼠标右键启用多进程!\n慎重使用,很容易卡退!', fg='blue'))
self.bt6.bind('<Enter>', lambda event: self.Balloon_show(event, msg='双击鼠标右键清理缓存!', fg='blue'))
self.bt.bind('<Enter>', lambda event: self.Balloon_show(event, msg='单击鼠标右键打开文件选择框!', fg='blue'))
self.bt5.bind('<Enter>', lambda event: self.Balloon_show(event, msg='三击鼠标右键打开百度搜图\nhttps://graph.baidu.com/pcpage/index?tpl_from=pc', fg='blue'))
self.root.bind('<Leave>', self.Balloon_destroy)
self.root.bind('<Triple-Button-3>', self.baidu)
self.bt.bind('<Button-2>', lambda x:self.dump_des(1))
hook_dropfiles(self.root, func=self.dragged_files) #当拖入文件时触发
self.display_num()
self.change_lujing()
self.clean_file() #清理垃圾
self.root.mainloop()
def clean_des(self,*args):
'''清零des文件'''
self.des_dict = {}
file_size = getsize(self.pk_file)
showinfo('提示','之前的'+'%.2f' % (file_size/1024/1024)+'MB图片数据已清除!')
def change_pool(self,*args):
'''是否开启多进程'''
self.if_pool = not self.if_pool
if self.if_pool:
showinfo('提示','开启多进程!\n搜图间隔不要少于三秒,不然程序缓不过来啦!')
else:
showinfo('提示','关闭多进程!')
def show_init(self,filename):
'''展示目标img信息'''
try:
file_msg = '\n'.join((item for item in filename))
showinfo(title='提示', message=('总共有' + str(len(filename)) + '张相似图片!\n匹配特征点个数为:' +\
str(self.max_) + '\n' + file_msg + '\n损坏图片:' + str(self.badimg) + \
'张\n共耗时:' + str(self.t) + 's'))
except:
showinfo('提示','暂无相关信息!')
def Balloon_destroy(self, event):
"""隐藏气泡ʾ"""
try:
self.balloon.place_forget()
except:
pass
def Balloon_show(self, event, msg, fg=None, bg=None, font=('微软雅黑', 10, 'bold')):
try:
self.balloon.place_forget()
except:
pass
else:
self.balloon['fg'] = fg
self.balloon['bg'] = bg
self.balloon['text'] = msg
self.balloon['font'] = font
self.balloon.place(x=35, y=190)
def baidu(self, event):
"""百度搜图"""
webopen('https://graph.baidu.com/pcpage/index?tpl_from=pc')
def chose_dir(self, event):
'''选择文件夹'''
file = askdirectory()
if file != '':
self.hostVar.set(file)
self.change_lujing()
def open_lujing(self, *args):
"""打开目标img文件夹"""
if len(self.initimg) == 0:
showinfo(title='提示', message='无目标img!')
else:
files = []
for each in self.initimg:
files.append(each.rsplit('\\', 1))
else:
files = list(set(files))
for each in files:
startfile(each)
def open_files(self, *args):
"""打开文件夹"""
fname = getcwd() + '//星猫图片'
startfile(fname)
def see_badimg(self, *args):
"""损坏badimg"""
if len(self.badimglist) == 0:
showinfo(title='提示', message='没有badimg!')
else:
msg = '\n'.join(self.badimglist)
showinfo('badimg路径', msg)
if askyesno('警告', '是否删除损坏imgs?'):
for each in self.badimglist:
try:
remove(each)
self.badimglist.remove(each)
except Exception as e:
showinfo(title='提示', message=e)
def display_num(self):
"""提示图片数量, 判断图片个数是否不再改变,显示搜索图片des值进度"""
if len(self.filelist)!=0 :
num_k =self.find_img/len(self.filelist)*100
if num_k<=100:
msg = '当前文件夹下图片des值计算进度为:\n'+\
str(num_k)+'%'
self.progressVar.set(value=msg) #设置进度信息
else:
msg = '当前文件夹下图片des值计算进度为:\n'+'0.00%'
self.progressVar.set(value=msg) #设置进度信息
num = len(self.filelist)
self.numVar.set(str(num) + '张')
if self.imgnum!=num:
self.imgnum = num
self.if_change = 1
else:
if self.if_change:
#如果变化停止则开始计算des值
self.if_change = 0
self.find_img=0 #已经搜索过的图片
if len(self.filelist)>=1000:
if not askyesno('提示','搜索文件夹超过1000张图片,是否更换小一点的文件夹?'):
self.thread_it(self.pool_com)
else:
self.change_lujing()
else:
self.thread_it(self.pool_com)
# showinfo('提示','开始计算图片des值啦!')
self.root.after(600, self.display_num)
def pool_com(self,*args):
'''开启多进程计算des'''
if self.if_pool:
try:
print(cpu_count())
pool = Pool(cpu_count()-1)#使用核数不用拉满
# print(pool._quick_get)
# print(pool.getname())
k = pool.map_async(self.compute, self.filelist) #callback可指定回调参数
pool.close()#关闭进程池,不再接受新的进程
pool.join()#主进程阻塞等待子进程的退出
self.kill_pid()
except Exception as e:
print(e)
print('多进程出错!本次启用单进程!')
for each in self.filelist:
self.compute(each)
else:
for each in self.filelist:
self.compute(each)
showinfo('提示','初始化已完成!\n可以开始使用搜图功能啦!')
# def close_pool(self,args):
# print('已完成')
def kill_pid(self,*args):
'''杀死其余进程'''
for pid in self.all_pid:
if pid != self.me_pid:
cmd = 'taskkill /pid ' + str(pid) + ' /f'
print(cmd)
try:
system(cmd)
except:
pass
self.all_pid = [] #清空进程列表
def change_lujing(self):
"""更改搜图路径"""
self.if_dump = 1
self.find_img=0
self.file = self.filename.get()
if isdir(self.file):
pass
else:
lujing = getcwd()
self.hostVar.set(lujing)
self.file = self.filename.get()
self.filelist = []
self.thread_it(self.getFileList, self.file, self.filelist)
# def pool_map(self, func, *args):
# """多进程"""
# pool = Pool()
# pool.map_async(func, args)
# pool.close()
# pool.join()
def getFileList(self, dir, imglist, ext=['jpg', 'png', 'jpeg', 'bmp', 'webp']):
"""
获取文件夹及其子文件夹中图片列表
输入 dir:文件夹根目录
输入 ext: 扩展名
返回: 文件路径列表
"""
newDir = dir
if isfile(dir):
if dir.rsplit('.', 1)[(-1)] in ext:
imglist.append(dir)
elif isdir(dir):
try:
for s in listdir(dir):
newDir = join(dir, s)
self.getFileList(newDir, imglist, ext)
except Exception as e:
print(s,'\n',e)
return imglist
#打开指定的图片,缩放到指定尺寸
def get_img(self,filename,width=445,height=365):
try:
im = Image.open(filename).resize((width,height))
# 引用:添加一个Label,用来存储图片。使用PanedWindow也行。
panel = Label(master=self.root)
panel.photo = ImageTk.PhotoImage(im)# 将原本的变量photo改为panel.photo
return panel.photo
except:
return 0
def compute(self,filename,if_twice=0):
'''图像特征点初始化'''
self.find_img+=1
pid=getpid()
if pid in self.all_pid:
pass
else:
self.all_pid.append(pid)
try:
if filename in self.des_dict:
#如果字典中已存在filename特征值就不用计算
pass
else:
img1 = self.cv_imread(filename,1) #加载图片
kp1, des1 = self.sift.detectAndCompute(img1,None) #搜索特征点
self.des_dict = des1 #记录特征信息
except Exception as e:
if not if_twice:
#图片名字有问题就改呗
new_name = filename.rsplit('\\',1)+'\\'+'img'+str(self.NUM_)+'.'+filename.rsplit('.',1)[-1]
rename(filename,new_name)
self.NUM_+=1
# print(filename+'出错拉!'+'\n新名字:'+new_name)
self.compute(new_name,1)#第二次搜索
else:
#只改一次
print(filename+'有问题')
pass
# print(e)
def dragged_files(self, files):
"""拖拽图片"""
self.nlist = {} #重置
self.sum_msg2 = #要搜索的图片列表
self.NUM = 0 #搜索图片次序
self.search_img(self.sum_msg2,self.NUM)
def search_img(self,filename = [],num=0):
'''搜索图片,可放入一个图片地址列表,num表示搜索列表第几个'''
try:
img_name0 = filename
ext1 = img_name0.rsplit('.', 1)[(-1)]
try:
self.img.destroy()
except:
pass
if ext1 in ('jpg', 'png', 'jpeg', 'bmp', 'webp'):
image1 = self.get_img(img_name0)
if image1 != 0:
self.img = Label((self.root), image=image1)
self.img.grid(row=2, columnspan=4)
self.imgdict = {} #用来保存图片相似度
self.thread_it(self.Compare2, filename,num)
else:
showinfo('提示',img_name0+'\n该图片有问题!')
else:
showinfo(title='提示', message=('暂不支持当前格式搜索!(' + str(ext1) + ')'))
except IndexError:
'''如果超出列表范围则停止'''
pass
def Compare2(self, imglist=[],num=0):
'''开始搜索相似图片ing'''
img = imglist
img1 = self.cv_imread(img)
try:
kp2, self.des2 = self.sift.detectAndCompute(img1,None) #首先计算要匹配的图片的特征值
#使用多进程加速
try:
if self.if_pool:
try:
# print(1)
pool = Pool(cpu_count()-1)
# pool.map_async(self.compare, self.filelist,callback =lambda x :self.show_result(imglist,num))
pool.map_async(self.compare, self.filelist,callback=lambda x:self.dispose_result())
pool.close()#关闭进程池,不再接受新的进程
pool.join()#清理僵尸进程
self.kill_pid() #杀死进程
except Exception as e:
print(e)
print('多进程出错!请重试')
# for each in self.filelist:
# self.compare(each)
else:
for each in self.filelist:
self.compare(each)
self.dispose_result()
except:
showinfo('提示','请等待当前进程处理完毕!')
except:
showinfo('提示', '此图片损坏,请更换其他图片!')
def dispose_result(self,*args):
'''处理最后的结果'''
self.show_result(self.sum_msg2,self.NUM) #显示查找结果
# print(self.nlist)
self.NUM+=1 #次序加1
if self.NUM <= (len(self.sum_msg2)-1):
self.search_img(self.sum_msg2,self.NUM)#开始搜索下张图片
def show_result(self,imglist=[],num =0 ):
'''用来展示搜索结果'''
try:
t1 = time()
self.max_ = max(self.nlist.values())
print(self.max_)
# print(self.min_)
max_key = == self.max_]
# print(max_key)
self.initimg = max_key
for each_img in max_key:
try:
copy(each_img, './/星猫图片')
except Exception as e:
print(e)
if not isfile(each_img):
del self.des_dict
max_key.remove(each_img)
if max_key!=[]:
self.show_img1(max_key)
self.dump_des() #数据保存
t2 = time()
self.t = t2 - t1
except Exception as e:
showinfo('提示',e)
def dump_des(self,if_display=0,if_destroy=0):
'''存des'''
# print(1)
f=open(self.pk_file, 'wb')
dump(self.des_dict, f) #存储图片的特征值
f.close()
if if_display:
showinfo('提示','图片des数据已保存!')
if if_destroy:
self.root.destroy()
def show_msg(self, title1='提示', msg='呱呱呱!', ms=0, fontbig=20):
"""显示消息"""
if msg != self.msg1:
self.msg1.set(msg)
if not self.if_top123:
self.top123 = Toplevel()
self.top123.title(title1)
self.top123.protocol('WM_DELETE_WINDOW', self.destroytop)
msg1 = Label((self.top123), textvariable=(self.msg1), font=('微软雅黑', fontbig, 'bold'))
msg1.pack()
if ms != 0:
self.top123.after(ms, self.destroytop)
def destroytop(self):
"""关闭窗口"""
self.top123.destroy()
self.if_top123 = 0
def show_img1(self, filename=[]):
"""传入图像地址列表,可显示多张照片"""
print(filename)
for each_name infilename:
top11 = Toplevel()
top11.title(each_name)
top11.geometry('640x400')
img2 = Label((top11), image=(self.get_img(each_name, 640, 400)))
img2.pack()
def thread_it(self, func, *args):
"""多线程"""
t = Thread(target=func, args=args)
t.setDaemon(True)
t.start()
def cv_imread(self, filePath,num=1):
'''使得imread可以读取中文路径,读取灰度值照片
num=0表示读取灰度照片,num=-1表示读取完整照片,num=1表示读取彩图'''
return imdecode(fromfile(filePath, dtype=(uint8)),num)
def compare(self,filename,*args):
'''图片名字,是否第二次搜索'''
pid = getpid()
if pid in self.all_pid:
pass
else:
self.all_pid.append(pid)
try:
#计算匹配点
matches = self.flann.knnMatch(self.des_dict,self.des2,k=2)
# store all the good matches as per Lowe's ratio test.
num = 0 #计算点个数
for m,n in matches:
if m.distance < 0.7*n.distance:
#符合要求的点才算数
num+=1
if num>=self.MIN_MATCH_COUNT:
self.nlist = num #记录特征点个数
except Exception as e:
print(e)
if __name__ == '__main__':
a = APP()
a.main()
页:
[1]