鱼C论坛

 找回密码
 立即注册
查看: 625|回复: 1

parse_zb_0_msg存在运行一段时间后就不再接收数据问题

[复制链接]
发表于 2023-11-1 16:43:42 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
import queue
import socket
import time
import re
import serial.tools.list_ports
import threading
import queue
from loguru import logger

# 全局变量,用于存储串口对象
from Hplc import ParseEngine3762
from Parse_698 import ParseMsgEngine
from Parse645 import ParseEngine_645
import Sockapi
# from pymongo import MongoClient

import MeterMap
import SerialConfigMap

ser = None  # 这个空值

running = False
from loguru import logger
import datetime
import os



class Rotator:
    @logger.catch
    def __init__(self, *, size, at):
        now = datetime.datetime.now()
        self._size_limit = size
        self._time_limit = now.replace(hour=at.hour, minute=at.minute, second=at.second)

        if now >= self._time_limit:
            # 当前时间已经过了目标时间,为了防止立即进行旋转,增加一天
            self._time_limit += datetime.timedelta(days=1)

    @logger.catch
    def should_rotate(self, message, file):
        file.seek(0, 2)
        if file.tell() + len(message) > self._size_limit:
            return True
        if message.record["time"].timestamp() > self._time_limit.timestamp():
            self._time_limit += datetime.timedelta(days=1)
            return True
        return False

# 获取当前文件夹路径
current_dir = os.path.dirname(os.path.realpath(__file__))

# 创建日志文件夹路径
log_folder_path = os.path.join(current_dir, "logs")

# 如果日志文件夹不存在,则创建它
if not os.path.exists(log_folder_path):
    os.makedirs(log_folder_path)

# 旋转文件大小超过 500MB 或每天午夜旋转一次
current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
# 生成日志文件路径
log_file_path = os.path.join(log_folder_path, f"file_{current_time}.log")



# log_file_path = f"E:\\python\\PycharmProjects\\C_demo_two\\C_demo_two\\file_{current_time}.log"

rotator = Rotator(size=5e+8, at=datetime.time(23, 59, 59))
logger.add(log_file_path, format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", rotation=rotator.should_rotate)



class SerialAssistant:
    @logger.catch
    def __init__(self, port, baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_EVEN,
                 stopbits=serial.STOPBITS_ONE, timeout=10, rtscts=False, *args, **kwargs):
        self.port = port
        self.ser = serial.Serial(port=None, baudrate=baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits,
                                 timeout=timeout, rtscts=False)

    def tcp_connect(self):
        pass

    def db_connect(self):
        pass


    # @logger.catch
    def open(self):
        self.ser.port = self.port

        self.ser.is_open = True
        if self.ser.isOpen():
            self.ser.close()
        if self.ser._port is not None and not self.ser.is_open:
            self.ser.open()
            self.ser.set_buffer_size(rx_size=4096, tx_size=4096)
        put_ip = "10.108.2.101"
        put_port = "6666"
        put_ctime = "172800"
        self.sockapi = Sockapi.GetMac()
        self.sockapi.ConnectDevice(put_ip, put_port, put_ctime)

        self.tcp_connect = Sockapi.Tcp_api()
        client_socket  = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.tcp_connect.set_client_socket(client_socket)
        server_address = ('10.17.166.199', 8888)
        # server_address = ('192.168.127.126', 8888)
        self.tcp_connect.client_socket.connect(server_address)
        # client = MongoClient('mongodb://10.17.166.199:27017')


        # s1=time.time()
        print('成功建立TCP连接')
        # # s2 = time.time()
        # print(s2-s1)


    @logger.catch
    def close(self):
        try:
            self.ser.close()
        except Exception as e:
            raise e
        # if self.ser.isOpen():
        #     self.ser.close()

    @logger.catch
    def __enter__(self):
        self.ser.open()
        return self

    @logger.catch
    def __exit__(self, *args, **kwargs):
        self.ser.close()

all_queu = queue.Queue()
TEXT_QUEUE = queue.Queue(10000)  # 最大的承载消息10000条

@logger.catch
def parse_zb_0_msg(sa):
    try:
        current_str = ''  # 当前接收的数据
        current_wait_time = 0  # 当前累计等待的时间
        every_wait_time = 0.1  # 每次等待的时间
        last_index = 0  # 上次的索引
        find_text = 'fefefefe'  # 索引的数据
        find_text_length = len(find_text)  # 索引数据的长度
        find_start = 0  # 查询的起始位置
        max_wait_time = 0.5  # 最大等待时间15s 可以改

        while MeterMap.global_map.get('Zb') == '0':
            # 每次休眠0.2秒
            time.sleep(every_wait_time)
            num = sa.ser.inWaiting()
            # 如果没有数据,跳过本次循环
            if num > 0:
                # sleep会差生影响
                if MeterMap.global_map.get('Zb') != '0':
                    break
                # 读取数据
                data = sa.ser.read(num)
            else:
                data = b''
            # 将数据累加到current_str
            current_str += data.hex()
            # 如果数据没有达到指定长度继续读取
            if len(current_str) < find_text_length:
                # if current_str:
                #     logger.info(f"数据未达到最小的长度{find_text_length},当前数据为{current_str}")
                # else:
                #     logger.info(f"数据未达到最小的长度{find_text_length},当前数据为空!")
                continue
            else:
                # 查询索引值
                current_index = current_str.find(find_text, find_start)
                # 如果当前索引值不等于-1 代表找到了
                if current_index != -1:
                    # 当前索引值减去上次索引值如果大于等于find_text_length
                    if current_index - last_index >= find_text_length:
                        # 切除找出的子串
                        sub_text = current_str[0:current_index]
                        # 如果没有以指定的开头说明存在脏数据,没有必要加到队列中
                        if sub_text.startswith(find_text):
                            TEXT_QUEUE.put(('0', sub_text))
                        else:
                            logger.info('错误数据,没有以{find_text}开头--->', sub_text)
                        # current_str 重新赋值
                        current_str = current_str[current_index:]
                        # 查询的起始位置 置零
                        find_start = 0
                        # 索引置零
                        last_index = 0
                        # 等待时间置零
                        current_wait_time = 0
                    else:
                        last_index = current_index
                        # 查询的起始位置 加上find_text的长度,为了索引下次
                        find_start += len(find_text)
                else:
                    # 第一次等待最大等待15秒,就强制处理数据
                    if find_start > 0 and current_wait_time > max_wait_time:
                        sub_text = current_str[last_index:]
                        logger.info(f"等待超过{max_wait_time}s 强制切割数据")
                        # 如果sub_text 与current_str 不相等说明存在一部分脏数据,切除
                        if sub_text != current_str:
                            logger.info(f"存在部分错误数据{current_index[:last_index]} 已被切除!")
                        # 数据的开头正确,加入到队列
                        if sub_text.startswith(find_text):
                            TEXT_QUEUE.put(('0', sub_text))
                        else:
                            logger.info(f'程序可能出错,强制切割数据是错误数据,没有以{find_text}开头--->{sub_text}')
                        # current_str 置空
                        current_str = ''
                        # 查询的起始位置 置零
                        find_start = 0
                        # 索引置零
                        last_index = 0
                        # 等待时间置零
                        current_wait_time = 0
                    elif find_start <= 0 and current_wait_time > max_wait_time:
                        print(f'当前数据为{current_str}超过{max_wait_time}s 就进行强制切割,将数据扔掉!')
                        current_str = ''
                    else:

                        print(f'当前等待时间为{current_wait_time}s')
                # 每次增加0.2秒等待时间
                current_wait_time += every_wait_time

    except Exception as e:
        logger.error("发生了异常:", e)

@logger.catch
def parse_zb_1_msg(sa):
    while MeterMap.global_map.get('Zb') != '0':
        data = ''
        # num = sa.ser.inWaiting()
        # if num > 0:
        #     data = sa.ser.read(num)
        #     data = data.hex()
        # if num > 0:
        #     data_next = sa.ser.read(num)
        #     data_next = data_next.hex()
        #     data = data + data_next
        # else:
        #     time.sleep(0.05)
        #     data_next = sa.ser.read(num)
        #     data_next = data_next.hex()
        #     data = data + data_next
        time.sleep(0.1)
        count = 3
        current_data = ''
        while count > 0:
            num = sa.ser.inWaiting()
            if num > 0:
                data = sa.ser.read(num)
                current_data += data.hex()
            time.sleep(0.1)
            count -= 1
        data = current_data
        if data:
            if data[:2] == '68':
                data_len = int((data[4:6] + data[2:4]), 16) * 2
                if len(data) < data_len:
                    next_num = data_len - len(data)
                    if num >= next_num:
                        data_next = sa.ser.read(next_num)
                        data_next = data_next.hex()
                        data = data + data_next
                # elif len(data) > data_len:
                #     data = data[:data_len]
                #     data_sencond = MeterMap.global_map['data_sencond']
        # time.sleep(0.05)  # 每隔50毫秒执行一次循环
        #
        # if sa.ser.inWaiting() > 0:  # 检查串口缓存区是否有数据
        #     data = sa.ser.read_all()  # 读取全部缓存区数据
        #     data = data.hex()
        #
        #     if data[:2] == '68':  # 判断数据开头是否为 '68'
        #         data_len = int((data[4:6] + data[2:4]), 16) * 2
        #
        #         if len(data) < data_len:  # 数据长度小于预期长度
        #             data_next_len = data_len - len(data)
        #             while sa.ser.inWaiting() < data_next_len:  # 持续检查串口缓存区大小
        #                 time.sleep(0.01)
        #             if sa.ser.inWaiting() >= data_next_len:  # 再次检查缓存区大小是否满足条件
        #                 data_next = sa.ser.read(data_next_len)  # 取出补充数据
        #                 data += data_next
        if len(data) > 16:
            TEXT_QUEUE.put(('1', data))
        else:
            pass

        # num = sa.ser.inWaiting()
        # if num > 0:
        #     data = sa.ser.read(num)
        #     data = data.hex()
        #     # data_sencond = MeterMap.global_map.get('data_sencond') or []
        #     # data = data_sencond + data

@logger.catch
def get_frame(sa):
    #时间更新脚本前置控制
    # while not MeterMap.global_map.get('update_complete', False):
    #     time.sleep(0.01)
    while True:
        # 不能清空队列,要不消息会丢失
        if MeterMap.global_map.get('Zb') == '0':
            parse_zb_0_msg(sa)
        else:
            parse_zb_1_msg(sa)

@logger.catch
def judge_context(que, sa):
    while True:
        try:
            zb, data = TEXT_QUEUE.get()
            data = data.upper()
            if zb == '0':
                # logger.info(f"取出队列中的数据{data}")
                data = data[8:]
                # logger.info("开始处理数据")
                logger.info(f"recive:{data}")
                engine = ParseMsgEngine(hex_message=data, sa=sa)
                # 负责解析
                engine.start_parse(hex_message=data, is_send=True)


                # 注释下面两行代码就可以
                # is_true = engine.start_parse(hex_message=data)
                # if is_true:
                #     data = engine.cal_msg()
                #     if data:
                #         sa.ser.write(data)
                #         logger.info(f"send:{data}")

                logger.info("数据处理结束")
            else:
                # logger.info(f"取出队列中的数据{data}")
                # logger.info("开始处理数据")
                logger.info(f"recive:{data}")
                data_list = []
                while len(data) > 0:
                    data_len = int((data[4:6] + data[2:4]), 16) * 2
                    data_index = data[:data_len]
                    data = data[data_len:]
                    data_list.append(data_index)
                for item_data in data_list:
                    data = item_data

                    # if data[8:10] == '04' and data[44:50] == 'F10100':
                    #     message_list = []
                    #     message = data
                    #
                    #     # 698报文解析
                    #     if data[50:52] in ['00', '03']:
                    #         message_count = int(data.count('68') / 2)
                    #         for i in range(message_count):
                    #             end_index = message.find('16')
                    #             next_end_index = message.find('16', end_index + 2)
                    #             message_list.append(message[:next_end_index + 2])
                    #             print(message[:next_end_index + 2])
                    #             message = message[next_end_index + 2:]
                    #             # 调用698
                    #         for item_msg in message_list:
                    #             p = ParseMsgEngine(hex_message=item_msg, sa=sa)
                    #             p.start_parse(hex_message=item_msg, is_send=True)
                    #
                    #     # 645报文解析
                    #     elif data[50:52] == '02':
                    #         message_count = int((data.count('68') - 1) / 2)
                    #         start_index = data.find('68')
                    #         next_start_index = data.find('68', start_index + 2)
                    #         print(next_start_index)
                    #         message = data[next_start_index: -4]
                    #         print(message)
                    #         for i in range(message_count):
                    #             stop = message.find('16')
                    #             result = message[:stop + 2]
                    #             message = message[stop + 2:]
                    #             print(result)
                    #             message_list.append(result)
                    #         for item_msg in message_list:
                    #             p = ParseEngine_645(sa=sa)
                    #             p.start_parse(item_msg)
                    # hex_message_h
                    # data = data[58:-4]
                    p = ParseEngine3762(sa=sa)
                    p.start_parse(data)

                    # engine = ParseMsgEngine(hex_message=data)
                    # 负责解析
                    # is_true = engine.start_parse(hex_message=data)
                    # 新的引擎处理消息
                    # 解析出使用的协议
                    # 注释下面两行代码就可以
                    # if is_true:
                    #     data = engine.cal_msg()
                    #     if data:
                    #         sa.ser.write(data)
                    #         logger.info(f"send:{data}")
        except Exception as e:
            print(e)
            time.sleep(1)

@logger.catch
def main():
    with SerialAssistant(port="COM1", baudrate=9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_EVEN,
                         stopbits=serial.STOPBITS_ONE) as sa:
        time.sleep(1)
        serial_threading = threading.Thread(target=get_frame, args=(sa,))
        # judge_threading = threading.Thread(target=judge_context)
        serial_threading.start()
        # judge_threading.start()

        que = queue.Queue()
        judge_context(que, sa)


if __name__ == '__main__':
    main()
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复

使用道具 举报

 楼主| 发表于 2023-11-1 16:47:23 | 显示全部楼层
parse_zb_1_msg存在读取缓存区失败问题
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2024-9-21 13:32

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表