鱼C论坛

 找回密码
 立即注册
查看: 372|回复: 3

请问这个脚本如何优化可以缩减代码执行耗费时间啊,才五十来个进程要六秒多才出结果

[复制链接]
发表于 2024-8-8 00:28:58 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
代码如下  求大佬帮忙看看
  1. from json import load
  2. from os import listdir, path
  3. from subprocess import run
  4. from sys import argv
  5. from typing import List, Tuple

  6. emt = Tuple[str, str, str, str, str]

  7. def func(root_dir: str) -> List[emt]:
  8.     data = []
  9.     proc_list = [_ for _ in listdir(path=root_dir) if path.isdir(path.join(root_dir, _))]
  10.    
  11.     def get_proc_type(proc_path):
  12.         with open(file=proc_path) as f:
  13.             proc_type = load(f)["Processes"][0]["Configs"][0]["ProcType"][0]["name"]
  14.         return proc_type
  15.    
  16.     def get_pid(proc):
  17.         result = run(args=f"pidof {proc}", capture_output=True, shell=True, text=True)
  18.         return result.stdout.strip() if result.returncode == 0 else "Null"
  19.    
  20.    
  21.     for row_prefix, proc in enumerate(iterable=proc_list, start=1):
  22.         proc_dir = path.join(root_dir, proc)
  23.         if path.exists(path=(path_json := path.join(proc_dir, "etc", "exec_config.json"))):
  24.             pid = get_pid(proc=proc)
  25.             proc_type = get_proc_type(proc_path=path_json)
  26.             proc_status = "Running" if pid != "Null" else "Failed"
  27.             
  28.             if path.exists(f"{proc_dir}/bin"):
  29.                 data.append([str(row_prefix), proc, f"{proc_status:<12}", pid, proc_type])
  30.             elif path.exists(f"{proc_dir}/bin_disable"):
  31.                 data.append([str(row_prefix), proc, "Disable", "", proc_type])
  32.     return data


  33. def tabulated_printing(headers: emt, data: List[emt]) -> None:
  34.     format_tabulation = []
  35.     max_length = lambda cell: len(cell.encode("gb2312"))
  36.     col_widths = [max(max_length(str(cell)) for cell in column) for column in zip(headers, *data)]

  37.     def make_line_separator() -> str:
  38.         return f"+{'+'.join('-' * (col_width + 2) for col_width in col_widths)}+"

  39.     def format_row(row: List[str], color: str = "") -> str:
  40.         formatted_row = " | ".join(
  41.             f"{item:<{col_width - max_length(item) + len(item)}}"
  42.             for item, col_width in zip(row, col_widths)
  43.         )
  44.         return f"| \033[{color}m{formatted_row}\033[0m |" if color else f"| {formatted_row} |"

  45.     format_tabulation.extend([make_line_separator(), format_row(row=headers), make_line_separator()])
  46.     format_tabulation.extend([
  47.         format_row(row=row, color="9" if row[-2] == "Null" else "") for row in data
  48.     ])
  49.     format_tabulation.append(make_line_separator())
  50.     print("\n".join(format_tabulation))


  51. if __name__ == '__main__':
  52.     root_dir = None
  53.     try:
  54.         root_dir = argv[1]
  55.     except Exception as err:
  56.         print(f"input dir error:{err}")
  57.    
  58.     headers = ["No.", "Proc", "Status", "Pid", "Type"]
  59.     tabulated_printing(headers=headers, data=func(root_dir=root_dir))
复制代码
小甲鱼最新课程 -> https://ilovefishc.com
回复

使用道具 举报

发表于 2024-8-8 00:42:50 | 显示全部楼层
当然可以!让我们来看看这个脚本如何优化一下,以缩短执行时间。您提到有 50 多个进程要处理,6 秒的时间确实有点慢。以下是一些优化建议:

### 1. **优先使用并发执行**:
您的代码目前是串行处理每个进程,也就是说一个接一个地等待。我们可以使用 `concurrent.futures` 模块来实现并发处理,让计算机同时处理多个任务。

  1. from json import load
  2. from os import listdir, path
  3. from subprocess import run
  4. from sys import argv
  5. from typing import List, Tuple
  6. from concurrent.futures import ThreadPoolExecutor

  7. emt = Tuple

  8. def func(root_dir: str) -> List:
  9.     data = []
  10.     proc_list = listdir(root_dir)

  11.     def get_proc_type(proc_path):
  12.         with open(file=proc_path) as f:
  13.             return load(f)

  14.     def get_pid(proc):
  15.         result = run(args=f"pidof {proc}", capture_output=True, shell=True, text=True)
  16.         return result.stdout.strip() if result.returncode == 0 else "Null"

  17.     def process_proc(proc):
  18.         proc_dir = path.join(root_dir, proc)
  19.         if path.exists(path_json := path.join(proc_dir, "etc", "exec_config.json")):
  20.             pid = get_pid(proc=proc)
  21.             proc_type = get_proc_type(proc_path=path_json)
  22.             proc_status = "Running" if pid != "Null" else "Failed"
  23.             
  24.             if path.exists(f"{proc_dir}/bin"):
  25.                 return [proc, f"{proc_status}"]
  26.         return None

  27.     with ThreadPoolExecutor() as executor:
  28.         results = list(executor.map(process_proc, proc_list))
  29.         
  30.     for row_prefix, result in enumerate(results, start=1):
  31.         if result:
  32.             data.append([str(row_prefix)] + result)

  33.     return data
复制代码


### 2. **减少 I/O 操作次数**:
目前代码在获取 `proc_type` 时打开文件,每个进程都会读取一次。这一部分可以通过增加缓存或使用更高效的数据结构来存储和重复使用数据。

### 3. **确认代码中的无用部分**:
仔细检查是否有必要的逻辑或不必要的任务,比如检查文件是否存在的频率。

### 4. **使用更快的方法获取 PID**:
如果你的系统支持,可以考虑使用 `/proc` 文件系统直接读取 PID 信息,而不是使用 `pidof` 命令,这样会更快。

### 5. **减少对外部命令的依赖**:
尽量使用 Python 内置的功能来完成操作,而不是调用外部命令,这样会更高效。

以上这些修改能够显著提高性能,特别是在需要处理大量进程时。记得测试优化后的代码以确保它仍然按预期工作哦!

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
小甲鱼最新课程 -> https://ilovefishc.com
回复 支持 反对

使用道具 举报

 楼主| 发表于 2024-8-8 11:35:05 From FishC Mobile | 显示全部楼层
FishC 发表于 2024-8-8 00:42
当然可以!让我们来看看这个脚本如何优化一下,以缩短执行时间。您提到有 50 多个进程要处理,6 秒的时间确 ...

concurrent是第三方库,运行不支持啊,请问你后面描述的2、4、5的优化方式具体代码是什么?(脚本是在ubuntu20.04系统环境下运行的)
小甲鱼最新课程 -> https://ilovefishc.com
回复 支持 反对

使用道具 举报

发表于 2024-8-8 11:40:28 | 显示全部楼层
糖醋咸鱼微辣 发表于 2024-08-08 11:35
concurrent是第三方库,运行不支持啊,请问你后面描述 ...


好的,没有问题!在不使用 `concurrent.futures` 的情况下,我们可以通过降低 I/O 访问频率和避免外部命令来优化您的代码。以下是针对您提到的第 2、4、5 点的具体实现方式:

### 2. **减少 I/O 操作次数**
我们可以将读取 JSON 文件的结果缓存起来,以便多次使用,避免重复读取文件:

  1. def func(root_dir: str) -> List:
  2.     data = []
  3.     proc_list = listdir(root_dir)
  4.     proc_cache = {}  # 缓存已读取的 proc_type

  5.     def get_proc_type(proc_path):
  6.         if proc_path in proc_cache:  # 检查缓存
  7.             return proc_cache[proc_path]
  8.         
  9.         with open(proc_path) as f:
  10.             proc_type = load(f)
  11.             proc_cache[proc_path] = proc_type  # 存入缓存
  12.         return proc_type
复制代码


### 4. **使用更快的方法获取 PID**
我们可以直接访问 `/proc` 文件系统获取进程的 PID。这里是一个示例代码,以检查进程是否在运行:

  1. import os

  2. def get_pid(proc):
  3.     # scan through /proc directory to find the pid
  4.     for pid in os.listdir('/proc'):
  5.         if pid.isdigit():
  6.             try:
  7.                 with open(os.path.join('/proc', pid, 'cmdline'), 'r') as f:
  8.                     cmdline = f.read()
  9.                     if proc in cmdline:
  10.                         return pid  # 返回找到的PID
  11.             except IOError:  # ignore processes that exit while we're looking at pid
  12.                 continue
  13.     return "Null"
复制代码


### 5. **减少对外部命令的依赖**
使用上述 `get_pid` 函数取代原来的 `pidof` 命令,这样就不需要调用 shell 命令了。

### 优化后的完整代码示例
结合这些改进,优化后的 `func` 函数应该如下所示:

  1. from json import load
  2. from os import listdir, path
  3. from typing import List, Tuple

  4. emt = Tuple

  5. def func(root_dir: str) -> List:
  6.     data = []
  7.     proc_list = listdir(root_dir)
  8.     proc_cache = {}

  9.     def get_proc_type(proc_path):
  10.         if proc_path in proc_cache:
  11.             return proc_cache[proc_path]
  12.         
  13.         with open(proc_path) as f:
  14.             proc_type = load(f)
  15.             proc_cache[proc_path] = proc_type
  16.         return proc_type

  17.     def get_pid(proc):
  18.         for pid in listdir('/proc'):
  19.             if pid.isdigit():
  20.                 try:
  21.                     with open(os.path.join('/proc', pid, 'cmdline'), 'r') as f:
  22.                         cmdline = f.read()
  23.                         if proc in cmdline:
  24.                             return pid
  25.                 except IOError:
  26.                     continue
  27.         return "Null"

  28.     for proc in proc_list:
  29.         proc_dir = path.join(root_dir, proc)
  30.         if path.exists(path_json := path.join(proc_dir, "etc", "exec_config.json")):
  31.             pid = get_pid(proc=proc)
  32.             proc_type = get_proc_type(proc_path=path_json)
  33.             proc_status = "Running" if pid != "Null" else "Failed"
  34.             
  35.             if path.exists(f"{proc_dir}/bin"):
  36.                 data.append((proc, proc_status, proc_type))  # 修改以确保此行有效
  37.             elif path.exists(f"{proc_dir}/bin_disable"):
  38.                 data.append((proc, "Disabled", proc_type))  # 同上
  39.     return data
复制代码


通过这种方式,您可以减少 I/O 操作的次数,同时还能够提高 PID 获取的效率。这样应该可以显著减小脚本的执行时间!希望这能帮助到您!如果还有其他问题,请随时告诉我!

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
小甲鱼最新课程 -> https://ilovefishc.com
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2025-4-21 15:18

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表