|

楼主 |
发表于 2024-8-14 14:09:34
|
显示全部楼层
目前找到一个方法可以实现,但是代码很不好看。有人可以优化一下吗?
- from PyQt5.QtCore import pyqtSignal, pyqtSlot, QThread, QTimer, QEventLoop, QMetaObject, Qt, Q_ARG
- from PyQt5.QtWidgets import QApplication, QWidget
- from PyQt5 import uic
- import sys
- import datetime
- import threading
- from time import sleep
- import os
class _PausableTimer(QTimer):
    """QTimer with ``pause``/``resume`` slots that act on the timer itself.

    The slots are real Qt slots on this object, so a queued connection
    delivers them through the event loop of the thread the timer lives in.
    Qt requires a timer to be started and stopped from its own thread.
    """

    def __init__(self, running):
        """Create the timer.

        Args:
            running: ``threading.Event`` shared with the owning thread;
                set while work may run, cleared while paused.
        """
        super().__init__()
        self._running = running

    @pyqtSlot()
    def pause(self):
        """Mark the shared event as paused and stop the timer if active."""
        self._running.clear()
        if self.isActive():
            print(f"暂停时的ID: {threading.get_ident()}")
            self.stop()
            print("定时器已暂停")

    @pyqtSlot()
    def resume(self):
        """Restart the timer if it is stopped and mark the event as running."""
        if not self.isActive():
            print(f"恢复时的ID: {threading.get_ident()}")
            self.start()
            print("定时器已恢复")
        self._running.set()


class PrintThread(QThread):
    """Worker thread that fires a 1-second timer inside its own event loop.

    Other threads pause and resume the periodic work through
    ``request_pause()`` / ``request_resume()`` (or by emitting
    ``pauseSignal`` / ``resumeSignal`` directly).
    """

    resSignal = pyqtSignal(str, str)
    pauseSignal = pyqtSignal()
    resumeSignal = pyqtSignal()

    def __init__(self):
        super().__init__()
        # NOTE: despite its name, this event is *set* while the worker may
        # run and *cleared* while it is paused.
        self.paused = threading.Event()
        self.paused.set()
        # 0 until the timer has been auto-started by run(), then 1.
        self.a = 0
        # Created inside run() so that the timer belongs to the worker thread.
        self.p_timer = None

    def run(self):
        """Thread body: create the timer, wire up pause/resume, run the loop."""

        def query_and_print():
            # Skip the tick entirely while paused; this covers a tick that
            # fires after request_pause() but before the queued stop runs.
            if not self.paused.is_set():
                return
            print(f"#query_and_print线程ID: {threading.get_ident()},时间{datetime.datetime.now()}")
            sleep(1)

        print(f"子线程ID: {threading.get_ident()}")
        timer = _PausableTimer(self.paused)
        timer.setInterval(1000)
        # DirectConnection: run the callback in the thread where the timer
        # emits timeout(), i.e. this worker thread.
        timer.timeout.connect(query_and_print, Qt.DirectConnection)
        # QueuedConnection: the slots are executed by the event loop of the
        # timer's thread, no matter which thread emits the signal.
        self.pauseSignal.connect(timer.pause, Qt.QueuedConnection)
        self.resumeSignal.connect(timer.resume, Qt.QueuedConnection)
        self.p_timer = timer

        # Kept from the original: the timer is auto-started only on the
        # first run() of this object.
        if self.a == 0:
            timer.start()
            self.a = 1

        try:
            # The thread's own event loop, so QThread.quit() ends run().
            self.exec_()
        finally:
            # Stop the timer while still in the thread that owns it.
            timer.stop()

    @pyqtSlot()
    def emit_pause_signal(self):
        """Emit ``pauseSignal`` from whichever thread calls this method."""
        self.pauseSignal.emit()

    def request_pause(self):
        """Ask the worker to pause; safe to call from the main thread.

        The event is cleared immediately so that the next tick is skipped
        even before the worker's event loop handles the queued stop. A tick
        that is already executing is not interrupted.
        """
        self.paused.clear()
        self.emit_pause_signal()

    @pyqtSlot()
    def emit_resume_signal(self):
        """Emit ``resumeSignal`` from whichever thread calls this method."""
        self.resumeSignal.emit()

    def request_resume(self):
        """Ask the worker to resume; safe to call from the main thread."""
        self.emit_resume_signal()
class Mywindow(QWidget):
    """Window wrapper that loads the .ui file and owns the PrintThread."""

    def __init__(self):
        super().__init__()
        self.init_ui()
        print(f"主线程ID: {threading.get_ident()}")
        self.p_thread = PrintThread()
        self.p_thread.resSignal.connect(self.update_to_mysql)
        self.p_thread.start()

    def fun1(self):
        """Pause the worker, run ten 1-second steps, then resume the worker.

        NOTE(review): the loop sleeps in the calling thread; since this is
        connected to a button click it presumably blocks the GUI for about
        ten seconds - confirm that this is intended.
        """
        self.p_thread.request_pause()
        try:
            print(f"发送暂停信号的线程ID: {threading.get_ident()}")
            for i in range(10):
                sleep(1)
                print(f'正在运行{i}')
        finally:
            # Always resume, otherwise an exception above would leave the
            # worker paused for good.
            self.p_thread.request_resume()

    def init_ui(self):
        """Load the Designer file and connect its push button to ``fun1``."""
        # The path is relative to the current working directory.
        self.ui = uic.loadUi("./测试.ui")
        self.btn1 = self.ui.pushButton
        self.btn1.clicked.connect(self.fun1)

    def update_to_mysql(self):
        """Placeholder slot connected to ``PrintThread.resSignal``; no-op."""
        pass
if __name__ == "__main__":
    app = QApplication(sys.argv)
    w = Mywindow()
    w.ui.show()
    exit_code = app.exec()
    # Shut the worker down before the objects are destroyed: Qt documents
    # that destroying a QThread that is still running crashes the program.
    w.p_thread.quit()
    # Bounded wait so that exit cannot hang if the worker does not stop.
    w.p_thread.wait(3000)
    sys.exit(exit_code)
复制代码 |
|