python3的一些 功能/工具 说明
本帖最后由 Cool_Breeze 于 2020-9-10 19:56 编辑getopt使用方法
#coding=utf-8
import sys
import getopt
# 模拟命令行
sys.argv =['t.py', '-iabc.txt', '-ores.txt', '--sn=123123123', '12']
print(sys.argv)
# 'i:o:' 短命令,':' 为短命令必须带参数 写成:-i123.txt-i就等于123.txt
# 'io' 为短命令不带参数 写成:-i,-o
# ['in=, out=, nu='] 长命令,'=' 为长命令必须带参数 写成:--in=xxx --in就等于xxx
# ['in, out, nu'] 为长命令不带参数 写成: --in
opts, args = getopt.getopt(sys.argv,'i:o:', ['input=', 'output=', 'sn='] )
print(opts) # 命令列表 [('-i', 'abc.txt'), ('-o', 'res.txt'), ('--sn', '123123123')]
print(args) # 命令不配的参数值 ['12']
input_s = ''
output = ''
sn = ''
for o,v in opts:
if o in '-i, --input':
input_s = v
elif o in '-o, --output':
output = v
elif o in '--sn':
sn = v
print('i = {}\no = {}\nsn = {}'.format(input_s, output, sn))
#i = abc.txt
#o = res.txt
#sn = 123123123 本帖最后由 Cool_Breeze 于 2020-9-10 21:38 编辑
sys.path # 查看所有模块的导入路径
pip 用法
pip install <package_name>
pip uninstall <package_name>
pip list 已安装包列表 name version
pip freezename==version
pip freeze > file_name
pip install -r file_name读取 file_name 文件里的包信息 然后逐一安装
# 临时修改
pip install <package_name> -i <url> #从指定源下载包
# 永久修改
首先在当前用户目录下新建一个 pip 目录, 在 pip 目录下面添加一个 pip.ini 文件
批处理命令查看 用户目录echo %USERPROFILE%
# pip.ini 文件内容
index-url=https://pypi.douban.com/simple
trusted-host=pypi.douban.com
# 一些常用的国内源
(1)阿里云 http://mirrors.aliyun.com/pypi/simple/
(2)豆瓣http://pypi.douban.com/simple/
(3)清华大学 https://pypi.tuna.tsinghua.edu.cn/simple/
(4)中国科学技术大学 http://pypi.mirrors.ustc.edu.cn/simple/
(5)华中科技大学http://pypi.hustunique.com/
Cool_Breeze 发表于 2020-9-10 19:41
pip 用法
pip install
腾讯源也可以,速度稳定 本帖最后由 hrp 于 2020-9-10 20:29 编辑
getopt是内置模块吗
看上去好像比click方便 hrp 发表于 2020-9-10 20:23
腾讯源也可以,速度稳定
可以。我试试! getopt是内置模块,click 我没有用过 本帖最后由 Cool_Breeze 于 2020-9-16 21:06 编辑
class 的一些内置方法!
#!/usr/bin/env python3
#coding=utf-8
from time import sleep
class a(object):
def __init__(self):
print("__init__ was called!")
def __del__(self):
print("__del__ was called!")
def __len__(self):
print("__len__ was called!")
return 6
def __str__(self):
return 'str'
def __repr__(self):
return 'repr'
def __eq__(self, name):
return 1
print('command : b = a()')
b = a()
print('command : b == 1')
print( b == 1)
print('command : len(b)')
print(len(b))
print('command : b)')
print(b)
print('sleep(2), exit!')
sleep(2)
#object.__new__(cls) 申请内存空间
#c = object.__new__(a)
#c.__init__()
command : b = a()
__init__ was called!
command : b == 1
1
command : len(b)
__len__ was called!
6
command : b)
str
sleep(2), exit!
官方文档:https://docs.python.org/zh-cn/3.7/library/stdtypes.html 本帖最后由 Cool_Breeze 于 2020-9-16 21:00 编辑
class 方法的使用
#!/usr/bin/env python3
#coding=utf-8
class games:
agent = '腾讯' # 类属性
game = 'LOL'
def __init__(self, player):
self.player = player # 实例属性
def play(self): #实例方法
print('{} 正在玩 {}'.format(self.player, games.game)) # 类属性可以通过类名.属性来调用
@staticmethod
def current_time(): #静态方法
import time
print('当前时间 {}'.format(time.ctime()))
return 0
@classmethod
def initformation(cls): # 类方法
print('游戏代理:{} 游戏名 {}'.format(cls.agent, cls.game))
return 0
# print(games.initformation())
# 游戏代理:腾讯 游戏名 LOL
# 0
# print(games.current_time())
# 当前时间 Wed Sep 16 19:20:50 2020
# 0
# t = games('平平')
# print(t.game) # 实例对象没有该属性,就往上找类的属性
# LOL
# print(t.play())
# 平平 正在玩 LOL
# None
# games.play(t)
# 平平 正在玩 LOL
# t.initformation()
# 游戏代理:腾讯 游戏名 LOL
# t.current_time()
# 当前时间 Wed Sep 16 19:20:50 2020
# t 变量名是 games 类的实例
# 实例方法中的参数 self 就是指的实例 t 【 系统自动传入 】
# 还有一种方法就是自己通过类名,手动传入 self 这个参数, 【 一个类实例 】
# games.play(t)
# 静态方法:当一个方法既没有用到实例属性,有没有用到类属性。相对独立的方法。
# 类方法: 当一个方法只用到了类属性,就可以定义为类方法。 cls 参数就是 类名
# 类属性只能通过类对象修改 本帖最后由 Cool_Breeze 于 2020-10-4 11:23 编辑
红绿灯
https://docs.python.org/zh-cn/3.7/library/multiprocessing.html#module-multiprocessing.pool
#!/usr/bin/env python3
from multiprocessing import Process, Manager, Pool
import time #开启延时,减少cpu占用
class traffic(object):
def __init__(self):
self.queue = Manager().Queue() #多个进程共享数据,需要服务器进程管理
def light(self):
self.queue.put('red')
while True:
for i in range(1,6):
print('current red light %d' %abs(i-6))
for j in range(1,10):
self.queue.put('red')
time.sleep(0.1)
for i in range(1,5):
print('current green light %d' %abs(i-5))
for j in range(1,10):
self.queue.put('green')
time.sleep(0.1)
def light_start(self):
p = Process(target=self.light) #单独一个进程开启灯
p.start()
def light_status(self):
return (None,self.queue.get()) #返回灯状态
def car(self):
po = Pool(10)
for i in range(1,30): #进程池车辆
#apply_async(func[, args[, kwds[, callback[, error_callback]]]])
po.apply_async(self.car_pass,(i,),error_callback=self.err)
po.close()
po.join() #等待工作进程结束。调用 join() 前必须先调用 close() 或者 terminate() 。
def car_pass(self, i):
while True:
if self.light_status() == 'green':
print('NO: {:2} car Pass!'.format(i))
break
else:
print('NO: {:2} car Stop!'.format(i))
time.sleep(0.05)
def err(self, erron):
print(erron)
if __name__ == '__main__':
test = traffic()
test.light_start()
test.car()
while True:
print('light status %s' %test.light_status())
time.sleep(0.1) 本帖最后由 Cool_Breeze 于 2020-10-6 16:36 编辑
"""
信号量对象也支持 上下文管理协议 。
class threading.Semaphore(value=1)
该类实现信号量对象。信号量对象管理一个原子性的计数器,代表 release() 方法的调用次数减去 acquire() 的调用次数再加上一个初始值。如果需要, acquire() 方法将会阻塞直到可以返回而不会使得计数器变成负数。在没有显式给出 value 的值时,默认为1。
可选参数 value 赋予内部计数器初始值,默认值为 1 。如果 value 被赋予小于0的值,将会引发 ValueError 异常。
在 3.3 版更改: 从工厂函数变为类。
acquire(blocking=True, timeout=None)
获取一个信号量。
在不带参数的情况下调用时:
如果在进入时内部计数器的值大于零,则将其减一并立即返回 True。
如果在进入时内部计数器的值为零,则将会阻塞直到被对 release() 的调用唤醒。 一旦被唤醒(并且计数器的值大于 0),则将计数器减 1 并返回 True。 每次对 release() 的调用将只唤醒一个线程。 线程被唤醒的次序是不可确定的。
当发起调用时将 blocking 设为假值,则不进行阻塞。 如果一个无参数调用将要阻塞,则立即返回 False;在其他情况下,执行与无参数调用时一样的操作,然后返回 True。
当发起调用时如果 timeout 不为 None,则它将阻塞最多 timeout 秒。 请求在此时段时未能成功完成获取则将返回 False。 在其他情况下返回 True。
在 3.2 版更改: 新的 timeout 形参。
release()
释放一个信号量,将内部计数器的值增加1。当计数器原先的值为0且有其它线程正在等待它再次大于0时,唤醒正在等待的线程。
class threading.BoundedSemaphore(value=1)
该类实现有界信号量。有界信号量通过检查以确保它当前的值不会超过初始值。如果超过了初始值,将会引发 ValueError 异常。在大多情况下,信号量用于保护数量有限的资源。如果信号量被释放的次数过多,则表明出现了错误。没有指定时, value 的值默认为1。
在 3.3 版更改: 从工厂函数变为类。
"""
#!/usr/bin/env python3
from threading import Thread, Semaphore
import time
'''
使用信号量使 2个线程按顺序被交替执行
'''
class foo(object):
def __init__(self):
self.sema = Semaphore() # 信号量a 控制a线程 默认值为1
self.semb = Semaphore() # 信号量b 控制b线程 默认值为1
self.semb.acquire() # 信号量a - 1 = 0(启动)
def a(self):
for i in range(10):
self.sema.acquire() # 第一次a=1-1=0,第二次调用将被阻塞,直到被sema.release调用
print("{} runing {}".format(foo.a.__name__, i))
self.semb.release() # 调用被阻塞semb后,马上又被sema.acquire阻塞
def b(self):
for i in range(10):
self.semb.acquire() # b线程阻塞 (第二次)b=0, 直到被semb.release调用
print("{} runing {}".format(foo.b.__name__, i))
self.sema.release() # 调用被阻塞sema后,马上又被semb.acquire阻塞
if __name__ == '__main__':
test = foo()
p = Thread(target=test.a)
p1 = Thread(target=test.b)
p.start()
time.sleep(1) # a线程率先启动,但还是得等线程b启动
p1.start()
p.join() # p 在 p1 执行完后结束。加了信号量的原因。 查看 模块文件所在位置import multiprocessing
import sys
sys.modules["multiprocessing"]
页:
[1]