乱世老高 发表于 2024-3-22 22:15:06

水资源优化

# -*- coding: utf-8 -*-
import numpy as np
import os
import pandas as pd
from deap import base, creator, tools, algorithms
import subprocess
import random
import openpyxl
from netCDF4 import Dataset
import multiprocessing
import configparser
import shutil


global result_list
result_list = {}
excel_template = r'E:\cwat_new\diversion_optimization\diversion_template.xlsx'
ini_file = r'E:\cwat_new\diversion_optimization\optim.ini'
total_water = 15128
used_ids = set()


def generate_unique_id():
    unique_id = random.randint(1, 100000000)
    while unique_id in used_ids:
      unique_id = random.randint(1, 100000000)
    used_ids.add(unique_id)
    return unique_id

def read_initial_values(excel_file_path, column_name):
    workbook = openpyxl.load_workbook(excel_file_path)
    sheet = workbook.active
    # 查找指定列的索引
    column_index = None
    for cell in sheet:
      if cell.value == column_name:
            column_index = cell.column_letter
            break
    if column_index is None:
      print(f"找不到名为 '{column_name}' 的列标题")
      return []
    # 获取指定列下的所有值
    column_values = []
    for cell in sheet:
      column_values.append(cell.value)
    return column_values

def evaluate_model_output(nc_file):
    with Dataset(nc_file, 'r') as nc:
      times = nc.variables['time'][:]
      data_var = nc.variables['unmetDemandM3_annualavg']
      # 遍历每个时间段,计算总和
      for i, _ in enumerate(times):
            data_at_time = data_var
            data_at_time = data_at_time * 365
            # 计算当前时间段的总和
            total = np.sum(data_at_time)
            #print(f"Time {i} total: {total}")
    total = tuple()
    return total

# 更新Excel文件中的月份分配数据
def update_excel_data(filename, allocations):
    df = pd.DataFrame(allocations, columns=['Allocation'])
    df['Month'] = range(1, 13)
    with pd.ExcelWriter(filename, engine='openpyxl', mode='w') as writer:
      df.to_excel(writer, index=False)

def mutate_individual(individual, mutation_probability):
    for i in range(len(individual)):
      if random.random() < mutation_probability:
            individual += random.gauss(0, 100)# 变异操作

            # 修正子代,确保在非负范围内
            individual = max(individual, 0)

    return

def cxBlendBounded(ind1, ind2, alpha=0.5, low=0):
    """执行cxBlend交叉操作,并确保子代的值位于范围内。"""
    for i in range(len(ind1)):
      gamma = (1. + 2. * alpha) * random.random() - alpha
      ind1 = (1. - gamma) * ind1 + gamma * ind2
      ind2 = gamma * ind1 + (1. - gamma) * ind2
      # 确保值不小于下限
      ind1 = max(low, ind1)
      ind2 = max(low, ind2)
    return ind1, ind2

# 运行水文模型的命令行函数
def run_hydrological_model(unique_id, monthly_allocation):
    folder_name = f'E:\\cwat_new\\diversion_optimization\\{unique_id}'
    os.makedirs(folder_name, exist_ok=True)
    output_folder = os.path.join(folder_name, 'output')
    os.makedirs(output_folder, exist_ok=True)   
    excel_file = os.path.join(folder_name, f'diversion_{unique_id}.xlsx')   
    # 使用模板文件创建新的Excel
    shutil.copy(excel_template, excel_file)   
    #创建新的ini文件
    config = configparser.ConfigParser()
    config.optionxform = lambda option: option
    config.read(ini_file)
    # 修改参数值
    config.set('WATERDIVERSION', 'monthly_water_quota', excel_file)
    config.set('FILE_PATHS', 'PathOut', output_folder)
    # 保存修改后的ini文件
    ini_fileC = os.path.join(folder_name, f'optim_{unique_id}.ini')
    with open(ini_fileC, 'w') as configfile:
      config.write(configfile)
    # 更新Excel文件以供模型使用
    update_excel_data(excel_file, monthly_allocation)
   
    # 运行水文模型,假设模型的可执行文件名为 "hydro_model",并且它使用excel文件
    subprocess.run(['python', r'E:\cwat_new\CWatM-main\run_cwatm.py', ini_fileC], check=True)
    nc_file = os.path.join(output_folder, 'unmetDemandM3_annualavg.nc')
    result_list)] = unique_id
    # 从生成的nc文件中读取评价指标
    return evaluate_model_output(nc_file), unique_id


# 评价函数,目标是最小化评价指标
def evaluate(individual):
   
    unique_id = generate_unique_id()
    # 确保分配水量不超过限制
    if not isinstance(individual, list):
      individual = list(individual)
    # 确保 individual 中的元素都是整数
    individual =
   
    # 计算分配方案的总和
    allocation_sum = sum(individual)
    penalty = abs(allocation_sum - total_water) * 1000000
   
    if allocation_sum > total_water:
      return 1e10,# 返回一个很大的值,表示不可行的解

    result, unique_id = run_hydrological_model(unique_id, individual)

    return result + penalty,

# 设置遗传算法
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.FitnessMin)

toolbox = base.Toolbox()
toolbox.register("attr_float", random.uniform, 0, total_water/12)# 假设平均每月分配
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_float, n=12)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

toolbox.register("evaluate", evaluate)
toolbox.register("mate", tools.cxBlend, alpha=0.5)
toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=100, indpb=0.2)
toolbox.register("select", tools.selTournament, tournsize=3)

# 遗传算法参数
population_size = 1
crossover_probability = 0.7
mutation_probability = 0.2
number_of_generations = 0
toolbox.register("mutate", mutate_individual, mutation_probability=mutation_probability)
toolbox.register("mate", cxBlendBounded, alpha=0.5, low=0)

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    result_list = manager.dict()
    pool_size = int(multiprocessing.cpu_count() * 0.8)
    pool = multiprocessing.Pool(processes=pool_size)
    toolbox.register("map", pool.map)
    population = toolbox.population(n=population_size)
    #设置初始值
    initial_values = read_initial_values(excel_template, 'Allocation')   
    population[:] = initial_values
   
    final_population, logbook = algorithms.eaSimple(population, toolbox, cxpb=crossover_probability, mutpb=mutation_probability, ngen=number_of_generations, verbose=True)
    # 找到最优解
    best_ind = tools.selBest(population, 1)
    best_fitness = best_ind.fitness.values
    best_unique_id = result_list
    print("Best Individual is: ", best_ind)
    print("Best Individual fitness:", best_fitness)
    print("Best Individual ID is:", best_unique_id)
对该程序的多线程部分的result_list)] = unique_id这行代码进行优化,在每次运行过程中,该行代码都无法正常赋值,可能与多线程处理有关。
请告诉我问题出在哪里,并告诉我如何改进,改进的方法由重新生成整个修改过后的代码呈现

FishC 发表于 2024-3-22 22:15:24

问题出在于在多线程环境中对共享变量result_list的操作不是线程安全的,导致多个线程同时修改result_list,可能发生竞争条件,结果无法正常赋值。

为了解决这个问题,你可以使用线程锁来确保对result_list的操作是原子的。修改代码如下:
import threading

# 创建一个线程锁
result_lock = threading.Lock()

# 在run_hydrological_model函数中获取锁并更新result_list
def run_hydrological_model(unique_id, monthly_allocation):
    # ...
    with result_lock:
      result_list = unique_id
    # ...

# 在evaluate函数中也需要获取锁以确保对result_list的访问是线程安全的
def evaluate(individual):
    # ...
    with result_lock:
      result, unique_id = run_hydrological_model(unique_id, individual)
    # ...

# 在主程序中创建线程池时,通过参数传递锁
pool = multiprocessing.Pool(processes=pool_size, initializer=lambda: setattr(run_hydrological_model, 'lock', result_lock))

这样,每次对result_list进行操作时都会先获取锁,确保只有一个线程能够修改result_list,从而避免竞争条件。

另外,你还需要注意的是在多线程环境下对共享变量的读写要谨慎,确保数据的一致性和正确性。

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。

乱世老高 发表于 2024-3-22 22:21:44

FishC 发表于 2024-3-22 22:15
问题出在于在多线程环境中对共享变量result_list的操作不是线程安全的,导致多个线程同时修改result_list, ...

runfile('E:/cwat_new/diversion_optimization/test.py', wdir='E:/cwat_new/diversion_optimization')
Traceback (most recent call last):

File ~\anaconda3\Lib\site-packages\spyder_kernels\py3compat.py:356 in compat_exec
    exec(code, globals, locals)

File e:\cwat_new\diversion_optimization\test.py:172
    pool = multiprocessing.Pool(processes=pool_size, initializer=lambda: setattr(run_hydrological_model, 'lock', result_lock))

File ~\anaconda3\Lib\multiprocessing\context.py:119 in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild,

File ~\anaconda3\Lib\multiprocessing\pool.py:215 in __init__
    self._repopulate_pool()

File ~\anaconda3\Lib\multiprocessing\pool.py:306 in _repopulate_pool
    return self._repopulate_pool_static(self._ctx, self.Process,

File ~\anaconda3\Lib\multiprocessing\pool.py:329 in _repopulate_pool_static
    w.start()

File ~\anaconda3\Lib\multiprocessing\process.py:121 in start
    self._popen = self._Popen(self)

File ~\anaconda3\Lib\multiprocessing\context.py:336 in _Popen
    return Popen(process_obj)

File ~\anaconda3\Lib\multiprocessing\popen_spawn_win32.py:94 in __init__
    reduction.dump(process_obj, to_child)

File ~\anaconda3\Lib\multiprocessing\reduction.py:60 in dump
    ForkingPickler(file, protocol).dump(obj)

PicklingError: Can't pickle <function <lambda> at 0x0000026D2654C680>: attribute lookup <lambda> on __main__ failed
报错如上

FishC 发表于 2024-3-22 22:22:03

乱世老高 发表于 2024-03-22 22:21
runfile('E:/cwat_new/diversion_optimization/test.p ...

这个错误是因为在使用lambda函数初始化线程池时,无法正确序列化线程锁 `result_lock`。在 multiprocessing 模块中,需要将所有在主进程和子进程之间共享的对象进行序列化和反序列化操作,但是线程锁不支持序列化。解决这个问题的方法是使用 `multiprocessing.Manager()` 来创建一个可在多进程间共享的线程锁。

下面是修改后的代码示例:

import multiprocessing

# 创建一个线程锁
manager = multiprocessing.Manager()
result_lock = manager.Lock()

# 在run_hydrological_model函数中获取锁并更新result_list
def run_hydrological_model(unique_id, monthly_allocation):
    # ...
    with result_lock:
      result_list = unique_id
    # ...

# 在 evaluate 函数中也需要获取锁以确保对 result_list 的访问是线程安全的
def evaluate(individual):
    # ...
    with result_lock:
      result, unique_id = run_hydrological_model(unique_id, individual)
    # ...

# 在主程序中创建线程池时,通过参数传递锁
pool = multiprocessing.Pool(processes=pool_size, initializer=lambda: setattr(run_hydrological_model, 'lock', result_lock))

通过使用 `multiprocessing.Manager()` 创建的线程锁 `result_lock` 可以在多进程间共享,并且可以正确地序列化和反序列化,从而避免了报错。

希望这次修改能够解决你的问题。如果还有其他疑问,请随时提问。

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
页: [1]
查看完整版本: 水资源优化