|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
C:\\WeChat Files\\wxid_48n7yufhk78y22\\FileStorage\\Video\\2022-09\\演示.mp4
- from tkinter import *
- from cv2 import imdecode,SIFT_create,FlannBasedMatcher
- from webbrowser import open as webopen
- from numpy import uint8,fromfile
- from threading import Thread
- from multiprocessing.dummy import Pool
- from multiprocessing import cpu_count
- from os import getpid, listdir, mkdir, remove, startfile, getcwd,rename,system
- from os.path import isdir, isfile, join, exists,getsize
- from tkinter.filedialog import askdirectory
- from windnd import hook_dropfiles
- from PIL import Image, ImageTk
- from tkinter.messagebox import showinfo, askyesno
- from shutil import copy
- # from skimage import io
- from pickle import dump, load
- from time import time
- # print(getpid())
- class APP:
- def __init__(self):
- self.root = Tk()
- self.root.geometry('450x430')
- self.root.protocol("WM_DELETE_WINDOW",func = lambda:self.dump_des(1,1))
- self.root.title('星猫搜图~')
- self.imgnum = 0 #用于判断文件夹内图片个数
- self.if_change = 0 #判断图片数量是否变化
- self.file = ''
- self.des_dict = {} #存储特征值的字典
- self.img11 = 0
- self.if_dump = 0 #判断是否保存
- self.progressVar = StringVar()
- self.NUM_ = 0 #用于给打不开的图片重新命名
- self.compare_msg = StringVar() #用来显示搜图进度
- # self.num111 = 0
- self.find_img=0 #已经搜索过的图片
- self.all_pid = [] #所有子进程pid
- self.me_pid = getpid() #识别本进程pid
- # print(self.me_pid)
- self.badimg = 0
- self.if_pool = True #是否打开多进程
- self.t = 0
- '''初始话匹配图像要用的东西'''
- self.sift = SIFT_create() # 初始化 SIFT 搜索器
-
- self.MIN_MATCH_COUNT = 10 #最小匹配特征个数
- self.nlist ={} #图片相似点个数
- FLANN_INDEX_KDTREE = 1
- index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)
- search_params = dict(checks = 50)
- self.flann = FlannBasedMatcher(index_params, search_params) #flann算法搜索器
- if not exists('星猫图片'):
- mkdir('星猫图片')
- self.badimglist = []
- self.msg1 = StringVar(value=r'C:\Users\陈东豪\Desktop')
- self.if_top123 = 0
- self.filelist = []
- self.if_find = 0 #表示是否找到匹配图片
- # self.compare_img = 0 #用来计算进度
- self.initimg = []
- self.pk_file = 'des_dict.pk' #存储特征值的文件名
- if not isfile(self.pk_file):
- f = open(self.pk_file, 'wb')
- dump(self.des_dict, f)
- f.close()
- b = open(self.pk_file, 'rb')
- self.des_dict = load(b)
- b.close()
-
- def wait(self):
- """等候时打开页面,显示加载过程"""
- self.wait1 = Label((self.root), text='请稍等...')
- def clean_file(self,*args):
- '''清理掉不存在的图片地址'''
- for each in self.des_dict:
- if not isfile:
- del self.des_dict[each]
- self.dump_des()
-
- def main(self):
- Label((self.root), text='img文件路径:').grid(row=0, column=0)
- # self.hostVar = StringVar(value=r'C:\Users\Administrator\Desktop')
- self.hostVar = StringVar()
- self.numVar = StringVar(self.root, str(len(self.filelist)) + '张')
-
- self.filename = Entry((self.root), textvariable=(self.hostVar))
- self.filename.grid(row=0, column=1)
-
- self.bt = Button((self.root), text='确定', command=self.change_lujing)
- self.bt.grid(row=0, column=2)
- self.lb4 = Label((self.root), textvariable=(self.numVar))
- self.lb4.grid(row=0, column=3)
- self.bt2 = Button((self.root), text='打开目标img文件夹', command=(self.open_lujing))
- self.bt2.grid(row=1, column=0)
- self.bt4 = Button((self.root), text='此次查找详情', command=lambda:self.show_init(self.initimg))
- self.bt4.grid(row=1, column=1)
- self.bt5 = Button((self.root), text='打开星猫图片', command=(self.open_files))
- self.bt5.grid(row=1, column=3)
- self.bt6 = Button((self.root), text='查看损坏img', command=(self.see_badimg))
- self.bt6.grid(row=1, column=2)
- self.balloon = Label(self.root)
- self.prog = Label(self.root,textvariable=self.progressVar,font=('微软雅黑', 13, 'bold'))
- self.prog.place(x=30,y=260)
- self.bt.bind('<Button-3>', self.chose_dir)
- self.bt4.bind('<Double-Button-3>',self.change_pool)
- self.bt6.bind('<Double-Button-3>',self.clean_des)
- self.bt4.bind('<Enter>', lambda event: self.Balloon_show(event,\
- msg='双击鼠标右键启用多进程!\n慎重使用,很容易卡退!', fg='blue'))
- self.bt6.bind('<Enter>', lambda event: self.Balloon_show(event, msg='双击鼠标右键清理缓存!', fg='blue'))
- self.bt.bind('<Enter>', lambda event: self.Balloon_show(event, msg='单击鼠标右键打开文件选择框!', fg='blue'))
- self.bt5.bind('<Enter>', lambda event: self.Balloon_show(event, msg='三击鼠标右键打开百度搜图\nhttps://graph.baidu.com/pcpage/index?tpl_from=pc', fg='blue'))
- self.root.bind('<Leave>', self.Balloon_destroy)
- self.root.bind('<Triple-Button-3>', self.baidu)
- self.bt.bind('<Button-2>', lambda x:self.dump_des(1))
- hook_dropfiles(self.root, func=self.dragged_files) #当拖入文件时触发
- self.display_num()
- self.change_lujing()
- self.clean_file() #清理垃圾
- self.root.mainloop()
- def clean_des(self,*args):
- '''清零des文件'''
- self.des_dict = {}
- file_size = getsize(self.pk_file)
- showinfo('提示','之前的'+'%.2f' % (file_size/1024/1024)+'MB图片数据已清除!')
-
- def change_pool(self,*args):
- '''是否开启多进程'''
- self.if_pool = not self.if_pool
- if self.if_pool:
- showinfo('提示','开启多进程!\n搜图间隔不要少于三秒,不然程序缓不过来啦!')
- else:
- showinfo('提示','关闭多进程!')
-
-
- def show_init(self,filename):
- '''展示目标img信息'''
- try:
- file_msg = '\n'.join((item for item in filename))
- showinfo(title='提示', message=('总共有' + str(len(filename)) + '张相似图片!\n匹配特征点个数为:' +\
- str(self.max_) + '\n' + file_msg + '\n损坏图片:' + str(self.badimg) + \
- '张\n共耗时:' + str(self.t) + 's'))
- except:
- showinfo('提示','暂无相关信息!')
-
- def Balloon_destroy(self, event):
- """隐藏气泡ʾ"""
- try:
- self.balloon.place_forget()
- except:
- pass
- def Balloon_show(self, event, msg, fg=None, bg=None, font=('微软雅黑', 10, 'bold')):
- try:
- self.balloon.place_forget()
- except:
- pass
- else:
- self.balloon['fg'] = fg
- self.balloon['bg'] = bg
- self.balloon['text'] = msg
- self.balloon['font'] = font
- self.balloon.place(x=35, y=190)
- def baidu(self, event):
- """百度搜图"""
- webopen('https://graph.baidu.com/pcpage/index?tpl_from=pc')
- def chose_dir(self, event):
- '''选择文件夹'''
- file = askdirectory()
- if file != '':
- self.hostVar.set(file)
- self.change_lujing()
- def open_lujing(self, *args):
- """打开目标img文件夹"""
- if len(self.initimg) == 0:
- showinfo(title='提示', message='无目标img!')
- else:
- files = []
- for each in self.initimg:
- files.append(each.rsplit('\\', 1)[0])
- else:
- files = list(set(files))
- for each in files:
- startfile(each)
- def open_files(self, *args):
- """打开文件夹"""
- fname = getcwd() + '//星猫图片'
- startfile(fname)
- def see_badimg(self, *args):
- """损坏badimg"""
- if len(self.badimglist) == 0:
- showinfo(title='提示', message='没有badimg!')
- else:
- msg = '\n'.join(self.badimglist)
- showinfo('badimg路径', msg)
- if askyesno('警告', '是否删除损坏imgs?'):
- for each in self.badimglist:
- try:
- remove(each)
- self.badimglist.remove(each)
- except Exception as e:
- showinfo(title='提示', message=e)
- def display_num(self):
- """提示图片数量, 判断图片个数是否不再改变,显示搜索图片des值进度"""
- if len(self.filelist)!=0 :
- num_k =self.find_img/len(self.filelist)*100
- if num_k<=100:
- msg = '当前文件夹下图片des值计算进度为:\n'+\
- str(num_k)+'%'
- self.progressVar.set(value=msg) #设置进度信息
- else:
- msg = '当前文件夹下图片des值计算进度为:\n'+'0.00%'
- self.progressVar.set(value=msg) #设置进度信息
- num = len(self.filelist)
- self.numVar.set(str(num) + '张')
- if self.imgnum!=num:
- self.imgnum = num
- self.if_change = 1
- else:
- if self.if_change:
- #如果变化停止则开始计算des值
- self.if_change = 0
- self.find_img=0 #已经搜索过的图片
- if len(self.filelist)>=1000:
- if not askyesno('提示','搜索文件夹超过1000张图片,是否更换小一点的文件夹?'):
- self.thread_it(self.pool_com)
- else:
- self.change_lujing()
- else:
- self.thread_it(self.pool_com)
- # showinfo('提示','开始计算图片des值啦!')
- self.root.after(600, self.display_num)
- def pool_com(self,*args):
- '''开启多进程计算des'''
- if self.if_pool:
- try:
- print(cpu_count())
- pool = Pool(cpu_count()-1) #使用核数不用拉满
- # print(pool._quick_get)
- # print(pool.getname())
- k = pool.map_async(self.compute, self.filelist) #callback可指定回调参数
- pool.close()#关闭进程池,不再接受新的进程
- pool.join()#主进程阻塞等待子进程的退出
- self.kill_pid()
- except Exception as e:
- print(e)
- print('多进程出错!本次启用单进程!')
- for each in self.filelist:
- self.compute(each)
- else:
- for each in self.filelist:
- self.compute(each)
- showinfo('提示','初始化已完成!\n可以开始使用搜图功能啦!')
- # def close_pool(self,args):
- # print('已完成')
- def kill_pid(self,*args):
- '''杀死其余进程'''
- for pid in self.all_pid:
- if pid != self.me_pid:
- cmd = 'taskkill /pid ' + str(pid) + ' /f'
- print(cmd)
- try:
- system(cmd)
- except:
- pass
-
- self.all_pid = [] #清空进程列表
-
- def change_lujing(self):
- """更改搜图路径"""
- self.if_dump = 1
- self.find_img=0
- self.file = self.filename.get()
- if isdir(self.file):
- pass
- else:
- lujing = getcwd()
- self.hostVar.set(lujing)
- self.file = self.filename.get()
- self.filelist = []
- self.thread_it(self.getFileList, self.file, self.filelist)
- # def pool_map(self, func, *args):
- # """多进程"""
- # pool = Pool()
- # pool.map_async(func, args)
- # pool.close()
- # pool.join()
- def getFileList(self, dir, imglist, ext=['jpg', 'png', 'jpeg', 'bmp', 'webp']):
- """
- 获取文件夹及其子文件夹中图片列表
- 输入 dir:文件夹根目录
- 输入 ext: 扩展名
- 返回: 文件路径列表
- """
- newDir = dir
- if isfile(dir):
- if dir.rsplit('.', 1)[(-1)] in ext:
- imglist.append(dir)
- elif isdir(dir):
- try:
- for s in listdir(dir):
- newDir = join(dir, s)
- self.getFileList(newDir, imglist, ext)
- except Exception as e:
- print(s,'\n',e)
- return imglist
- #打开指定的图片,缩放到指定尺寸
- def get_img(self,filename,width=445,height=365):
- try:
- im = Image.open(filename).resize((width,height))
- # 引用:添加一个Label,用来存储图片。使用PanedWindow也行。
- panel = Label(master=self.root)
- panel.photo = ImageTk.PhotoImage(im) # 将原本的变量photo改为panel.photo
- return panel.photo
- except:
- return 0
-
- def compute(self,filename,if_twice=0):
- '''图像特征点初始化'''
- self.find_img+=1
- pid=getpid()
- if pid in self.all_pid:
- pass
- else:
- self.all_pid.append(pid)
- try:
- if filename in self.des_dict:
- #如果字典中已存在filename特征值就不用计算
- pass
- else:
- img1 = self.cv_imread(filename,1) #加载图片
- kp1, des1 = self.sift.detectAndCompute(img1,None) #搜索特征点
- self.des_dict[filename] = des1 #记录特征信息
- except Exception as e:
- if not if_twice:
- #图片名字有问题就改呗
- new_name = filename.rsplit('\\',1)[0]+'\\'+'img'+str(self.NUM_)+'.'+filename.rsplit('.',1)[-1]
- rename(filename,new_name)
- self.NUM_+=1
- # print(filename+'出错拉!'+'\n新名字:'+new_name)
- self.compute(new_name,1) #第二次搜索
- else:
- #只改一次
- print(filename+'有问题')
- pass
- # print(e)
-
- def dragged_files(self, files):
- """拖拽图片"""
- self.nlist = {} #重置
- self.sum_msg2 = [item.decode('gbk') for item in files] #要搜索的图片列表
- self.NUM = 0 #搜索图片次序
- self.search_img(self.sum_msg2,self.NUM)
-
- def search_img(self,filename = [],num=0):
- '''搜索图片,可放入一个图片地址列表,num表示搜索列表第几个'''
- try:
- img_name0 = filename[num]
- ext1 = img_name0.rsplit('.', 1)[(-1)]
- try:
- self.img.destroy()
- except:
- pass
-
- if ext1 in ('jpg', 'png', 'jpeg', 'bmp', 'webp'):
- image1 = self.get_img(img_name0)
- if image1 != 0:
- self.img = Label((self.root), image=image1)
- self.img.grid(row=2, columnspan=4)
- self.imgdict = {} #用来保存图片相似度
- self.thread_it(self.Compare2, filename,num)
- else:
- showinfo('提示',img_name0+'\n该图片有问题!')
-
- else:
- showinfo(title='提示', message=('暂不支持当前格式搜索!(' + str(ext1) + ')'))
- except IndexError:
- '''如果超出列表范围则停止'''
- pass
-
- def Compare2(self, imglist=[],num=0):
- '''开始搜索相似图片ing'''
- img = imglist[num]
- img1 = self.cv_imread(img)
-
- try:
- kp2, self.des2 = self.sift.detectAndCompute(img1,None) #首先计算要匹配的图片的特征值
- #使用多进程加速
- try:
- if self.if_pool:
- try:
- # print(1)
- pool = Pool(cpu_count()-1)
- # pool.map_async(self.compare, self.filelist,callback =lambda x :self.show_result(imglist,num))
- pool.map_async(self.compare, self.filelist,callback=lambda x:self.dispose_result())
- pool.close()#关闭进程池,不再接受新的进程
- pool.join() #清理僵尸进程
- self.kill_pid() #杀死进程
- except Exception as e:
- print(e)
- print('多进程出错!请重试')
- # for each in self.filelist:
- # self.compare(each)
- else:
- for each in self.filelist:
- self.compare(each)
- self.dispose_result()
- except:
- showinfo('提示','请等待当前进程处理完毕!')
- except:
- showinfo('提示', '此图片损坏,请更换其他图片!')
- def dispose_result(self,*args):
- '''处理最后的结果'''
- self.show_result(self.sum_msg2,self.NUM) #显示查找结果
- # print(self.nlist)
- self.NUM+=1 #次序加1
- if self.NUM <= (len(self.sum_msg2)-1):
- self.search_img(self.sum_msg2,self.NUM) #开始搜索下张图片
-
-
-
- def show_result(self,imglist=[],num =0 ):
- '''用来展示搜索结果'''
- try:
- t1 = time()
- self.max_ = max(self.nlist.values())
- print(self.max_)
- # print(self.min_)
- max_key = [i for i in self.nlist.keys() if self.nlist[i] == self.max_]
- # print(max_key)
- self.initimg = max_key
- for each_img in max_key:
- try:
- copy(each_img, './/星猫图片')
- except Exception as e:
- print(e)
- if not isfile(each_img):
- del self.des_dict[each_img]
- max_key.remove(each_img)
- if max_key!=[]:
- self.show_img1(max_key)
- self.dump_des() #数据保存
- t2 = time()
- self.t = t2 - t1
- except Exception as e:
- showinfo('提示',e)
-
- def dump_des(self,if_display=0,if_destroy=0):
- '''存des'''
- # print(1)
- f= open(self.pk_file, 'wb')
- dump(self.des_dict, f) #存储图片的特征值
- f.close()
- if if_display:
- showinfo('提示','图片des数据已保存!')
- if if_destroy:
- self.root.destroy()
-
-
-
- def show_msg(self, title1='提示', msg='呱呱呱!', ms=0, fontbig=20):
- """显示消息"""
- if msg != self.msg1:
- self.msg1.set(msg)
- if not self.if_top123:
- self.top123 = Toplevel()
- self.top123.title(title1)
- self.top123.protocol('WM_DELETE_WINDOW', self.destroytop)
- msg1 = Label((self.top123), textvariable=(self.msg1), font=('微软雅黑', fontbig, 'bold'))
- msg1.pack()
- if ms != 0:
- self.top123.after(ms, self.destroytop)
- def destroytop(self):
- """关闭窗口"""
- self.top123.destroy()
- self.if_top123 = 0
- def show_img1(self, filename=[]):
- """传入图像地址列表,可显示多张照片"""
- print(filename)
- for each_name in filename:
- top11 = Toplevel()
- top11.title(each_name)
- top11.geometry('640x400')
- img2 = Label((top11), image=(self.get_img(each_name, 640, 400)))
- img2.pack()
- def thread_it(self, func, *args):
- """多线程"""
- t = Thread(target=func, args=args)
- t.setDaemon(True)
- t.start()
- def cv_imread(self, filePath,num=1):
- '''使得imread可以读取中文路径,读取灰度值照片
- num=0表示读取灰度照片,num=-1表示读取完整照片,num=1表示读取彩图'''
- return imdecode(fromfile(filePath, dtype=(uint8)),num)
- def compare(self,filename,*args):
- '''图片名字,是否第二次搜索'''
- pid = getpid()
- if pid in self.all_pid:
- pass
- else:
- self.all_pid.append(pid)
- try:
- #计算匹配点
- matches = self.flann.knnMatch(self.des_dict[filename],self.des2,k=2)
- # store all the good matches as per Lowe's ratio test.
- num = 0 #计算点个数
- for m,n in matches:
- if m.distance < 0.7*n.distance:
- #符合要求的点才算数
- num+=1
- if num>=self.MIN_MATCH_COUNT:
- self.nlist[filename] = num #记录特征点个数
- except Exception as e:
- print(e)
-
- if __name__ == '__main__':
- a = APP()
- a.main()
复制代码 |
|