gufengxiaoxiao 发表于 2023-11-1 16:43:42

parse_zb_0_msg存在运行一段时间后就不再接收数据问题

import queue
import socket
import time
import re
import serial.tools.list_ports
import threading
import queue
from loguru import logger

# 全局变量,用于存储串口对象
from Hplc import ParseEngine3762
from Parse_698 import ParseMsgEngine
from Parse645 import ParseEngine_645
import Sockapi
# from pymongo import MongoClient

import MeterMap
import SerialConfigMap

ser = None# 这个空值

running = False
from loguru import logger
import datetime
import os



class Rotator:
    @logger.catch
    def __init__(self, *, size, at):
      now = datetime.datetime.now()
      self._size_limit = size
      self._time_limit = now.replace(hour=at.hour, minute=at.minute, second=at.second)

      if now >= self._time_limit:
            # 当前时间已经过了目标时间,为了防止立即进行旋转,增加一天
            self._time_limit += datetime.timedelta(days=1)

    @logger.catch
    def should_rotate(self, message, file):
      file.seek(0, 2)
      if file.tell() + len(message) > self._size_limit:
            return True
      if message.record["time"].timestamp() > self._time_limit.timestamp():
            self._time_limit += datetime.timedelta(days=1)
            return True
      return False

# 获取当前文件夹路径
current_dir = os.path.dirname(os.path.realpath(__file__))

# 创建日志文件夹路径
log_folder_path = os.path.join(current_dir, "logs")

# 如果日志文件夹不存在,则创建它
if not os.path.exists(log_folder_path):
    os.makedirs(log_folder_path)

# 旋转文件大小超过 500MB 或每天午夜旋转一次
current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
# 生成日志文件路径
log_file_path = os.path.join(log_folder_path, f"file_{current_time}.log")



# log_file_path = f"E:\\python\\PycharmProjects\\C_demo_two\\C_demo_two\\file_{current_time}.log"

rotator = Rotator(size=5e+8, at=datetime.time(23, 59, 59))
logger.add(log_file_path, format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", rotation=rotator.should_rotate)



class SerialAssistant:
    @logger.catch
    def __init__(self, port, baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_EVEN,
               stopbits=serial.STOPBITS_ONE, timeout=10, rtscts=False, *args, **kwargs):
      self.port = port
      self.ser = serial.Serial(port=None, baudrate=baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits,
                                 timeout=timeout, rtscts=False)

    def tcp_connect(self):
      pass

    def db_connect(self):
      pass


    # @logger.catch
    def open(self):
      self.ser.port = self.port

      self.ser.is_open = True
      if self.ser.isOpen():
            self.ser.close()
      if self.ser._port is not None and not self.ser.is_open:
            self.ser.open()
            self.ser.set_buffer_size(rx_size=4096, tx_size=4096)
      put_ip = "10.108.2.101"
      put_port = "6666"
      put_ctime = "172800"
      self.sockapi = Sockapi.GetMac()
      self.sockapi.ConnectDevice(put_ip, put_port, put_ctime)

      self.tcp_connect = Sockapi.Tcp_api()
      client_socket= socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      self.tcp_connect.set_client_socket(client_socket)
      server_address = ('10.17.166.199', 8888)
      # server_address = ('192.168.127.126', 8888)
      self.tcp_connect.client_socket.connect(server_address)
      # client = MongoClient('mongodb://10.17.166.199:27017')


      # s1=time.time()
      print('成功建立TCP连接')
      # # s2 = time.time()
      # print(s2-s1)


    @logger.catch
    def close(self):
      try:
            self.ser.close()
      except Exception as e:
            raise e
      # if self.ser.isOpen():
      #   self.ser.close()

    @logger.catch
    def __enter__(self):
      self.ser.open()
      return self

    @logger.catch
    def __exit__(self, *args, **kwargs):
      self.ser.close()

all_queu = queue.Queue()
TEXT_QUEUE = queue.Queue(10000)# 最大的承载消息10000条

@logger.catch
def parse_zb_0_msg(sa):
    try:
      current_str = ''# 当前接收的数据
      current_wait_time = 0# 当前累计等待的时间
      every_wait_time = 0.1# 每次等待的时间
      last_index = 0# 上次的索引
      find_text = 'fefefefe'# 索引的数据
      find_text_length = len(find_text)# 索引数据的长度
      find_start = 0# 查询的起始位置
      max_wait_time = 0.5# 最大等待时间15s 可以改

      while MeterMap.global_map.get('Zb') == '0':
            # 每次休眠0.2秒
            time.sleep(every_wait_time)
            num = sa.ser.inWaiting()
            # 如果没有数据,跳过本次循环
            if num > 0:
                # sleep会差生影响
                if MeterMap.global_map.get('Zb') != '0':
                  break
                # 读取数据
                data = sa.ser.read(num)
            else:
                data = b''
            # 将数据累加到current_str
            current_str += data.hex()
            # 如果数据没有达到指定长度继续读取
            if len(current_str) < find_text_length:
                # if current_str:
                #   logger.info(f"数据未达到最小的长度{find_text_length},当前数据为{current_str}")
                # else:
                #   logger.info(f"数据未达到最小的长度{find_text_length},当前数据为空!")
                continue
            else:
                # 查询索引值
                current_index = current_str.find(find_text, find_start)
                # 如果当前索引值不等于-1 代表找到了
                if current_index != -1:
                  # 当前索引值减去上次索引值如果大于等于find_text_length
                  if current_index - last_index >= find_text_length:
                        # 切除找出的子串
                        sub_text = current_str
                        # 如果没有以指定的开头说明存在脏数据,没有必要加到队列中
                        if sub_text.startswith(find_text):
                            TEXT_QUEUE.put(('0', sub_text))
                        else:
                            logger.info('错误数据,没有以{find_text}开头--->', sub_text)
                        # current_str 重新赋值
                        current_str = current_str
                        # 查询的起始位置 置零
                        find_start = 0
                        # 索引置零
                        last_index = 0
                        # 等待时间置零
                        current_wait_time = 0
                  else:
                        last_index = current_index
                        # 查询的起始位置 加上find_text的长度,为了索引下次
                        find_start += len(find_text)
                else:
                  # 第一次等待最大等待15秒,就强制处理数据
                  if find_start > 0 and current_wait_time > max_wait_time:
                        sub_text = current_str
                        logger.info(f"等待超过{max_wait_time}s 强制切割数据")
                        # 如果sub_text 与current_str 不相等说明存在一部分脏数据,切除
                        if sub_text != current_str:
                            logger.info(f"存在部分错误数据{current_index[:last_index]} 已被切除!")
                        # 数据的开头正确,加入到队列
                        if sub_text.startswith(find_text):
                            TEXT_QUEUE.put(('0', sub_text))
                        else:
                            logger.info(f'程序可能出错,强制切割数据是错误数据,没有以{find_text}开头--->{sub_text}')
                        # current_str 置空
                        current_str = ''
                        # 查询的起始位置 置零
                        find_start = 0
                        # 索引置零
                        last_index = 0
                        # 等待时间置零
                        current_wait_time = 0
                  elif find_start <= 0 and current_wait_time > max_wait_time:
                        print(f'当前数据为{current_str}超过{max_wait_time}s 就进行强制切割,将数据扔掉!')
                        current_str = ''
                  else:

                        print(f'当前等待时间为{current_wait_time}s')
                # 每次增加0.2秒等待时间
                current_wait_time += every_wait_time

    except Exception as e:
      logger.error("发生了异常:", e)

@logger.catch
def parse_zb_1_msg(sa):
    while MeterMap.global_map.get('Zb') != '0':
      data = ''
      # num = sa.ser.inWaiting()
      # if num > 0:
      #   data = sa.ser.read(num)
      #   data = data.hex()
      # if num > 0:
      #   data_next = sa.ser.read(num)
      #   data_next = data_next.hex()
      #   data = data + data_next
      # else:
      #   time.sleep(0.05)
      #   data_next = sa.ser.read(num)
      #   data_next = data_next.hex()
      #   data = data + data_next
      time.sleep(0.1)
      count = 3
      current_data = ''
      while count > 0:
            num = sa.ser.inWaiting()
            if num > 0:
                data = sa.ser.read(num)
                current_data += data.hex()
            time.sleep(0.1)
            count -= 1
      data = current_data
      if data:
            if data[:2] == '68':
                data_len = int((data + data), 16) * 2
                if len(data) < data_len:
                  next_num = data_len - len(data)
                  if num >= next_num:
                        data_next = sa.ser.read(next_num)
                        data_next = data_next.hex()
                        data = data + data_next
                # elif len(data) > data_len:
                #   data = data[:data_len]
                #   data_sencond = MeterMap.global_map['data_sencond']
      # time.sleep(0.05)# 每隔50毫秒执行一次循环
      #
      # if sa.ser.inWaiting() > 0:# 检查串口缓存区是否有数据
      #   data = sa.ser.read_all()# 读取全部缓存区数据
      #   data = data.hex()
      #
      #   if data[:2] == '68':# 判断数据开头是否为 '68'
      #         data_len = int((data + data), 16) * 2
      #
      #         if len(data) < data_len:# 数据长度小于预期长度
      #             data_next_len = data_len - len(data)
      #             while sa.ser.inWaiting() < data_next_len:# 持续检查串口缓存区大小
      #               time.sleep(0.01)
      #             if sa.ser.inWaiting() >= data_next_len:# 再次检查缓存区大小是否满足条件
      #               data_next = sa.ser.read(data_next_len)# 取出补充数据
      #               data += data_next
      if len(data) > 16:
            TEXT_QUEUE.put(('1', data))
      else:
            pass

      # num = sa.ser.inWaiting()
      # if num > 0:
      #   data = sa.ser.read(num)
      #   data = data.hex()
      #   # data_sencond = MeterMap.global_map.get('data_sencond') or []
      #   # data = data_sencond + data

@logger.catch
def get_frame(sa):
    #时间更新脚本前置控制
    # while not MeterMap.global_map.get('update_complete', False):
    #   time.sleep(0.01)
    while True:
      # 不能清空队列,要不消息会丢失
      if MeterMap.global_map.get('Zb') == '0':
            parse_zb_0_msg(sa)
      else:
            parse_zb_1_msg(sa)

@logger.catch
def judge_context(que, sa):
    while True:
      try:
            zb, data = TEXT_QUEUE.get()
            data = data.upper()
            if zb == '0':
                # logger.info(f"取出队列中的数据{data}")
                data = data
                # logger.info("开始处理数据")
                logger.info(f"recive:{data}")
                engine = ParseMsgEngine(hex_message=data, sa=sa)
                # 负责解析
                engine.start_parse(hex_message=data, is_send=True)


                # 注释下面两行代码就可以
                # is_true = engine.start_parse(hex_message=data)
                # if is_true:
                #   data = engine.cal_msg()
                #   if data:
                #         sa.ser.write(data)
                #         logger.info(f"send:{data}")

                logger.info("数据处理结束")
            else:
                # logger.info(f"取出队列中的数据{data}")
                # logger.info("开始处理数据")
                logger.info(f"recive:{data}")
                data_list = []
                while len(data) > 0:
                  data_len = int((data + data), 16) * 2
                  data_index = data[:data_len]
                  data = data
                  data_list.append(data_index)
                for item_data in data_list:
                  data = item_data

                  # if data == '04' and data == 'F10100':
                  #   message_list = []
                  #   message = data
                  #
                  #   # 698报文解析
                  #   if data in ['00', '03']:
                  #         message_count = int(data.count('68') / 2)
                  #         for i in range(message_count):
                  #             end_index = message.find('16')
                  #             next_end_index = message.find('16', end_index + 2)
                  #             message_list.append(message[:next_end_index + 2])
                  #             print(message[:next_end_index + 2])
                  #             message = message
                  #             # 调用698
                  #         for item_msg in message_list:
                  #             p = ParseMsgEngine(hex_message=item_msg, sa=sa)
                  #             p.start_parse(hex_message=item_msg, is_send=True)
                  #
                  #   # 645报文解析
                  #   elif data == '02':
                  #         message_count = int((data.count('68') - 1) / 2)
                  #         start_index = data.find('68')
                  #         next_start_index = data.find('68', start_index + 2)
                  #         print(next_start_index)
                  #         message = data
                  #         print(message)
                  #         for i in range(message_count):
                  #             stop = message.find('16')
                  #             result = message[:stop + 2]
                  #             message = message
                  #             print(result)
                  #             message_list.append(result)
                  #         for item_msg in message_list:
                  #             p = ParseEngine_645(sa=sa)
                  #             p.start_parse(item_msg)
                  # hex_message_h
                  # data = data
                  p = ParseEngine3762(sa=sa)
                  p.start_parse(data)

                  # engine = ParseMsgEngine(hex_message=data)
                  # 负责解析
                  # is_true = engine.start_parse(hex_message=data)
                  # 新的引擎处理消息
                  # 解析出使用的协议
                  # 注释下面两行代码就可以
                  # if is_true:
                  #   data = engine.cal_msg()
                  #   if data:
                  #         sa.ser.write(data)
                  #         logger.info(f"send:{data}")
      except Exception as e:
            print(e)
            time.sleep(1)

@logger.catch
def main():
    with SerialAssistant(port="COM1", baudrate=9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_EVEN,
                         stopbits=serial.STOPBITS_ONE) as sa:
      time.sleep(1)
      serial_threading = threading.Thread(target=get_frame, args=(sa,))
      # judge_threading = threading.Thread(target=judge_context)
      serial_threading.start()
      # judge_threading.start()

      que = queue.Queue()
      judge_context(que, sa)


if __name__ == '__main__':
    main()

gufengxiaoxiao 发表于 2023-11-1 16:47:23

parse_zb_1_msg存在读取缓存区失败问题
页: [1]
查看完整版本: parse_zb_0_msg存在运行一段时间后就不再接收数据问题