|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
import queue
import socket
import time
import re
import serial.tools.list_ports
import threading
import queue
from loguru import logger
# 全局变量,用于存储串口对象
from Hplc import ParseEngine3762
from Parse_698 import ParseMsgEngine
from Parse645 import ParseEngine_645
import Sockapi
# from pymongo import MongoClient
import MeterMap
import SerialConfigMap
ser = None # 这个空值
running = False
from loguru import logger
import datetime
import os
class Rotator:
@logger.catch
def __init__(self, *, size, at):
now = datetime.datetime.now()
self._size_limit = size
self._time_limit = now.replace(hour=at.hour, minute=at.minute, second=at.second)
if now >= self._time_limit:
# 当前时间已经过了目标时间,为了防止立即进行旋转,增加一天
self._time_limit += datetime.timedelta(days=1)
@logger.catch
def should_rotate(self, message, file):
file.seek(0, 2)
if file.tell() + len(message) > self._size_limit:
return True
if message.record["time"].timestamp() > self._time_limit.timestamp():
self._time_limit += datetime.timedelta(days=1)
return True
return False
# 获取当前文件夹路径
current_dir = os.path.dirname(os.path.realpath(__file__))
# 创建日志文件夹路径
log_folder_path = os.path.join(current_dir, "logs")
# 如果日志文件夹不存在,则创建它
if not os.path.exists(log_folder_path):
os.makedirs(log_folder_path)
# 旋转文件大小超过 500MB 或每天午夜旋转一次
current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
# 生成日志文件路径
log_file_path = os.path.join(log_folder_path, f"file_{current_time}.log")
# log_file_path = f"E:\\python\\PycharmProjects\\C_demo_two\\C_demo_two\\file_{current_time}.log"
rotator = Rotator(size=5e+8, at=datetime.time(23, 59, 59))
logger.add(log_file_path, format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", rotation=rotator.should_rotate)
class SerialAssistant:
@logger.catch
def __init__(self, port, baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE, timeout=10, rtscts=False, *args, **kwargs):
self.port = port
self.ser = serial.Serial(port=None, baudrate=baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits,
timeout=timeout, rtscts=False)
def tcp_connect(self):
pass
def db_connect(self):
pass
# @logger.catch
def open(self):
self.ser.port = self.port
self.ser.is_open = True
if self.ser.isOpen():
self.ser.close()
if self.ser._port is not None and not self.ser.is_open:
self.ser.open()
self.ser.set_buffer_size(rx_size=4096, tx_size=4096)
put_ip = "10.108.2.101"
put_port = "6666"
put_ctime = "172800"
self.sockapi = Sockapi.GetMac()
self.sockapi.ConnectDevice(put_ip, put_port, put_ctime)
self.tcp_connect = Sockapi.Tcp_api()
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.tcp_connect.set_client_socket(client_socket)
server_address = ('10.17.166.199', 8888)
# server_address = ('192.168.127.126', 8888)
self.tcp_connect.client_socket.connect(server_address)
# client = MongoClient('mongodb://10.17.166.199:27017')
# s1=time.time()
print('成功建立TCP连接')
# # s2 = time.time()
# print(s2-s1)
@logger.catch
def close(self):
try:
self.ser.close()
except Exception as e:
raise e
# if self.ser.isOpen():
# self.ser.close()
@logger.catch
def __enter__(self):
self.ser.open()
return self
@logger.catch
def __exit__(self, *args, **kwargs):
self.ser.close()
all_queu = queue.Queue()
TEXT_QUEUE = queue.Queue(10000) # 最大的承载消息10000条
@logger.catch
def parse_zb_0_msg(sa):
try:
current_str = '' # 当前接收的数据
current_wait_time = 0 # 当前累计等待的时间
every_wait_time = 0.1 # 每次等待的时间
last_index = 0 # 上次的索引
find_text = 'fefefefe' # 索引的数据
find_text_length = len(find_text) # 索引数据的长度
find_start = 0 # 查询的起始位置
max_wait_time = 0.5 # 最大等待时间15s 可以改
while MeterMap.global_map.get('Zb') == '0':
# 每次休眠0.2秒
time.sleep(every_wait_time)
num = sa.ser.inWaiting()
# 如果没有数据,跳过本次循环
if num > 0:
# sleep会差生影响
if MeterMap.global_map.get('Zb') != '0':
break
# 读取数据
data = sa.ser.read(num)
else:
data = b''
# 将数据累加到current_str
current_str += data.hex()
# 如果数据没有达到指定长度继续读取
if len(current_str) < find_text_length:
# if current_str:
# logger.info(f"数据未达到最小的长度{find_text_length},当前数据为{current_str}")
# else:
# logger.info(f"数据未达到最小的长度{find_text_length},当前数据为空!")
continue
else:
# 查询索引值
current_index = current_str.find(find_text, find_start)
# 如果当前索引值不等于-1 代表找到了
if current_index != -1:
# 当前索引值减去上次索引值如果大于等于find_text_length
if current_index - last_index >= find_text_length:
# 切除找出的子串
sub_text = current_str[0:current_index]
# 如果没有以指定的开头说明存在脏数据,没有必要加到队列中
if sub_text.startswith(find_text):
TEXT_QUEUE.put(('0', sub_text))
else:
logger.info('错误数据,没有以{find_text}开头--->', sub_text)
# current_str 重新赋值
current_str = current_str[current_index:]
# 查询的起始位置 置零
find_start = 0
# 索引置零
last_index = 0
# 等待时间置零
current_wait_time = 0
else:
last_index = current_index
# 查询的起始位置 加上find_text的长度,为了索引下次
find_start += len(find_text)
else:
# 第一次等待最大等待15秒,就强制处理数据
if find_start > 0 and current_wait_time > max_wait_time:
sub_text = current_str[last_index:]
logger.info(f"等待超过{max_wait_time}s 强制切割数据")
# 如果sub_text 与current_str 不相等说明存在一部分脏数据,切除
if sub_text != current_str:
logger.info(f"存在部分错误数据{current_index[:last_index]} 已被切除!")
# 数据的开头正确,加入到队列
if sub_text.startswith(find_text):
TEXT_QUEUE.put(('0', sub_text))
else:
logger.info(f'程序可能出错,强制切割数据是错误数据,没有以{find_text}开头--->{sub_text}')
# current_str 置空
current_str = ''
# 查询的起始位置 置零
find_start = 0
# 索引置零
last_index = 0
# 等待时间置零
current_wait_time = 0
elif find_start <= 0 and current_wait_time > max_wait_time:
print(f'当前数据为{current_str}超过{max_wait_time}s 就进行强制切割,将数据扔掉!')
current_str = ''
else:
print(f'当前等待时间为{current_wait_time}s')
# 每次增加0.2秒等待时间
current_wait_time += every_wait_time
except Exception as e:
logger.error("发生了异常:", e)
@logger.catch
def parse_zb_1_msg(sa):
while MeterMap.global_map.get('Zb') != '0':
data = ''
# num = sa.ser.inWaiting()
# if num > 0:
# data = sa.ser.read(num)
# data = data.hex()
# if num > 0:
# data_next = sa.ser.read(num)
# data_next = data_next.hex()
# data = data + data_next
# else:
# time.sleep(0.05)
# data_next = sa.ser.read(num)
# data_next = data_next.hex()
# data = data + data_next
time.sleep(0.1)
count = 3
current_data = ''
while count > 0:
num = sa.ser.inWaiting()
if num > 0:
data = sa.ser.read(num)
current_data += data.hex()
time.sleep(0.1)
count -= 1
data = current_data
if data:
if data[:2] == '68':
data_len = int((data[4:6] + data[2:4]), 16) * 2
if len(data) < data_len:
next_num = data_len - len(data)
if num >= next_num:
data_next = sa.ser.read(next_num)
data_next = data_next.hex()
data = data + data_next
# elif len(data) > data_len:
# data = data[:data_len]
# data_sencond = MeterMap.global_map['data_sencond']
# time.sleep(0.05) # 每隔50毫秒执行一次循环
#
# if sa.ser.inWaiting() > 0: # 检查串口缓存区是否有数据
# data = sa.ser.read_all() # 读取全部缓存区数据
# data = data.hex()
#
# if data[:2] == '68': # 判断数据开头是否为 '68'
# data_len = int((data[4:6] + data[2:4]), 16) * 2
#
# if len(data) < data_len: # 数据长度小于预期长度
# data_next_len = data_len - len(data)
# while sa.ser.inWaiting() < data_next_len: # 持续检查串口缓存区大小
# time.sleep(0.01)
# if sa.ser.inWaiting() >= data_next_len: # 再次检查缓存区大小是否满足条件
# data_next = sa.ser.read(data_next_len) # 取出补充数据
# data += data_next
if len(data) > 16:
TEXT_QUEUE.put(('1', data))
else:
pass
# num = sa.ser.inWaiting()
# if num > 0:
# data = sa.ser.read(num)
# data = data.hex()
# # data_sencond = MeterMap.global_map.get('data_sencond') or []
# # data = data_sencond + data
@logger.catch
def get_frame(sa):
#时间更新脚本前置控制
# while not MeterMap.global_map.get('update_complete', False):
# time.sleep(0.01)
while True:
# 不能清空队列,要不消息会丢失
if MeterMap.global_map.get('Zb') == '0':
parse_zb_0_msg(sa)
else:
parse_zb_1_msg(sa)
@logger.catch
def judge_context(que, sa):
while True:
try:
zb, data = TEXT_QUEUE.get()
data = data.upper()
if zb == '0':
# logger.info(f"取出队列中的数据{data}")
data = data[8:]
# logger.info("开始处理数据")
logger.info(f"recive:{data}")
engine = ParseMsgEngine(hex_message=data, sa=sa)
# 负责解析
engine.start_parse(hex_message=data, is_send=True)
# 注释下面两行代码就可以
# is_true = engine.start_parse(hex_message=data)
# if is_true:
# data = engine.cal_msg()
# if data:
# sa.ser.write(data)
# logger.info(f"send:{data}")
logger.info("数据处理结束")
else:
# logger.info(f"取出队列中的数据{data}")
# logger.info("开始处理数据")
logger.info(f"recive:{data}")
data_list = []
while len(data) > 0:
data_len = int((data[4:6] + data[2:4]), 16) * 2
data_index = data[:data_len]
data = data[data_len:]
data_list.append(data_index)
for item_data in data_list:
data = item_data
# if data[8:10] == '04' and data[44:50] == 'F10100':
# message_list = []
# message = data
#
# # 698报文解析
# if data[50:52] in ['00', '03']:
# message_count = int(data.count('68') / 2)
# for i in range(message_count):
# end_index = message.find('16')
# next_end_index = message.find('16', end_index + 2)
# message_list.append(message[:next_end_index + 2])
# print(message[:next_end_index + 2])
# message = message[next_end_index + 2:]
# # 调用698
# for item_msg in message_list:
# p = ParseMsgEngine(hex_message=item_msg, sa=sa)
# p.start_parse(hex_message=item_msg, is_send=True)
#
# # 645报文解析
# elif data[50:52] == '02':
# message_count = int((data.count('68') - 1) / 2)
# start_index = data.find('68')
# next_start_index = data.find('68', start_index + 2)
# print(next_start_index)
# message = data[next_start_index: -4]
# print(message)
# for i in range(message_count):
# stop = message.find('16')
# result = message[:stop + 2]
# message = message[stop + 2:]
# print(result)
# message_list.append(result)
# for item_msg in message_list:
# p = ParseEngine_645(sa=sa)
# p.start_parse(item_msg)
# hex_message_h
# data = data[58:-4]
p = ParseEngine3762(sa=sa)
p.start_parse(data)
# engine = ParseMsgEngine(hex_message=data)
# 负责解析
# is_true = engine.start_parse(hex_message=data)
# 新的引擎处理消息
# 解析出使用的协议
# 注释下面两行代码就可以
# if is_true:
# data = engine.cal_msg()
# if data:
# sa.ser.write(data)
# logger.info(f"send:{data}")
except Exception as e:
print(e)
time.sleep(1)
@logger.catch
def main():
with SerialAssistant(port="COM1", baudrate=9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE) as sa:
time.sleep(1)
serial_threading = threading.Thread(target=get_frame, args=(sa,))
# judge_threading = threading.Thread(target=judge_context)
serial_threading.start()
# judge_threading.start()
que = queue.Queue()
judge_context(que, sa)
if __name__ == '__main__':
main()
|
|