糖醋咸鱼微辣 发表于 2024-8-8 00:28:58

请问这个脚本如何优化可以缩减代码执行耗费时间啊,才五十来个进程要六秒多才出结果

代码如下求大佬帮忙看看from json import load
from os import listdir, path
from subprocess import run
from sys import argv
from typing import List, Tuple

emt = Tuple

def func(root_dir: str) -> List:
    data = []
    proc_list =
   
    def get_proc_type(proc_path):
      with open(file=proc_path) as f:
            proc_type = load(f)["Processes"]["Configs"]["ProcType"]["name"]
      return proc_type
   
    def get_pid(proc):
      result = run(args=f"pidof {proc}", capture_output=True, shell=True, text=True)
      return result.stdout.strip() if result.returncode == 0 else "Null"
   
   
    for row_prefix, proc in enumerate(iterable=proc_list, start=1):
      proc_dir = path.join(root_dir, proc)
      if path.exists(path=(path_json := path.join(proc_dir, "etc", "exec_config.json"))):
            pid = get_pid(proc=proc)
            proc_type = get_proc_type(proc_path=path_json)
            proc_status = "Running" if pid != "Null" else "Failed"
            
            if path.exists(f"{proc_dir}/bin"):
                data.append()
            elif path.exists(f"{proc_dir}/bin_disable"):
                data.append()
    return data


def tabulated_printing(headers: emt, data: List) -> None:
    format_tabulation = []
    max_length = lambda cell: len(cell.encode("gb2312"))
    col_widths =

    def make_line_separator() -> str:
      return f"+{'+'.join('-' * (col_width + 2) for col_width in col_widths)}+"

    def format_row(row: List, color: str = "") -> str:
      formatted_row = " | ".join(
            f"{item:<{col_width - max_length(item) + len(item)}}"
            for item, col_width in zip(row, col_widths)
      )
      return f"| \033[{color}m{formatted_row}\033[0m |" if color else f"| {formatted_row} |"

    format_tabulation.extend()
    format_tabulation.extend([
      format_row(row=row, color="9" if row[-2] == "Null" else "") for row in data
    ])
    format_tabulation.append(make_line_separator())
    print("\n".join(format_tabulation))


if __name__ == '__main__':
    root_dir = None
    try:
      root_dir = argv
    except Exception as err:
      print(f"input dir error:{err}")
   
    headers = ["No.", "Proc", "Status", "Pid", "Type"]
    tabulated_printing(headers=headers, data=func(root_dir=root_dir))

FishC 发表于 2024-8-8 00:42:50

当然可以!让我们来看看这个脚本如何优化一下,以缩短执行时间。您提到有 50 多个进程要处理,6 秒的时间确实有点慢。以下是一些优化建议:

### 1. **优先使用并发执行**:
您的代码目前是串行处理每个进程,也就是说一个接一个地等待。我们可以使用 `concurrent.futures` 模块来实现并发处理,让计算机同时处理多个任务。

from json import load
from os import listdir, path
from subprocess import run
from sys import argv
from typing import List, Tuple
from concurrent.futures import ThreadPoolExecutor

emt = Tuple

def func(root_dir: str) -> List:
    data = []
    proc_list = listdir(root_dir)

    def get_proc_type(proc_path):
      with open(file=proc_path) as f:
            return load(f)

    def get_pid(proc):
      result = run(args=f"pidof {proc}", capture_output=True, shell=True, text=True)
      return result.stdout.strip() if result.returncode == 0 else "Null"

    def process_proc(proc):
      proc_dir = path.join(root_dir, proc)
      if path.exists(path_json := path.join(proc_dir, "etc", "exec_config.json")):
            pid = get_pid(proc=proc)
            proc_type = get_proc_type(proc_path=path_json)
            proc_status = "Running" if pid != "Null" else "Failed"
            
            if path.exists(f"{proc_dir}/bin"):
                return
      return None

    with ThreadPoolExecutor() as executor:
      results = list(executor.map(process_proc, proc_list))
      
    for row_prefix, result in enumerate(results, start=1):
      if result:
            data.append( + result)

    return data

### 2. **减少 I/O 操作次数**:
目前代码在获取 `proc_type` 时打开文件,每个进程都会读取一次。这一部分可以通过增加缓存或使用更高效的数据结构来存储和重复使用数据。

### 3. **确认代码中的无用部分**:
仔细检查是否有必要的逻辑或不必要的任务,比如检查文件是否存在的频率。

### 4. **使用更快的方法获取 PID**:
如果你的系统支持,可以考虑使用 `/proc` 文件系统直接读取 PID 信息,而不是使用 `pidof` 命令,这样会更快。

### 5. **减少对外部命令的依赖**:
尽量使用 Python 内置的功能来完成操作,而不是调用外部命令,这样会更高效。

以上这些修改能够显著提高性能,特别是在需要处理大量进程时。记得测试优化后的代码以确保它仍然按预期工作哦!

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。

糖醋咸鱼微辣 发表于 2024-8-8 11:35:05

FishC 发表于 2024-8-8 00:42
当然可以!让我们来看看这个脚本如何优化一下,以缩短执行时间。您提到有 50 多个进程要处理,6 秒的时间确 ...

concurrent是第三方库,运行不支持啊,请问你后面描述的2、4、5的优化方式具体代码是什么?(脚本是在ubuntu20.04系统环境下运行的)

FishC 发表于 2024-8-8 11:40:28

糖醋咸鱼微辣 发表于 2024-08-08 11:35
concurrent是第三方库,运行不支持啊,请问你后面描述 ...

好的,没有问题!在不使用 `concurrent.futures` 的情况下,我们可以通过降低 I/O 访问频率和避免外部命令来优化您的代码。以下是针对您提到的第 2、4、5 点的具体实现方式:

### 2. **减少 I/O 操作次数**
我们可以将读取 JSON 文件的结果缓存起来,以便多次使用,避免重复读取文件:

def func(root_dir: str) -> List:
    data = []
    proc_list = listdir(root_dir)
    proc_cache = {}# 缓存已读取的 proc_type

    def get_proc_type(proc_path):
      if proc_path in proc_cache:# 检查缓存
            return proc_cache
      
      with open(proc_path) as f:
            proc_type = load(f)
            proc_cache = proc_type# 存入缓存
      return proc_type

### 4. **使用更快的方法获取 PID**
我们可以直接访问 `/proc` 文件系统获取进程的 PID。这里是一个示例代码,以检查进程是否在运行:

import os

def get_pid(proc):
    # scan through /proc directory to find the pid
    for pid in os.listdir('/proc'):
      if pid.isdigit():
            try:
                with open(os.path.join('/proc', pid, 'cmdline'), 'r') as f:
                  cmdline = f.read()
                  if proc in cmdline:
                        return pid# 返回找到的PID
            except IOError:# ignore processes that exit while we're looking at pid
                continue
    return "Null"

### 5. **减少对外部命令的依赖**
使用上述 `get_pid` 函数取代原来的 `pidof` 命令,这样就不需要调用 shell 命令了。

### 优化后的完整代码示例
结合这些改进,优化后的 `func` 函数应该如下所示:

from json import load
from os import listdir, path
from typing import List, Tuple

emt = Tuple

def func(root_dir: str) -> List:
    data = []
    proc_list = listdir(root_dir)
    proc_cache = {}

    def get_proc_type(proc_path):
      if proc_path in proc_cache:
            return proc_cache
      
      with open(proc_path) as f:
            proc_type = load(f)
            proc_cache = proc_type
      return proc_type

    def get_pid(proc):
      for pid in listdir('/proc'):
            if pid.isdigit():
                try:
                  with open(os.path.join('/proc', pid, 'cmdline'), 'r') as f:
                        cmdline = f.read()
                        if proc in cmdline:
                            return pid
                except IOError:
                  continue
      return "Null"

    for proc in proc_list:
      proc_dir = path.join(root_dir, proc)
      if path.exists(path_json := path.join(proc_dir, "etc", "exec_config.json")):
            pid = get_pid(proc=proc)
            proc_type = get_proc_type(proc_path=path_json)
            proc_status = "Running" if pid != "Null" else "Failed"
            
            if path.exists(f"{proc_dir}/bin"):
                data.append((proc, proc_status, proc_type))# 修改以确保此行有效
            elif path.exists(f"{proc_dir}/bin_disable"):
                data.append((proc, "Disabled", proc_type))# 同上
    return data

通过这种方式,您可以减少 I/O 操作的次数,同时还能够提高 PID 获取的效率。这样应该可以显著减小脚本的执行时间!希望这能帮助到您!如果还有其他问题,请随时告诉我!

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
页: [1]
查看完整版本: 请问这个脚本如何优化可以缩减代码执行耗费时间啊,才五十来个进程要六秒多才出结果