乱世老高 发表于 2024-3-22 21:52:30

多线程运行情况下的字典赋值——水资源优化配置


# -*- coding: utf-8 -*-
import numpy as np
import os
import pandas as pd
from deap import base, creator, tools, algorithms
import subprocess
import random
import openpyxl
from netCDF4 import Dataset
import multiprocessing
import configparser
import shutil


global result_list
result_list = {}
excel_template = r'E:\cwat_new\diversion_optimization\diversion_template.xlsx'
ini_file = r'E:\cwat_new\diversion_optimization\optim.ini'
total_water = 15128
used_ids = set()


def generate_unique_id():
    unique_id = random.randint(1, 100000000)
    while unique_id in used_ids:
      unique_id = random.randint(1, 100000000)
    used_ids.add(unique_id)
    return unique_id

def read_initial_values(excel_file_path, column_name):
    workbook = openpyxl.load_workbook(excel_file_path)
    sheet = workbook.active
    # 查找指定列的索引
    column_index = None
    for cell in sheet:
      if cell.value == column_name:
            column_index = cell.column_letter
            break
    if column_index is None:
      print(f"找不到名为 '{column_name}' 的列标题")
      return []
    # 获取指定列下的所有值
    column_values = []
    for cell in sheet:
      column_values.append(cell.value)
    return column_values

def evaluate_model_output(nc_file):
    with Dataset(nc_file, 'r') as nc:
      times = nc.variables['time'][:]
      data_var = nc.variables['unmetDemandM3_annualavg']
      # 遍历每个时间段,计算总和
      for i, _ in enumerate(times):
            data_at_time = data_var
            data_at_time = data_at_time * 365
            # 计算当前时间段的总和
            total = np.sum(data_at_time)
            #print(f"Time {i} total: {total}")
    total = tuple()
    return total

# 更新Excel文件中的月份分配数据
def update_excel_data(filename, allocations):
    df = pd.DataFrame(allocations, columns=['Allocation'])
    df['Month'] = range(1, 13)
    with pd.ExcelWriter(filename, engine='openpyxl', mode='w') as writer:
      df.to_excel(writer, index=False)

def mutate_individual(individual, mutation_probability):
    for i in range(len(individual)):
      if random.random() < mutation_probability:
            individual += random.gauss(0, 100)# 变异操作

            # 修正子代,确保在非负范围内
            individual = max(individual, 0)

    return

def cxBlendBounded(ind1, ind2, alpha=0.5, low=0):
    """执行cxBlend交叉操作,并确保子代的值位于范围内。"""
    for i in range(len(ind1)):
      gamma = (1. + 2. * alpha) * random.random() - alpha
      ind1 = (1. - gamma) * ind1 + gamma * ind2
      ind2 = gamma * ind1 + (1. - gamma) * ind2
      # 确保值不小于下限
      ind1 = max(low, ind1)
      ind2 = max(low, ind2)
    return ind1, ind2

# 运行水文模型的命令行函数
def run_hydrological_model(unique_id, monthly_allocation):
    folder_name = f'E:\\cwat_new\\diversion_optimization\\{unique_id}'
    os.makedirs(folder_name, exist_ok=True)
    output_folder = os.path.join(folder_name, 'output')
    os.makedirs(output_folder, exist_ok=True)   
    excel_file = os.path.join(folder_name, f'diversion_{unique_id}.xlsx')   
    # 使用模板文件创建新的Excel
    shutil.copy(excel_template, excel_file)   
    #创建新的ini文件
    config = configparser.ConfigParser()
    config.optionxform = lambda option: option
    config.read(ini_file)
    # 修改参数值
    config.set('WATERDIVERSION', 'monthly_water_quota', excel_file)
    config.set('FILE_PATHS', 'PathOut', output_folder)
    # 保存修改后的ini文件
    ini_fileC = os.path.join(folder_name, f'optim_{unique_id}.ini')
    with open(ini_fileC, 'w') as configfile:
      config.write(configfile)
    # 更新Excel文件以供模型使用
    update_excel_data(excel_file, monthly_allocation)
   
    # 运行水文模型,假设模型的可执行文件名为 "hydro_model",并且它使用excel文件
    subprocess.run(['python', r'E:\cwat_new\CWatM-main\run_cwatm.py', ini_fileC], check=True)
    nc_file = os.path.join(output_folder, 'unmetDemandM3_annualavg.nc')
    result_list)] = unique_id
    # 从生成的nc文件中读取评价指标
    return evaluate_model_output(nc_file), unique_id


# 评价函数,目标是最小化评价指标
def evaluate(individual):
   
    unique_id = generate_unique_id()
    # 确保分配水量不超过限制
    if not isinstance(individual, list):
      individual = list(individual)
    # 确保 individual 中的元素都是整数
    individual =
   
    # 计算分配方案的总和
    allocation_sum = sum(individual)
    penalty = abs(allocation_sum - total_water) * 1000000
   
    if allocation_sum > total_water:
      return 1e10,# 返回一个很大的值,表示不可行的解

    result, unique_id = run_hydrological_model(unique_id, individual)

    return result + penalty,

# 设置遗传算法
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.FitnessMin)

toolbox = base.Toolbox()
toolbox.register("attr_float", random.uniform, 0, total_water/12)# 假设平均每月分配
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_float, n=12)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

toolbox.register("evaluate", evaluate)
toolbox.register("mate", tools.cxBlend, alpha=0.5)
toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=100, indpb=0.2)
toolbox.register("select", tools.selTournament, tournsize=3)

# 遗传算法参数
population_size = 1
crossover_probability = 0.7
mutation_probability = 0.2
number_of_generations = 0
toolbox.register("mutate", mutate_individual, mutation_probability=mutation_probability)
toolbox.register("mate", cxBlendBounded, alpha=0.5, low=0)

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    result_list = manager.dict()
    pool_size = int(multiprocessing.cpu_count() * 0.8)
    pool = multiprocessing.Pool(processes=pool_size)
    toolbox.register("map", pool.map)
    population = toolbox.population(n=population_size)
    #设置初始值
    initial_values = read_initial_values(excel_template, 'Allocation')   
    population[:] = initial_values
   
    final_population, logbook = algorithms.eaSimple(population, toolbox, cxpb=crossover_probability, mutpb=mutation_probability, ngen=number_of_generations, verbose=True)
    # 找到最优解
    best_ind = tools.selBest(population, 1)
    best_fitness = best_ind.fitness.values
    best_unique_id = result_list
    print("Best Individual is: ", best_ind)
    print("Best Individual fitness:", best_fitness)
    print("Best Individual ID is:", best_unique_id)

如代码所示,在subprocess.run(['python', r'E:\cwat_new\CWatM-main\run_cwatm.py', ini_fileC], check=True)后的result_list)] = unique_id每次无法正常赋值,在spyder中一直显示空字典。

FishC 发表于 2024-3-22 21:52:59

您好!在多线程运行的情况下,可能会出现同步问题,导致result_list为空字典。我推测问题可能出在使用`multiprocessing.Manager()`创建了一个共享字典`result_list`后,没有正确地传递给子进程,所以子进程无法访问和修改这个共享字典。

为了解决这个问题,您可以将`result_list`作为参数传递给子进程,而不是使用`register("map", pool.map)`来设置子进程的映射函数。具体来说,您可以定义一个新的函数来替代`run_hydrological_model`,并将`result_list`作为参数传递给该函数。子进程将根据需要从这个共享字典中读取结果并进行更新。代码示例如下:

def run_hydrological_model_wrapper(args):
    unique_id, monthly_allocation, result_dict = args
    # ...
    # 在适当的时候更新 result_dict
    # ...
    return evaluate_model_output(nc_file), unique_id

# ...

if __name__ == "__main__":
    # ...
    # 创建 multiprocessing.Pool 对象
   
    # 将 result_list 传递给子进程
    task_args = [(unique_id, individual, result_list) for unique_id, individual in zip(unique_ids, population)]
   
    results = pool.map(run_hydrological_model_wrapper, task_args)
   
    # 处理结果
    for result, unique_id in results:
      result_list = result

这里,我们使用`run_hydrological_model_wrapper`来封装原来的`run_hydrological_model`函数,并在其中将`result_list`作为参数传递给新函数。在定义`task_args`时,我们将`result_list`与其他参数一起打包成了一个元组。然后,我们使用多线程池的`map`方法来运行`run_hydrological_model_wrapper`,并得到计算结果。最后,我们将结果添加到`result_list`中。

请注意,在这个新的方案中,我们需要根据需要修改`run_hydrological_model_wrapper`函数的内部代码以正确处理`result_list`。此外,我们还需要重新调整和更新其他部分的代码以适应这个新的方案。

希望能对您有所帮助!如果您还有任何问题,请随时提问。

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。

乱世老高 发表于 2024-3-22 21:53:27

程序使用GPT写的,我在里面做了点适配,但是这个赋值问题一直解决不了,问了GPT好多遍了{:10_266:}{:10_266:}

FishC 发表于 2024-3-22 21:53:45

乱世老高 发表于 2024-03-22 21:53
程序使用GPT写的,我在里面做了点适配,但是这个赋值 ...

非常抱歉,我给出的答案没有解决您的问题。在这种情况下,您可以尝试使用更简单的方式来解决赋值问题。以下是我对您的代码进行了一些修改,以尝试解决赋值问题:

from multiprocessing import Pool

# ...

def run_hydrological_model(args):
    unique_id, monthly_allocation = args
    # ...
    nc_file = os.path.join(output_folder, 'unmetDemandM3_annualavg.nc')
    result = evaluate_model_output(nc_file)
    return result, unique_id

# ...

if __name__ == "__main__":
    pool = Pool(processes=4)# 修改为适当的进程数
    task_args = [(generate_unique_id(), individual) for individual in population]
    results = pool.map(run_hydrological_model, task_args)

    for result, unique_id in results:
      result_list = unique_id

在这个新的方案中,我使用了`multiprocessing.Pool`来创建进程池,并将`run_hydrological_model`作为任务函数传递给`pool.map()`方法。同时,我将每个任务的参数包装为一个元组列表`task_args`,其中每个元组包含了唯一ID和单个个体的信息。

再次强调,这只是一种尝试解决问题的方式,如果还是没有解决赋值问题,请提供更多关于具体问题的信息,以便我能够更好地帮助您。祝您好运!

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。

乱世老高 发表于 2024-3-22 21:56:24

FishC 发表于 2024-3-22 21:52
您好!在多线程运行的情况下,可能会出现同步问题,导致result_list为空字典。我推测问题可能出在使用`mult ...

我需要你告诉我明确的改法,以及添加的位置

FishC 发表于 2024-3-22 21:57:51

乱世老高 发表于 2024-03-22 21:56
我需要你告诉我明确的改法,以及添加的位置

非常抱歉,我没有提供明确的修改代码的位置和方式。在您的代码中,我注意到`run_hydrological_model`函数的返回值是一个元组`(evaluate_model_output(nc_file), unique_id)`,这个返回值应该存储在`result_list`中。为了解决问题,您可以将`result_list`作为参数传递给`run_hydrological_model`函数,并在其中更新`result_list`。以下是代码的修改部分:

def run_hydrological_model(unique_id, monthly_allocation, result_dict):
    # ...
    # 在适当的时候更新 result_dict
    # ...
    result_dict = evaluate_model_output(nc_file)
    return evaluate_model_output(nc_file), unique_id

# ...

if __name__ == "__main__":
    # ...
    # 创建 multiprocessing.Pool 对象
    manager = multiprocessing.Manager()
    result_list = manager.dict()
    task_args = [(generate_unique_id(), individual, result_list) for individual in population]
    results = pool.starmap(run_hydrological_model, task_args)

    # 处理结果
    for result, unique_id in results:
      result_list = result

在这里,我们使用`manager.dict()`创建了一个共享字典`result_list`,并将其作为参数传递给`run_hydrological_model`函数。在`run_hydrological_model`函数中,根据需要更新`result_list`。在`task_args`中,我们为每个个体和唯一ID创建了一个元组,并将`result_list`添加为最后一个参数。使用`pool.starmap`来运行`run_hydrological_model`函数,并将结果存储在`results`中。最后,我们将`results`中的结果添加到`result_list`中。

请注意,这只是修改代码的示例,您可能需要根据自己的实际情况进行调整和完善。希望这次能够准确地帮助到您!如果您还有疑问,请随时提问。

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。

乱世老高 发表于 2024-3-22 22:04:55

FishC 发表于 2024-3-22 21:57
非常抱歉,我没有提供明确的修改代码的位置和方式。在您的代码中,我注意到`run_hydrological_model`函 ...

if __name__ == "__main__":
    # ...
    # 创建 multiprocessing.Pool 对象
    manager = multiprocessing.Manager()
    result_list = manager.dict()
    task_args = [(generate_unique_id(), individual, result_list) for individual in population]
    results = pool.starmap(run_hydrological_model, task_args)

    # 处理结果
    for result, unique_id in results:
      result_list = result
这段代码应该放在我源代码的哪个位置,我需要具体的位置

FishC 发表于 2024-3-22 22:05:09

乱世老高 发表于 2024-03-22 22:04
if __name__ == "__main__":
    # ...
    # 创建...

非常抱歉之前的回答没有提供具体的位置。在您的源代码中,`if __name__ == "__main__":`是主程序的入口点,即程序开始执行的地方。因此,您需要将这段代码放在`if __name__ == "__main__":`的主体部分中。

以下是修改后的代码示例:

if __name__ == "__main__":
    # ...

    # 创建 multiprocessing.Pool 对象
    manager = multiprocessing.Manager()
    result_list = manager.dict()
   
    # 确定 task_args 的生成方式,根据实际情况进行调整
   
    with multiprocessing.Pool(processes=pool_size) as pool:
      results = pool.starmap(run_hydrological_model, task_args)

    # 处理结果
    for result, unique_id in results:
      result_list = result

上述代码片段表示在主程序的入口点处创建了一个`multiprocessing.Pool`对象,并使用`manager.dict()`创建了一个共享字典`result_list`。然后,通过`pool.starmap`方法并应用到`run_hydrological_model`函数和`task_args`列表上,启动并行计算任务。最后,将结果以`unique_id`为键,存储在`result_list`中。

请注意,在这个示例中,我假设`task_args`是一个列表,其中包含传递给`run_hydrological_model`函数的参数。您需要根据实际情况调整生成`task_args`的方式。

希望这次能够明确地回答您的问题!如果您还有任何疑问,请随时提问。

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。

乱世老高 发表于 2024-3-22 22:07:16

FishC 发表于 2024-3-22 22:05
非常抱歉之前的回答没有提供具体的位置。在您的源代码中,`if __name__ == "__main__":`是主程序的入口 ...

你没有理解我的意思,我是说
if __name__ == "__main__":
    manager = multiprocessing.Manager()
    result_list = manager.dict()
    pool_size = int(multiprocessing.cpu_count() * 0.8)
    pool = multiprocessing.Pool(processes=pool_size)
    toolbox.register("map", pool.map)
    population = toolbox.population(n=population_size)
    #设置初始值
    initial_values = read_initial_values(excel_template, 'Allocation')   
    population[:] = initial_values
    final_population, logbook = algorithms.eaSimple(population, toolbox, cxpb=crossover_probability, mutpb=mutation_probability, ngen=number_of_generations, verbose=True)
    # 找到最优解
    best_ind = tools.selBest(population, 1)
    best_fitness = best_ind.fitness.values
    best_unique_id = result_list
    print("Best Individual is: ", best_ind)
    print("Best Individual fitness:", best_fitness)
    print("Best Individual ID is:", best_unique_id)
请告诉我在加入了这些代码后,这块代码应该长什么样子

FishC 发表于 2024-3-22 22:07:41

乱世老高 发表于 2024-03-22 22:07
你没有理解我的意思,我是说
if __name__ == "__main ...

非常抱歉之前的回答中对您的问题理解有误。根据您提供的代码块,下面是在加入了这些代码后,`if __name__ == "__main__":`部分的修改示例:

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    result_list = manager.dict()
    pool_size = int(multiprocessing.cpu_count() * 0.8)
    pool = multiprocessing.Pool(processes=pool_size)
    toolbox.register("map", pool.map)

    population = toolbox.population(n=population_size)
    # 设置初始值
    initial_values = read_initial_values(excel_template, 'Allocation')
    population = initial_values

    final_population, logbook = algorithms.eaSimple(population, toolbox, cxpb=crossover_probability, mutpb=mutation_probability, ngen=number_of_generations, verbose=True)
   
    # 找到最优解
    best_ind = tools.selBest(population, 1)
    best_fitness = best_ind.fitness.values
    best_unique_id = list(result_list.keys()) if len(result_list) > 0 else None
   
    print("Best Individual is:", best_ind)
    print("Best Individual fitness:", best_fitness)
    print("Best Individual ID is:", best_unique_id)

上述代码模块是在主程序的入口点处添加的。它首先创建了一个`multiprocessing.Manager()`对象,然后使用`manager.dict()`创建了共享字典`result_list`来存储计算结果。

接着,通过`multiprocessing.cpu_count()`获取CPU核心数并乘以0.8得到线程池的大小`pool_size`。然后,使用`multiprocessing.Pool(processes=pool_size)`创建了一个线程池对象`pool`。通过`toolbox.register("map", pool.map)`将多线程映射方法注册到`toolbox`中。

接下来,根据您的需求初始化种群。然后,调用`algorithms.eaSimple()`方法进行遗传算法的进化计算。最后,使用`tools.selBest()`选择最优个体,并提取其适应度值和唯一ID。

请注意,代码示例中对`best_unique_id`的处理方式是将`result_list`中的第一个键作为最优个体的唯一ID(如果`result_list`非空)。如果您的要求有所不同,请适当修改这部分代码。

希望这次的回答能够满足您的需求!如果您还有任何问题,请随时提问。

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。

陈勃 发表于 2024-3-23 11:30:40

{:10_307:}

陈勃 发表于 2024-3-23 11:32:32

{:10_328:}

疾风之杰西 发表于 2024-3-27 09:19:56

{:5_111:}

尹玄睿 发表于 2024-3-28 09:24:05

1

尹玄睿 发表于 2024-3-28 09:24:40

2

gametsbug 发表于 2024-3-30 18:36:25

学习

a905448839 发表于 2024-3-31 15:50:52

牛逼

芜湖666 发表于 2024-4-2 07:35:53

我不懂

芜湖666 发表于 2024-4-2 07:36:24

666

芜湖666 发表于 2024-4-2 07:39:05

123123
页: [1] 2
查看完整版本: 多线程运行情况下的字典赋值——水资源优化配置