|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
本帖最后由 Cool_Breeze 于 2020-8-4 15:32 编辑
- # 永远删不完的文件
- # BY GIN
- # 2020-7-31
- #coding=utf-8
- import os
- from multiprocessing import Process, Queue
- import time
- import random
- PH = r'D:\GIN\py\test\temp'
- Q = Queue()
- def random_del():
- # 随机删除一个文件
- os.chdir(PH)
- while True:
- time.sleep(0.5)
- file = str(random.Random.randint(random,1,10)) + '.txt'
- if os.path.isfile(file):
- print('随机删除一个文件:{}'.format(file))
- os.remove(file)
-
- def supplement_t(Q):
- # 补充缺失文件
- os.chdir(PH)
- while True:
- time.sleep(0.5)
- # print('get {:>2}'.format(Q.qsize()))
- while not Q.empty():
- file = Q.get()
- print('正在补充缺失文件:{}'.format(file))
- with open(file, "w") as f:
- pass
- def chec_txt(Q):
- # 检查文件是否缺失。
- # 如果文件缺失,向补充文件函数发送
- # 缺失文件。
- os.chdir(PH)
- while True:
- for i in (range(1,11)):
- time.sleep(0.3)
- file = str(i) + '.txt'
- if not os.path.isfile(file):
- Q.put(file)
- # print('put {:>2}'.format(Q.qsize()))
-
- def generate_txt():
- # 创建所有文件。
- os.chdir(PH)
- for i in range(1,11):
- file = str(i) + '.txt'
- print('touch {:>6}'.format(file))
- with open(file, "w") as f:
- pass
-
- if __name__ == '__main__':
- gen_p = Process(target=generate_txt)
- che_p = Process(target=chec_txt, args=(Q,)) # 需要传入相同的对象
- get_p = Process(target=supplement_t, args=(Q,)) # 需要传入相同的对象
- rad_p = Process(target=random_del)
- gen_p.start()
- che_p.start()
- get_p.start()
- rad_p.start()
- # 主进程会等待所有进程结束
复制代码
result:
- touch 1.txt
- touch 2.txt
- touch 3.txt
- touch 4.txt
- touch 5.txt
- touch 6.txt
- touch 7.txt
- touch 8.txt
- touch 9.txt
- touch 10.txt
- 随机删除一个文件:5.txt
- 随机删除一个文件:6.txt
- 随机删除一个文件:1.txt
- 正在补充缺失文件:5.txt
- 正在补充缺失文件:6.txt
- 随机删除一个文件:3.txt
- 随机删除一个文件:9.txt
- 随机删除一个文件:4.txt
- 正在补充缺失文件:1.txt
- 随机删除一个文件:2.txt
- 正在补充缺失文件:3.txt
- 正在补充缺失文件:4.txt
- 随机删除一个文件:1.txt
复制代码
####
- '''
- @ 进度条打印
- '''
- #coding=utf-8
- from time import sleep
- from multiprocessing import Process,Queue
- q = Queue()
- ico = [
- ['0%',''],
- ['10%','X'],
- ['20%','XX'],
- ['30%','XXX'],
- ['40%','XXXX'],
- ['50%','XXXXX'],
- ['60%','XXXXXX'],
- ['70%','XXXXXXX'],
- ['80%','XXXXXXXX'],
- ['90%','XXXXXXXXX'],
- ['100%\n','XXXXXXXXXX'],
- ]
- def run(q):
- while True:
- if not q.empty():
- t = q.get()
- if t == None: break
- print('\r{:<10}{}'.format(t[1],t[0]),end='',flush=True)
-
-
- if __name__ == "__main__":
- n = 0
- arr = []
- p = Process(target=run,args=(q,))
- p.start()
- print('开始计算 1 累加到 10 :')
- for i in range(11):
- n += i
- q.put(ico[i])
- sleep(0.2)
- q.put(None)
- p.join()
- print('计算 1 累加到 10 完成:{}'.format(n))
-
复制代码 |
|