# 目前找到一个方法可以实现,但是代码很不好看。有人可以优化一下吗?
from PyQt5.QtCore import pyqtSignal, pyqtSlot, QThread, QTimer, QEventLoop, QMetaObject, Qt, Q_ARG
from PyQt5.QtWidgets import QApplication, QWidget
from PyQt5 import uic
import sys
import datetime
import threading
from time import sleep
import os
class PrintThread(QThread):
    """Worker thread running a 1-second polling timer that can be paused and resumed.

    The timer is created inside ``run()`` so that it belongs to the worker
    thread. Other threads must not touch the timer directly; they call
    ``request_pause()`` / ``request_resume()``, which only emit a signal.
    The handlers for those signals are connected inside ``run()`` with a
    queued connection, so they are executed by the worker's event loop.
    """

    # Declared with two string arguments; not emitted anywhere in this class.
    resSignal = pyqtSignal(str, str)
    # Emitted to ask the worker to stop its timer.
    pauseSignal = pyqtSignal()
    # Emitted to ask the worker to restart its timer.
    resumeSignal = pyqtSignal()

    def __init__(self):
        super().__init__()
        # Set while polling is enabled, cleared while the timer is paused.
        self.paused = threading.Event()
        self.paused.set()
        # 0 until run() has started the timer, 1 afterwards.
        self.a = 0
        # The worker's QTimer while run() is executing, otherwise None.
        self.p_timer = None

    def run(self):
        """Thread entry point: start the timer and serve events until quit()/exit()."""
        print(f"子线程ID: {threading.get_ident()}")

        timer = QTimer()
        timer.setInterval(1000)

        def query_and_print():
            # Skip the work if a pause was processed before this tick ran.
            if not self.paused.is_set():
                return
            print(f"#query_and_print线程ID: {threading.get_ident()},时间{datetime.datetime.now()}")
            # Simulated work. It blocks this thread's event loop for one
            # second, so pause/resume requests are handled after it returns.
            sleep(1)

        def pause():
            if timer.isActive():
                print(f"暂停时的ID: {threading.get_ident()}")
                timer.stop()
                self.paused.clear()
                print("定时器已暂停")

        def resume():
            if not timer.isActive():
                print(f"恢复时的ID: {threading.get_ident()}")
                self.paused.set()
                timer.start()
                print("定时器已恢复")

        # The handlers are plain closures connected from inside run() on
        # purpose: methods of this QThread object would be dispatched to the
        # thread that owns the QThread object, not to the worker thread.
        # The queued connection makes the hand-over to the event loop explicit.
        self.pauseSignal.connect(pause, Qt.QueuedConnection)
        self.resumeSignal.connect(resume, Qt.QueuedConnection)
        timer.timeout.connect(query_and_print)

        self.p_timer = timer
        # The timer always starts active, so the flag must agree with it even
        # when the thread is restarted after having been stopped while paused.
        self.paused.set()
        timer.start()
        self.a = 1
        try:
            self.exec_()
        finally:
            # Stop the timer from the thread that started it, and drop the
            # connections so a later start() does not accumulate handlers.
            timer.stop()
            self.pauseSignal.disconnect(pause)
            self.resumeSignal.disconnect(resume)
            self.p_timer = None

    @pyqtSlot()
    def emit_pause_signal(self):
        """Emit ``pauseSignal`` from the calling thread."""
        self.pauseSignal.emit()

    def request_pause(self):
        """Ask the worker to pause its timer; safe to call from the GUI thread.

        Returns immediately. The timer is stopped later, when the worker's
        event loop processes the queued request.
        """
        self.emit_pause_signal()

    @pyqtSlot()
    def emit_resume_signal(self):
        """Emit ``resumeSignal`` from the calling thread."""
        self.resumeSignal.emit()

    def request_resume(self):
        """Ask the worker to restart its timer; safe to call from the GUI thread.

        Returns immediately. The timer is restarted later, when the worker's
        event loop processes the queued request.
        """
        self.emit_resume_signal()
class Mywindow(QWidget):
    """Main window controller: loads the .ui form and owns the PrintThread worker."""

    def __init__(self):
        super().__init__()
        self.init_ui()
        print(f"主线程ID: {threading.get_ident()}")
        self.p_thread = PrintThread()
        self.p_thread.resSignal.connect(self.update_to_mysql)
        self.p_thread.start()
        # Shut the worker down before the application object goes away;
        # otherwise the QThread is destroyed while its thread is still running.
        QApplication.instance().aboutToQuit.connect(self._stop_thread)

    def _stop_thread(self):
        """Ask the worker's event loop to exit and block until the thread has finished."""
        self.p_thread.quit()
        self.p_thread.wait()

    def fun1(self):
        """Pause the worker, run a 10-step blocking job, then resume the worker.

        NOTE: this runs in the thread that delivers the button click and
        sleeps for about 10 seconds in total, so that thread's event loop is
        blocked for the duration.
        """
        self.p_thread.request_pause()
        print(f"发送暂停信号的线程ID: {threading.get_ident()}")
        try:
            for i in range(10):
                sleep(1)
                print(f'正在运行{i}')
        finally:
            # Always resume, even if the job above raises, so the worker is
            # never left paused forever.
            self.p_thread.request_resume()

    def init_ui(self):
        """Load the form from ``./测试.ui`` and wire its push button to ``fun1``."""
        self.ui = uic.loadUi("./测试.ui")
        self.btn1 = self.ui.pushButton
        self.btn1.clicked.connect(self.fun1)

    def update_to_mysql(self):
        """Placeholder slot connected to ``PrintThread.resSignal``; currently does nothing."""
        pass
if __name__ == "__main__":
    # Script entry point: create the application, show the loaded form and
    # hand the event loop's return code back to the operating system.
    app = QApplication(sys.argv)
    w = Mywindow()
    # The visible widget is the form loaded by uic, not Mywindow itself.
    w.ui.show()
    sys.exit(app.exec())
|