鱼C论坛

 找回密码
 立即注册
查看: 861|回复: 0

python串口交互问题

[复制链接]
发表于 2023-11-1 16:53:22 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
import queue
import socket
import time
import re
import serial.tools.list_ports
import threading
import queue
from loguru import logger

# 全局变量,用于存储串口对象
from Hplc import ParseEngine3762
from Parse_698 import ParseMsgEngine
from Parse645 import ParseEngine_645
import Sockapi
# from pymongo import MongoClient

import MeterMap
import SerialConfigMap

ser = None  # 这个空值

running = False
from loguru import logger
import datetime
import os



class Rotator:
    @logger.catch
    def __init__(self, *, size, at):
        now = datetime.datetime.now()
        self._size_limit = size
        self._time_limit = now.replace(hour=at.hour, minute=at.minute, second=at.second)

        if now >= self._time_limit:
            # 当前时间已经过了目标时间,为了防止立即进行旋转,增加一天
            self._time_limit += datetime.timedelta(days=1)

    @logger.catch
    def should_rotate(self, message, file):
        file.seek(0, 2)
        if file.tell() + len(message) > self._size_limit:
            return True
        if message.record["time"].timestamp() > self._time_limit.timestamp():
            self._time_limit += datetime.timedelta(days=1)
            return True
        return False

# 获取当前文件夹路径
current_dir = os.path.dirname(os.path.realpath(__file__))

# 创建日志文件夹路径
log_folder_path = os.path.join(current_dir, "logs")

# 如果日志文件夹不存在,则创建它
if not os.path.exists(log_folder_path):
    os.makedirs(log_folder_path)

# 旋转文件大小超过 500MB 或每天午夜旋转一次
current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
# 生成日志文件路径
log_file_path = os.path.join(log_folder_path, f"file_{current_time}.log")



# log_file_path = f"E:\\python\\PycharmProjects\\C_demo_two\\C_demo_two\\file_{current_time}.log"

rotator = Rotator(size=5e+8, at=datetime.time(23, 59, 59))
logger.add(log_file_path, format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", rotation=rotator.should_rotate)



class SerialAssistant:
    @logger.catch
    def __init__(self, port, baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_EVEN,
                 stopbits=serial.STOPBITS_ONE, timeout=10, rtscts=False, *args, **kwargs):
        self.port = port
        self.ser = serial.Serial(port=None, baudrate=baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits,
                                 timeout=timeout, rtscts=False)

    def tcp_connect(self):
        pass

    def db_connect(self):
        pass


    # @logger.catch
    def open(self):
        self.ser.port = self.port

        self.ser.is_open = True
        if self.ser.isOpen():
            self.ser.close()
        if self.ser._port is not None and not self.ser.is_open:
            self.ser.open()
            self.ser.set_buffer_size(rx_size=4096, tx_size=4096)
        put_ip = "10.108.2.101"
        put_port = "6666"
        put_ctime = "172800"
        self.sockapi = Sockapi.GetMac()
        self.sockapi.ConnectDevice(put_ip, put_port, put_ctime)
        self.tcp_connect = Sockapi.Tcp_api()
        client_socket  = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.tcp_connect.set_client_socket(client_socket)
        server_address = ('10.17.166.199', 8888)
        self.tcp_connect.client_socket.connect(server_address)
        print('成功建立TCP连接')



    @logger.catch
    def close(self):
        try:
            self.ser.close()
        except Exception as e:
            raise e
        # if self.ser.isOpen():
        #     self.ser.close()

    @logger.catch
    def __enter__(self):
        self.ser.open()
        return self

    @logger.catch
    def __exit__(self, *args, **kwargs):
        self.ser.close()

all_queu = queue.Queue()
TEXT_QUEUE = queue.Queue(10000)  # 最大的承载消息10000条

@logger.catch
def parse_zb_0_msg(sa):
    try:
        current_str = ''  # 当前接收的数据
        current_wait_time = 0  # 当前累计等待的时间
        every_wait_time = 0.1  # 每次等待的时间
        last_index = 0  # 上次的索引
        find_text = 'fefefefe'  # 索引的数据
        find_text_length = len(find_text)  # 索引数据的长度
        find_start = 0  # 查询的起始位置
        max_wait_time = 0.5  # 最大等待时间15s 可以改

        while MeterMap.global_map.get('Zb') == '0':
            # 每次休眠0.2秒
            time.sleep(every_wait_time)
            num = sa.ser.inWaiting()
            # 如果没有数据,跳过本次循环
            if num > 0:
                # sleep会差生影响
                if MeterMap.global_map.get('Zb') != '0':
                    break
                # 读取数据
                data = sa.ser.read(num)
            else:
                data = b''
            # 将数据累加到current_str
            current_str += data.hex()
            # 如果数据没有达到指定长度继续读取
            if len(current_str) < find_text_length:
                # if current_str:
                #     logger.info(f"数据未达到最小的长度{find_text_length},当前数据为{current_str}")
                # else:
                #     logger.info(f"数据未达到最小的长度{find_text_length},当前数据为空!")
                continue
            else:
                # 查询索引值
                current_index = current_str.find(find_text, find_start)
                # 如果当前索引值不等于-1 代表找到了
                if current_index != -1:
                    # 当前索引值减去上次索引值如果大于等于find_text_length
                    if current_index - last_index >= find_text_length:
                        # 切除找出的子串
                        sub_text = current_str[0:current_index]
                        # 如果没有以指定的开头说明存在脏数据,没有必要加到队列中
                        if sub_text.startswith(find_text):
                            TEXT_QUEUE.put(('0', sub_text))
                        else:
                            logger.info('错误数据,没有以{find_text}开头--->', sub_text)
                        # current_str 重新赋值
                        current_str = current_str[current_index:]
                        # 查询的起始位置 置零
                        find_start = 0
                        # 索引置零
                        last_index = 0
                        # 等待时间置零
                        current_wait_time = 0
                    else:
                        last_index = current_index
                        # 查询的起始位置 加上find_text的长度,为了索引下次
                        find_start += len(find_text)
                else:
                    # 第一次等待最大等待15秒,就强制处理数据
                    if find_start > 0 and current_wait_time > max_wait_time:
                        sub_text = current_str[last_index:]
                        # logger.info(f"等待超过{max_wait_time}s 强制切割数据")
                        # 如果sub_text 与current_str 不相等说明存在一部分脏数据,切除
                        if sub_text != current_str:
                            logger.info(f"存在部分错误数据{current_index[:last_index]} 已被切除!")
                        # 数据的开头正确,加入到队列
                        if sub_text.startswith(find_text):
                            TEXT_QUEUE.put(('0', sub_text))
                        else:
                            logger.info(f'程序可能出错,强制切割数据是错误数据,没有以{find_text}开头--->{sub_text}')
                        # current_str 置空
                        current_str = ''
                        # 查询的起始位置 置零
                        find_start = 0
                        # 索引置零
                        last_index = 0
                        # 等待时间置零
                        current_wait_time = 0
                    elif find_start <= 0 and current_wait_time > max_wait_time:
                        print(f'当前数据为{current_str}超过{max_wait_time}s 就进行强制切割,将数据扔掉!')
                        current_str = ''
                    else:
                        pass
                        # print(f'当前等待时间为{current_wait_time}s')
                # 每次增加0.2秒等待时间
                current_wait_time += every_wait_time

    except Exception as e:
        logger.error("发生了异常:", e)

@logger.catch
def parse_zb_1_msg(sa):
    while MeterMap.global_map.get('Zb') != '0':
        time.sleep(0.1)
        count = 3
        current_data = ''
        while count > 0:
            num = sa.ser.inWaiting()
            if num > 0:
                data = sa.ser.read(num)
                current_data += data.hex()
            time.sleep(0.1)
            count -= 1
        data = current_data
        if data:
            if data[:2] == '68':
                data_len = int((data[4:6] + data[2:4]), 16) * 2
                if len(data) < data_len:
                    next_num = data_len - len(data)
                    if num >= next_num:
                        data_next = sa.ser.read(next_num)
                        data_next = data_next.hex()
                        data = data + data_next

        if len(data) > 16:
            TEXT_QUEUE.put(('1', data))
        else:
            logger.debug("")


@logger.catch
def get_frame(sa):
    while True:
        # 不能清空队列,要不消息会丢失
        if MeterMap.global_map.get('Zb') == '0':
            parse_zb_0_msg(sa)
        else:
            parse_zb_1_msg(sa)

@logger.catch
def judge_context(que, sa):
    while True:
        try:
            zb, data = TEXT_QUEUE.get()
            data = data.upper()
            if zb == '0':
                # logger.info(f"取出队列中的数据{data}")
                data = data[8:]
                # logger.info("开始处理数据")
                logger.info(f"recive:{data}")
                engine = ParseMsgEngine(hex_message=data, sa=sa)
                # 负责解析
                engine.start_parse(hex_message=data, is_send=True)
                logger.info("数据处理结束")
            else:
                # logger.info(f"取出队列中的数据{data}")
                # logger.info("开始处理数据")
                logger.info(f"recive:{data}")
                data_list = []
                while len(data) > 0:
                    data_len = int((data[4:6] + data[2:4]), 16) * 2
                    data_index = data[:data_len]
                    data = data[data_len:]
                    data_list.append(data_index)
                for item_data in data_list:
                    data = item_data
                    p = ParseEngine3762(sa=sa)
                    p.start_parse(data)
        except Exception as e:
            print(e)
            time.sleep(1)

@logger.catch
def main():
    with SerialAssistant(port="COM1", baudrate=9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_EVEN,
                         stopbits=serial.STOPBITS_ONE) as sa:
        time.sleep(1)
        serial_threading = threading.Thread(target=get_frame, args=(sa,))
        # judge_threading = threading.Thread(target=judge_context)
        serial_threading.start()
        # judge_threading.start()

        que = queue.Queue()
        judge_context(que, sa)


if __name__ == '__main__':
    main()
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2024-11-15 01:52

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表