|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
- import socket # 导入 socket 模块
- import threading
- from flask import Flask
- from flask import render_template
- from flask import request
- import time
- def send_to_open():
- global c
- a1 = "01 0F 00 00 00 04 01 00 3E 96"
- a2 = "01 05 00 02 FF 00 2D FA"
- a3 = "01 05 00 03 FF 00 7C 3A"
- a4 = "01 0F 00 00 00 04 01 00 3E 96"
- print(bytes.fromhex(a1))
- c.send(bytes.fromhex(a1))
- time.sleep(0.5)
- print(bytes.fromhex(a2))
- c.send(bytes.fromhex(a2))
- time.sleep(0.5)
- print(bytes.fromhex(a3))
- c.send(bytes.fromhex(a3))
- time.sleep(3)
- print(bytes.fromhex(a4))
- c.send(bytes.fromhex(a4))
- def kill_socket():
- global c
- c.close()
- def connect():
- s = socket.socket() # 创建 socket 对象
- host = "0.0.0.0" # 获取本地主机名
- port = 6666 # 设置端口
- s.bind((host, port)) # 绑定端口
- s.listen(1) # 等待客户端连接
- a, addr = s.accept() # 建立客户端连接
- return a
- # 按间距中的绿色按钮以运行脚本。
- def open_door_test():
- # 全局变量
- # qos:互斥锁
- # c: socket连接对象
- global c
- global qos
- if qos == 1:
- qos = 0
- try:
- send_to_open()
- except:
- print('关闭了正在占线的链接!')
- kill_socket()
- print('关闭实例')
- c = connect()
- print('新建实例')
- send_to_open()
- print('重新开门')
- qos = 1
- def test_connect():
- global c
- global qos
- a1 = "01 0F 00 00 00 04 01 00 3E 96"
- while True:
- if qos == 1:
- try:
- print(bytes.fromhex(a1))
- c.send(bytes.fromhex(a1))
- time.sleep(0.5)
- c.send(bytes.fromhex(a1))
- time.sleep(0.5)
- c.send(bytes.fromhex(a1))
- time.sleep(0.5)
- c.send(bytes.fromhex(a1))
- time.sleep(0.5)
- time.sleep(600)
- except:
- print('关闭了正在占线的链接!')
- kill_socket()
- print('关闭实例')
- c = connect()
- print('新建实例')
- app = Flask(__name__)
- c = connect()
- qos = 1
- User_admin = ['qqbot', '1007800006', '2200475850', '1821269010', '1005944615', '1038888008', 'hardware']
- @app.route('/', methods=['GET', 'POST'])
- def index():
- if request.args.get('user') in User_admin:
- open_door_test()
- return render_template('index.html')
- else:
- return render_template('bad.html')
- app.run(port=14725, host='0.0.0.0')
- if __name__ == "__main__":
- conn = threading.Thread(target=test_connect())
- conn.start()
- app.run(port=14725, host='0.0.0.0')
复制代码 |
|