import datetime
import os
import queue
import socket
import threading
import time

import serial
from loguru import logger

from Hplc import ParseEngine3762
from Parse_698 import ParseMsgEngine
from Parse645 import ParseEngine_645
import Sockapi
# from pymongo import MongoClient
import MeterMap
import SerialConfigMap

# Global variables holding the serial port object and run state
ser = None
running = False
class Rotator:
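    """Custom loguru rotation policy: rotate when the log file would exceed
    a size limit, or when a fixed time of day has passed."""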
@logger.catch
def __init__(self, *, size, at):
now = datetime.datetime.now()
self._size_limit = size
self._time_limit = now.replace(hour=at.hour, minute=at.minute, second=at.second)
if now >= self._time_limit:
            # The target time has already passed today; add a day so we don't rotate immediately
self._time_limit += datetime.timedelta(days=1)
@logger.catch
def should_rotate(self, message, file):
file.seek(0, 2)
if file.tell() + len(message) > self._size_limit:
return True
if message.record["time"].timestamp() > self._time_limit.timestamp():
self._time_limit += datetime.timedelta(days=1)
return True
return False
# Directory of the current file
current_dir = os.path.dirname(os.path.realpath(__file__))
# Path of the log folder
log_folder_path = os.path.join(current_dir, "logs")
# Create the log folder if it does not exist
if not os.path.exists(log_folder_path):
    os.makedirs(log_folder_path)
current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
# Build the log file path
log_file_path = os.path.join(log_folder_path, f"file_{current_time}.log")
# log_file_path = f"E:\\python\\PycharmProjects\\C_demo_two\\C_demo_two\\file_{current_time}.log"
# Rotate when the file exceeds 500 MB, or once a day at 23:59:59
rotator = Rotator(size=5e+8, at=datetime.time(23, 59, 59))
logger.add(log_file_path, format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", rotation=rotator.should_rotate)
class SerialAssistant:
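    """Wraps a pyserial connection plus the TCP side channel used by the
    parser engines. Intended to be used as a context manager."""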
@logger.catch
    def __init__(self, port, baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_EVEN,
                 stopbits=serial.STOPBITS_ONE, timeout=10, rtscts=False, *args, **kwargs):
        self.port = port
        # port=None keeps the port closed until open() configures it
        self.ser = serial.Serial(port=None, baudrate=baudrate, bytesize=bytesize, parity=parity,
                                 stopbits=stopbits, timeout=timeout, rtscts=rtscts)
    def db_connect(self):
        pass
# @logger.catch
    def open(self):
        self.ser.port = self.port
        # Reopen cleanly if the port is already open
        if self.ser.is_open:
            self.ser.close()
        if self.ser.port is not None and not self.ser.is_open:
            self.ser.open()
        # Windows-only: enlarge the driver's RX/TX buffers
        self.ser.set_buffer_size(rx_size=4096, tx_size=4096)
        put_ip = "10.108.2.101"
        put_port = "6666"
        put_ctime = "172800"
        self.sockapi = Sockapi.GetMac()
        self.sockapi.ConnectDevice(put_ip, put_port, put_ctime)
        self.tcp_connect = Sockapi.Tcp_api()
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.tcp_connect.set_client_socket(client_socket)
        server_address = ('10.17.166.199', 8888)
        self.tcp_connect.client_socket.connect(server_address)
        print('TCP connection established')
@logger.catch
    def close(self):
        if self.ser.is_open:
            self.ser.close()
@logger.catch
    def __enter__(self):
        # Go through open() so the TCP side channel is also established
        self.open()
        return self
@logger.catch
    def __exit__(self, *args, **kwargs):
        self.close()
all_queue = queue.Queue()
TEXT_QUEUE = queue.Queue(10000)  # Holds at most 10000 messages
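# TEXT_QUEUE carries (zb_flag, hex_string) tuples: flag '0' marks frames for
# ParseMsgEngine (presumably DL/T 698.45), '1' marks frames for ParseEngine3762
# (presumably Q/GDW 376.2) -- the protocol names are inferred from the parser
# module names above.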
@logger.catch
def parse_zb_0_msg(sa):
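    """Frame scanner for the Zb == '0' mode.

    Accumulates the incoming serial stream as a hex string and splits it on
    the 'fefefefe' wake-up preamble that precedes each frame; a frame still
    incomplete after max_wait_time is flushed to TEXT_QUEUE anyway.
    """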
    try:
        current_str = ''        # Data received so far (hex string)
        current_wait_time = 0   # Accumulated waiting time
        every_wait_time = 0.1   # Sleep interval per pass
        last_index = 0          # Index found on the previous pass
        find_text = 'fefefefe'  # Delimiter to search for
        find_text_length = len(find_text)  # Length of the delimiter
        find_start = 0          # Start position for the next search
        max_wait_time = 0.5     # Maximum waiting time (tunable)
        while MeterMap.global_map.get('Zb') == '0':
            # Sleep every_wait_time seconds on each pass
            time.sleep(every_wait_time)
            num = sa.ser.in_waiting
            # If there is no pending data, skip the read
            if num > 0:
                # sleep can have side effects, so re-check the mode first
                if MeterMap.global_map.get('Zb') != '0':
                    break
                # Read the pending bytes
                data = sa.ser.read(num)
            else:
                data = b''
            # Append the new data to current_str
            current_str += data.hex()
            # If the data is shorter than the delimiter, keep reading
            if len(current_str) < find_text_length:
                # if current_str:
                #     logger.info(f"Data below minimum length {find_text_length}, current data: {current_str}")
                # else:
                #     logger.info(f"Data below minimum length {find_text_length}, current data is empty!")
                continue
            else:
                # Search for the delimiter
                current_index = current_str.find(find_text, find_start)
                # Anything other than -1 means it was found
                if current_index != -1:
                    # A gap of at least find_text_length since the last hit means a complete frame
                    if current_index - last_index >= find_text_length:
                        # Cut out the frame
                        sub_text = current_str[0:current_index]
                        # Data not starting with the delimiter is garbage; no point queueing it
                        if sub_text.startswith(find_text):
                            TEXT_QUEUE.put(('0', sub_text))
                        else:
                            logger.info(f'Bad data, does not start with {find_text} ---> {sub_text}')
                        # Keep the remainder in current_str
                        current_str = current_str[current_index:]
                        # Reset the search start position
                        find_start = 0
                        # Reset the last index
                        last_index = 0
                        # Reset the waiting time
                        current_wait_time = 0
                    else:
                        last_index = current_index
                        # Advance the search position past the delimiter for the next pass
                        find_start += len(find_text)
                else:
                    # Once max_wait_time is exceeded, force-process the data
                    if find_start > 0 and current_wait_time > max_wait_time:
                        sub_text = current_str[last_index:]
                        # logger.info(f"Waited more than {max_wait_time}s, force-cutting the data")
                        # If sub_text differs from current_str, part of it was garbage and is trimmed
                        if sub_text != current_str:
                            logger.info(f"Trimmed partial garbage data {current_str[:last_index]}!")
                        # Queue the data if it starts correctly
                        if sub_text.startswith(find_text):
                            TEXT_QUEUE.put(('0', sub_text))
                        else:
                            logger.info(f'Possible bug: force-cut data does not start with {find_text} ---> {sub_text}')
                        # Clear current_str
                        current_str = ''
                        # Reset the search start position
                        find_start = 0
                        # Reset the last index
                        last_index = 0
                        # Reset the waiting time
                        current_wait_time = 0
                    elif find_start <= 0 and current_wait_time > max_wait_time:
                        print(f'Data {current_str} exceeded {max_wait_time}s; force-cutting and discarding it!')
                        current_str = ''
                    else:
                        pass
                        # print(f'Current waiting time: {current_wait_time}s')
            # Add every_wait_time to the accumulated waiting time
            current_wait_time += every_wait_time
    except Exception as e:
        logger.error(f"Exception occurred: {e}")
@logger.catch
def parse_zb_1_msg(sa):
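    """Frame reader for the Zb != '0' mode.

    Polls the serial port and expects frames beginning with 0x68 whose second
    and third bytes form a little-endian 16-bit length field (interpreted
    here as the whole frame length in bytes); any missing tail bytes are read
    before the frame is queued.
    """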
    while MeterMap.global_map.get('Zb') != '0':
        time.sleep(0.1)
        count = 3
        current_data = ''
        # Poll the port up to three times to collect one frame
        while count > 0:
            num = sa.ser.in_waiting
            if num > 0:
                data = sa.ser.read(num)
                current_data += data.hex()
            time.sleep(0.1)
            count -= 1
        data = current_data
        if data:
            if data[:2] == '68':
                # 16-bit little-endian length field, doubled to count hex chars
                data_len = int((data[4:6] + data[2:4]), 16) * 2
                if len(data) < data_len:
                    # Bytes still missing (two hex chars per byte)
                    next_bytes = (data_len - len(data)) // 2
                    if sa.ser.in_waiting >= next_bytes:
                        data_next = sa.ser.read(next_bytes)
                        data = data + data_next.hex()
                if len(data) > 16:
                    TEXT_QUEUE.put(('1', data))
        else:
            logger.debug("No data received in this polling cycle")
@logger.catch
def get_frame(sa):
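    """Dispatch loop: run the scanner matching the current Zb mode."""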
while True:
        # Do not clear the queue here, or messages would be lost
if MeterMap.global_map.get('Zb') == '0':
parse_zb_0_msg(sa)
else:
parse_zb_1_msg(sa)
@logger.catch
def judge_context(que, sa):
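    """Consumer: takes (zb, frame) tuples off TEXT_QUEUE and hands '0' frames
    to ParseMsgEngine and all other frames to ParseEngine3762."""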
    while True:
        try:
            zb, data = TEXT_QUEUE.get()
            data = data.upper()
            if zb == '0':
                # logger.info(f"Dequeued data {data}")
                # Strip the FEFEFEFE preamble
                data = data[8:]
                # logger.info("Start processing data")
                logger.info(f"receive: {data}")
                engine = ParseMsgEngine(hex_message=data, sa=sa)
                # Run the parser
                engine.start_parse(hex_message=data, is_send=True)
                logger.info("Finished processing data")
            else:
                # logger.info(f"Dequeued data {data}")
                # logger.info("Start processing data")
                logger.info(f"receive: {data}")
                data_list = []
                # Split concatenated frames using the length field
                while len(data) > 0:
                    data_len = int((data[4:6] + data[2:4]), 16) * 2
                    data_index = data[:data_len]
                    data = data[data_len:]
                    data_list.append(data_index)
                for item_data in data_list:
                    data = item_data
                    p = ParseEngine3762(sa=sa)
                    p.start_parse(data)
        except Exception as e:
            logger.error(f"Exception occurred: {e}")
            time.sleep(1)
@logger.catch
def main():
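    """Open COM1 at 9600 baud (8 data bits, even parity, one stop bit), start
    the serial reader thread, and run the queue consumer in the main thread."""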
with SerialAssistant(port="COM1", baudrate=9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE) as sa:
time.sleep(1)
serial_threading = threading.Thread(target=get_frame, args=(sa,))
# judge_threading = threading.Thread(target=judge_context)
serial_threading.start()
# judge_threading.start()
que = queue.Queue()
judge_context(que, sa)
if __name__ == '__main__':
main()