鱼C论坛

 找回密码
 立即注册
查看: 34|回复: 3

[其他工具] 编写了一个调用大模型校对的软件

[复制链接]
发表于 3 小时前 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
如题,写了一个利用大模型校对的Py程序,代码如下:
  1. """
  2. 文本自动校对
  3. 版本: 1.1
  4. 作者:shadowmage
  5. 创建日期: 2025年11月

  6. 配置文件示例 (config.ini):
  7. ==============
  8. [API]
  9. base_url = https://api.siliconflow.cn
  10. api_key = your_api_key_here
  11. model_id = deepseek-ai/DeepSeek-R1-0528-Qwen3-8B
  12. chat_endpoint = /v1/chat/completions

  13. [Processing]
  14. max_chunk_size = 1500
  15. max_retries = 3
  16. backoff_factor = 1
  17. timeout = 600
  18. long_text_threshold = 1000

  19. [Paths]

  20. [Spacy]
  21. language_model = zh_core_web_sm
  22. fallback_model = en_core_web_sm


  23. 许可证: MIT License
  24. """

  25. import os
  26. import json
  27. import requests
  28. import difflib
  29. import re
  30. import spacy
  31. import configparser
  32. from pathlib import Path
  33. import time
  34. import random
  35. import datetime
  36. from concurrent.futures import ThreadPoolExecutor, as_completed
  37. import threading
  38. from requests.adapters import HTTPAdapter
  39. from urllib3.util.retry import Retry
  40. import csv
  41. from collections import defaultdict

  42. class OptimizedConfigManager:
  43.     def __init__(self, config_path='config.ini'):
  44.         self.config_path = Path(config_path)
  45.         self.config = configparser.ConfigParser()
  46.         self.load_config()
  47.    
  48.     def load_config(self):
  49.         if not self.config_path.exists():
  50.             raise FileNotFoundError(f"配置文件 {self.config_path} 不存在")
  51.         
  52.         self.config.read(self.config_path, encoding='utf-8')
  53.         
  54.         required_sections = ['API', 'Processing']
  55.         for section in required_sections:
  56.             if not self.config.has_section(section):
  57.                 raise ValueError(f"配置文件中缺少必要的 [{section}] 部分")
  58.    
  59.     def get_api_config(self):
  60.         return {
  61.             'base_url': self.config.get('API', 'base_url', fallback='https://api.siliconflow.cn'),
  62.             'api_key': self.config.get('API', 'api_key'),
  63.             'model_id': self.config.get('API', 'model_id', fallback='deepseek-ai/DeepSeek-R1-0528-Qwen3-8B'),
  64.             'chat_endpoint': self.config.get('API', 'chat_endpoint', fallback='/v1/chat/completions'),
  65.             'batch_size': self.config.getint('API', 'batch_size', fallback=3),
  66.             'max_workers': self.config.getint('API', 'max_workers', fallback=5)
  67.         }
  68.    
  69.     def get_processing_config(self):
  70.         return {
  71.             'max_chunk_size': self.config.getint('Processing', 'max_chunk_size', fallback=1500),
  72.             'max_retries': self.config.getint('Processing', 'max_retries', fallback=3),
  73.             'backoff_factor': self.config.getfloat('Processing', 'backoff_factor', fallback=1),
  74.             'timeout': self.config.getint('Processing', 'timeout', fallback=600),
  75.             'long_text_threshold': self.config.getint('Processing', 'long_text_threshold', fallback=1000),
  76.             'enable_cache': self.config.getboolean('Processing', 'enable_cache', fallback=True),
  77.             'noun_dict_path': self.config.get('Processing', 'noun_dict_path', fallback='noun_correction_dict.csv')
  78.         }
  79.    
  80.     def get_paths_config(self):
  81.         return {
  82.             'source_suffix': self.config.get('Paths', 'source_suffix', fallback='_校对结果'),
  83.             'report_prefix': self.config.get('Paths', 'report_prefix', fallback='校对报告_')
  84.         }
  85.    
  86.     def get_spacy_config(self):
  87.         return {
  88.             'language_model': self.config.get('Spacy', 'language_model', fallback='zh_core_web_sm'),
  89.             'fallback_model': self.config.get('Spacy', 'fallback_model', fallback='en_core_web_sm')
  90.         }

  91. class NounCorrectionManager:
  92.     def __init__(self, dict_path='noun_correction_dict.csv'):
  93.         self.dict_path = Path(dict_path)
  94.         self.correction_dict = {}
  95.         self.load_dictionary()
  96.    
  97.     def load_dictionary(self):
  98.         if self.dict_path.exists():
  99.             try:
  100.                 with open(self.dict_path, 'r', encoding='utf-8', newline='') as f:
  101.                     reader = csv.reader(f)
  102.                     for row in reader:
  103.                         if len(row) >= 2:
  104.                             original, corrected = row[0], row[1]
  105.                             self.correction_dict[original] = corrected
  106.                 print(f"已加载名词修正字典: {len(self.correction_dict)} 条记录")
  107.             except Exception as e:
  108.                 print(f"加载名词修正字典失败: {e}")
  109.    
  110.     def save_dictionary(self):
  111.         try:
  112.             with open(self.dict_path, 'w', encoding='utf-8', newline='') as f:
  113.                 writer = csv.writer(f)
  114.                 for original, corrected in self.correction_dict.items():
  115.                     writer.writerow([original, corrected])
  116.             print(f"名词修正字典已保存: {len(self.correction_dict)} 条记录")
  117.         except Exception as e:
  118.             print(f"保存名词修正字典失败: {e}")
  119.    
  120.     def add_correction(self, original, corrected):
  121.         if original and corrected and original != corrected:
  122.             self.correction_dict[original] = corrected
  123.    
  124.     def apply_corrections(self, text):
  125.         if not self.correction_dict:
  126.             return text
  127.         
  128.         for original, corrected in self.correction_dict.items():
  129.             text = re.sub(r'\b' + re.escape(original) + r'\b', corrected, text)
  130.         return text

  131. class OptimizedTextProofreader:
  132.     def __init__(self, config_path='config.ini'):
  133.         self._log_lock = threading.Lock()
  134.         self.config_manager = OptimizedConfigManager(config_path)
  135.         self.api_config = self.config_manager.get_api_config()
  136.         self.processing_config = self.config_manager.get_processing_config()
  137.         self.paths_config = self.config_manager.get_paths_config()
  138.         self.spacy_config = self.config_manager.get_spacy_config()
  139.         
  140.         self.noun_manager = NounCorrectionManager(self.processing_config['noun_dict_path'])
  141.         self.chat_endpoint = f"{self.api_config['base_url']}{self.api_config['chat_endpoint']}"
  142.         self.text_cache = {} if self.processing_config['enable_cache'] else None
  143.         self.session = self._create_session()
  144.         self.nlp = self._initialize_spacy()
  145.         self.noun_changes = defaultdict(list)
  146.         
  147.         self._log("优化版文本校对器初始化完成")
  148.         self._log(f"API模型: {self.api_config['model_id']}")
  149.         self._log(f"最大工作线程: {self.api_config['max_workers']}")
  150.         self._log(f"批量大小: {self.api_config['batch_size']}")
  151.         self._log(f"缓存启用: {self.processing_config['enable_cache']}")
  152.         self._log(f"名词修正字典: {self.processing_config['noun_dict_path']}")
  153.    
  154.     def _create_session(self):
  155.         session = requests.Session()
  156.         retry_strategy = Retry(
  157.             total=self.processing_config['max_retries'],
  158.             backoff_factor=self.processing_config['backoff_factor'],
  159.             status_forcelist=[429, 500, 502, 503, 504],
  160.         )
  161.         adapter = HTTPAdapter(max_retries=retry_strategy, pool_connections=10, pool_maxsize=100)
  162.         session.mount("http://", adapter)
  163.         session.mount("https://", adapter)
  164.         
  165.         return session
  166.    
  167.     def _get_timestamp(self):
  168.         return datetime.datetime.now().strftime("%Y/%m/%d-%H:%M:%S")
  169.    
  170.     def _log(self, message):
  171.         timestamp = self._get_timestamp()
  172.         with self._log_lock:
  173.             print(f"{timestamp}  {message}")
  174.    
  175.     def _initialize_spacy(self):
  176.         try:
  177.             nlp = spacy.load(self.spacy_config['language_model'])
  178.             self._log(f"spaCy模型加载成功: {self.spacy_config['language_model']}")
  179.             return nlp
  180.         except OSError:
  181.             self._log(f"警告: 未找到spaCy模型 {self.spacy_config['language_model']},尝试备用模型...")
  182.             try:
  183.                 nlp = spacy.load(self.spacy_config['fallback_model'])
  184.                 self._log(f"使用spaCy备用模型: {self.spacy_config['fallback_model']}")
  185.                 return nlp
  186.             except OSError:
  187.                 self._log("错误: 未找到任何spaCy模型,将使用基于标点的分句方案")
  188.                 return None
  189.    
  190.     def _get_cache_key(self, text):
  191.         return hash(text[:100] + text[-100:])  # 使用首尾各100字符的哈希作为键
  192.    
  193.     def sentence_segmentation(self, text):
  194.         sentences = []
  195.         
  196.         if self.nlp:
  197.             doc = self.nlp(text)
  198.             for sent in doc.sents:
  199.                 sentence_text = sent.text.strip()
  200.                 if sentence_text:
  201.                     sentences.append(sentence_text)
  202.         else:
  203.             sentence_endings = r'([。!?!?]+)'
  204.             parts = re.split(sentence_endings, text)
  205.             
  206.             current_sentence = ""
  207.             for i, part in enumerate(parts):
  208.                 if i % 2 == 0:  # 句子内容
  209.                     current_sentence = part.strip()
  210.                 else:  # 句子结束标点
  211.                     if current_sentence:
  212.                         sentences.append(current_sentence + part)
  213.                         current_sentence = ""
  214.             
  215.             if current_sentence:
  216.                 sentences.append(current_sentence)
  217.         
  218.         return sentences
  219.    
  220.     def remove_duplicate_paragraphs(self, text):
  221.         if not text.strip():
  222.             return text
  223.         
  224.         paragraphs = re.split(r'\n\s*\n', text)
  225.         unique_paragraphs = []
  226.         seen_paragraphs = set()
  227.         
  228.         for para in paragraphs:
  229.             clean_para = re.sub(r'\s+', ' ', para.strip())
  230.             if clean_para and clean_para not in seen_paragraphs:
  231.                 seen_paragraphs.add(clean_para)
  232.                 unique_paragraphs.append(para)
  233.         
  234.         if len(unique_paragraphs) < len(paragraphs):
  235.             self._log(f"删除了 {len(paragraphs) - len(unique_paragraphs)} 个重复段落")
  236.         
  237.         return '\n\n'.join(unique_paragraphs)
  238.    
  239.     def batch_call_api(self, text_sentences):
  240.         if not text_sentences:
  241.             return []
  242.         
  243.         if self.text_cache is not None:
  244.             cached_results = []
  245.             uncached_sentences = []
  246.             uncached_indices = []
  247.             
  248.             for i, sentence in enumerate(text_sentences):
  249.                 cache_key = self._get_cache_key(sentence)
  250.                 if cache_key in self.text_cache:
  251.                     cached_results.append((i, self.text_cache[cache_key]))
  252.                 else:
  253.                     uncached_sentences.append(sentence)
  254.                     uncached_indices.append(i)
  255.             
  256.             if cached_results:
  257.                 self._log(f"缓存命中: {len(cached_results)}/{len(text_sentences)} 个句子")
  258.         else:
  259.             uncached_sentences = text_sentences
  260.             uncached_indices = list(range(len(text_sentences)))
  261.             cached_results = []
  262.         
  263.         if uncached_sentences:
  264.             results = self._parallel_process_sentences(uncached_sentences, uncached_indices)
  265.             all_results = cached_results + results
  266.             all_results.sort(key=lambda x: x[0])
  267.             return [result[1] for result in all_results]
  268.         else:
  269.             cached_results.sort(key=lambda x: x[0])
  270.             return [result[1] for result in cached_results]
  271.    
  272.     def _parallel_process_sentences(self, sentences, indices):
  273.         results = []
  274.         
  275.         batch_size = self.api_config['batch_size']
  276.         max_workers = min(self.api_config['max_workers'], len(sentences))
  277.         
  278.         with ThreadPoolExecutor(max_workers=max_workers) as executor:
  279.             future_to_index = {
  280.                 executor.submit(self._safe_call_api, sentence): idx
  281.                 for sentence, idx in zip(sentences, indices)
  282.             }
  283.             
  284.             completed = 0
  285.             for future in as_completed(future_to_index):
  286.                 idx = future_to_index[future]
  287.                 try:
  288.                     result = future.result()
  289.                     results.append((idx, result))
  290.                     completed += 1
  291.                     
  292.                     if self.text_cache is not None and result is not None:
  293.                         cache_key = self._get_cache_key(sentences[indices.index(idx)])
  294.                         self.text_cache[cache_key] = result
  295.                     
  296.                     if completed % 10 == 0:  # 每10个句子报告一次进度
  297.                         self._log(f"进度: {completed}/{len(sentences)} 个句子处理完成")
  298.                 except Exception as e:
  299.                     self._log(f"句子处理失败: {str(e)}")
  300.                     results.append((idx, None))  # 失败时返回None
  301.         
  302.         return results
  303.    
  304.     def _safe_call_api(self, text):
  305.         try:
  306.             return self.call_siliconflow_api(text)
  307.         except Exception as e:
  308.             self._log(f"API调用异常: {str(e)}")
  309.             return None
  310.    
  311.     def _clean_markdown_annotations(self, text):
  312.         text = re.sub(r'\{[^}]*?(?:句式修正|补全|修正|优化)[^}]*?\}', '', text)
  313.         text = re.sub(r'\*\*.*?\*\*', '', text)  # 加粗
  314.         text = re.sub(r'\*.*?\*', '', text)      # 斜体
  315.         text = re.sub(r'`.*?`', '', text)        # 代码标记
  316.         text = re.sub(r'#+\s*', '', text)        # 标题标记
  317.         
  318.         text = re.sub(r'\s+', ' ', text).strip()
  319.         
  320.         return text
  321.    
  322.     def _extract_noun_changes(self, original, corrected):
  323.         if not original or not corrected or original == corrected:
  324.             return
  325.         
  326.         if self.nlp:
  327.             try:
  328.                 doc_orig = self.nlp(original)
  329.                 doc_corr = self.nlp(corrected)
  330.                
  331.                 orig_nouns = [token.text for token in doc_orig if token.pos_ in ['NOUN', 'PROPN']]
  332.                 corr_nouns = [token.text for token in doc_corr if token.pos_ in ['NOUN', 'PROPN']]
  333.                
  334.                 for orig_noun in orig_nouns:
  335.                     if orig_noun not in corr_nouns:
  336.                         # 在修正文本中查找可能对应的名词
  337.                         for corr_noun in corr_nouns:
  338.                             if corr_noun not in orig_nouns and len(corr_noun) > 1:
  339.                                 self.noun_changes[orig_noun].append(corr_noun)
  340.                                 self.noun_manager.add_correction(orig_noun, corr_noun)
  341.                                 self._log(f"名词修改记录: '{orig_noun}' -> '{corr_noun}'")
  342.             except Exception as e:
  343.                 self._log(f"名词分析失败: {e}")
  344.    
  345.     def call_siliconflow_api(self, text):
  346.         headers = {
  347.             "Authorization": f"Bearer {self.api_config['api_key']}",
  348.             "Content-Type": "application/json"
  349.         }

  350.         system_prompt = """你是一位专业的小说文本校对专家。请严格按照以下要求进行校对:

  351. ## 校对要求
  352. 1. 只进行错别字修正、标点符号校正、语法错误修复
  353. 2. 保持原文意思和风格不变
  354. 3. 不要添加任何说明、注释、标记或解释
  355. 4. 绝对禁止使用 {句式修正}、{补全} 等任何格式的标记
  356. 5. 不要添加任何MD格式(如**加粗**、*斜体*、`代码`等)
  357. 6. 输出必须是纯净的校对后文本

  358. ## 输出格式
  359. 直接输出校对后的纯净文本,不要添加任何额外内容。"""

  360.         messages = [
  361.             {"role": "system", "content": system_prompt},
  362.             {"role": "user", "content": f"请对以下文本进行专业校对,只修正语言错误,不要改变意思和风格,不要添加任何标记:\n\n{text}"}
  363.         ]

  364.         payload = {
  365.             "model": self.api_config['model_id'],
  366.             "messages": messages,
  367.             "temperature": 0.1,
  368.             "max_tokens": 4000,
  369.             "top_p": 0.9,
  370.             "stream": False
  371.         }

  372.         max_retries = self.processing_config['max_retries']
  373.         
  374.         for attempt in range(max_retries + 1):
  375.             try:
  376.                 self._log(f"正在调用校对API,文本长度: {len(text)} 字符 (第{attempt + 1}次尝试)")
  377.                
  378.                 response = self.session.post(
  379.                     self.chat_endpoint,
  380.                     headers=headers,
  381.                     json=payload,
  382.                     timeout=self.processing_config['timeout']
  383.                 )
  384.                
  385.                 if response.status_code != 200:
  386.                     if attempt < max_retries:
  387.                         wait_time = random.randint(5, 30)
  388.                         time.sleep(wait_time)
  389.                         continue
  390.                     else:
  391.                         return None
  392.                         
  393.                 response.raise_for_status()
  394.                 result = response.json()
  395.                
  396.                 corrected_text = result['choices'][0]['message']['content'].strip()
  397.                
  398.                 corrected_text = self._clean_markdown_annotations(corrected_text)
  399.                
  400.                 self._extract_noun_changes(text, corrected_text)
  401.                
  402.                 return corrected_text
  403.                     
  404.             except requests.exceptions.RequestException as e:
  405.                 if attempt < max_retries:
  406.                     wait_time = random.randint(5, 30)
  407.                     time.sleep(wait_time)
  408.                     continue
  409.                 else:
  410.                     return None
  411.             except (KeyError, IndexError, json.JSONDecodeError) as e:
  412.                 return None

  413.     def process_text_sentences(self, text):
  414.         preprocessed_text = self.noun_manager.apply_corrections(text)
  415.         if preprocessed_text != text:
  416.             self._log("已应用名词修正字典进行预处理")
  417.         
  418.         sentences = self.sentence_segmentation(preprocessed_text)
  419.         self._log(f"检测到句子数: {len(sentences)}")
  420.         
  421.         if len(sentences) == 0:
  422.             return text
  423.         
  424.         start_time = time.time()
  425.         corrected_sentences = self.batch_call_api(sentences)
  426.         processing_time = time.time() - start_time
  427.         
  428.         self._log(f"句子处理完成,耗时: {processing_time:.2f}秒")
  429.         
  430.         final_sentences = []
  431.         for i, (original, corrected) in enumerate(zip(sentences, corrected_sentences)):
  432.             if corrected is None:
  433.                 final_sentences.append(original)
  434.             else:
  435.                 final_sentences.append(corrected)
  436.         
  437.         result_text = ' '.join(final_sentences)
  438.         
  439.         result_text = self.remove_duplicate_paragraphs(result_text)
  440.         
  441.         return result_text
  442.    
  443.     def process_chapter(self, input_path, output_dir):
  444.         try:
  445.             with open(input_path, 'r', encoding='utf-8') as f:
  446.                 original_text = f.read()
  447.             self._log(f"读取文件成功,长度: {len(original_text)} 字符")
  448.         except Exception as e:
  449.             self._log(f"文件读取失败: {str(e)}")
  450.             return False
  451.         
  452.         start_time = time.time()
  453.         corrected_text = self.process_text_sentences(original_text)
  454.         processing_time = time.time() - start_time
  455.         
  456.         if corrected_text is None:
  457.             self._log("处理失败,跳过此文件")
  458.             return False
  459.         
  460.         self._log(f"文本处理完成,耗时: {processing_time:.2f}秒")
  461.         
  462.         if self.noun_changes:
  463.             self.noun_manager.save_dictionary()
  464.         
  465.         output_path = output_dir / input_path.name
  466.         try:
  467.             with open(output_path, 'w', encoding='utf-8') as f:
  468.                 f.write(corrected_text)
  469.             self._log(f"已保存: {output_path.name}")
  470.             return True
  471.         except Exception as e:
  472.             self._log(f"文件保存失败: {str(e)}")
  473.             return False
  474.    
  475.     def main(self):
  476.         if self.nlp is None:
  477.             self._log("警告: spaCy模型未正确加载,使用备用方案")
  478.         
  479.         book_name = input("请输入需要校对的书名: ").strip()
  480.         if not book_name:
  481.             print("错误: 书名不能为空")
  482.             return
  483.         
  484.         source_dir = Path(book_name)
  485.         if not source_dir.exists() or not source_dir.is_dir():
  486.             print(f"错误: 找不到 '{book_name}' 文件夹")
  487.             return
  488.         
  489.         output_dir = Path(f"{book_name}{self.paths_config['source_suffix']}")
  490.         
  491.         try:
  492.             output_dir.mkdir(exist_ok=True)
  493.         except Exception as e:
  494.             print(f"创建输出目录失败: {str(e)}")
  495.             return
  496.         
  497.         txt_files = list(source_dir.glob("*.txt"))
  498.         if not txt_files:
  499.             print(f"警告: 没有找到txt文件")
  500.             return
  501.         
  502.         total_start = time.time()
  503.         success_count = 0
  504.         print(f"开始逐句处理 {len(txt_files)} 个文件...")
  505.         
  506.         for i, file_path in enumerate(txt_files, 1):
  507.             print(f"\n[{i}/{len(txt_files)}] 处理: {file_path.name}")
  508.             if self.process_chapter(file_path, output_dir):
  509.                 success_count += 1
  510.         
  511.         if self.noun_changes:
  512.             self.noun_manager.save_dictionary()
  513.             print(f"\n名词修正字典已更新: {len(self.noun_manager.correction_dict)} 条记录")
  514.         
  515.         total_time = time.time() - total_start
  516.         print("\n" + "="*50)
  517.         print(f"全部处理完成,总耗时: {total_time:.2f}秒")
  518.         print(f"平均每个文件: {total_time/len(txt_files):.2f}秒")
  519.         print(f"结果目录: {output_dir.resolve()}")
  520.         print(f"成功处理: {success_count}/{len(txt_files)} 个文件")

  521. def main():
  522.     try:
  523.         proofreader = OptimizedTextProofreader('config.ini')
  524.         proofreader.main()
  525.     except FileNotFoundError as e:
  526.         print(f"错误: {e}")
  527.         print("请确保 config.ini 配置文件存在")
  528.     except ValueError as e:
  529.         print(f"配置错误: {e}")
  530.     except Exception as e:
  531.         print(f"程序初始化失败: {e}")

  532. if __name__ == "__main__":
  533.     main()
复制代码


目前来看,效率还有待提高,有些文本会瞎改,这个是大模型的问题吗?有没有什么改进方法?
小甲鱼最新课程 -> https://ilovefishc.com
回复

使用道具 举报

发表于 3 小时前 | 显示全部楼层
你的文本自动校对软件设计得很棒,利用了多线程和缓存机制来提升效率!关于你提到的问题,我来逐一分析并提供改进建议。

问题1:效率有待提高
当前代码使用多线程和批处理,但可能受限于API响应速度或配置参数。以下是一些优化方向:

  • 调整批处理大小:在config.ini中,尝试减小 batch_size(例如设为1-3),避免API过载。
  • 优化线程数:根据你的网络和API限制,调整 max_workers(例如3-5个线程)。
  • 句子分割优化:如果文本句子过长,可降低 max_chunk_size(例如1000字符),但需平衡API调用次数。
  • 启用缓存:确保 enable_cache = True,避免重复处理相同句子。


问题2:大模型“瞎改”文本
这确实部分源于大模型的不稳定性,但可以通过以下方法缓解:

  • 强化提示词:在 system_prompt 中更明确地限制模型行为。例如:
    1. system_prompt = """你是一位专业校对员,严格遵循:
    2. 1. 仅修正错别字、标点和语法错误,绝不改变原意。
    3. 2. 输出纯净文本,无任何标记或解释。
    4. 3. 如果原文无误,直接返回原文。"""
    复制代码

  • 添加后处理验证:在API返回后,对比原文本和修正文本,如果修改比例过高(例如超过30%),则回退到原文本。示例代码:
    1. def validate_correction(original, corrected, threshold=0.3):
    2.     if corrected is None:
    3.         return original
    4.     diff = difflib.SequenceMatcher(None, original, corrected).ratio()
    5.     if diff < (1 - threshold):  # 如果修改过多
    6.         return original
    7.     return corrected
    复制代码

  • 调整API参数:降低 temperature(当前0.1已较好)和 top_p,减少随机性。
  • 使用名词字典:你已实现名词修正,可定期更新 noun_correction_dict.csv 来纠正常见错误。


其他通用建议
  • 监控API响应:记录失败请求,并实现指数退避重试(你的代码已有)。
  • 测试不同模型:如果API支持,尝试其他更稳定的模型。
  • 分阶段处理:先处理短文本测试效果,再逐步扩展。


总之,效率问题可通过参数调优解决,“瞎改”问题需结合提示词优化和后处理验证。希望这些建议能帮到你!如果还有具体问题,欢迎继续讨论。

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
小甲鱼最新课程 -> https://ilovefishc.com
回复 支持 反对

使用道具 举报

 楼主| 发表于 2 小时前 | 显示全部楼层
FishC 发表于 2025-11-16 08:26
你的文本自动校对软件设计得很棒,利用了多线程和缓存机制来提升效率!关于你提到的问题,我来逐一分析并提 ...

我现在是逐句校对,如果采取更大的段落修改之类,会不会提高效率?
小甲鱼最新课程 -> https://ilovefishc.com
回复 支持 反对

使用道具 举报

发表于 2 小时前 | 显示全部楼层
shadowmage 发表于 2025-11-16 09:57
FishC 发表于 2025-11-16 08:26
你的文本自动校对软 ...


是的,采用更大的段落(而不是逐句)进行校对可以显著提高效率!这主要是因为减少了API调用次数,从而降低了网络延迟和开销。让我详细解释一下原理和实现方法。

## 为什么段落校对更高效?

  • 减少API调用次数:逐句校对意味着每个句子都要单独调用API,而段落校对可以将多个句子合并为一次调用
  • 降低网络开销:每次API调用都有固定的网络延迟,合并后总体延迟大大减少
  • 更好地利用批处理:大模型本身擅长处理连贯文本,段落级校对可以保持上下文一致性


## 具体实现方案

### 1. 修改文本分割逻辑
将原来的句子分割改为段落分割:


  1. def paragraph_segmentation(self, text):
  2.     """将文本分割为段落,而不是句子"""
  3.     # 首先按空行分割自然段落
  4.     raw_paragraphs = re.split(r'\n\s*\n', text)
  5.    
  6.     merged_paragraphs = []
  7.     current_chunk = ""
  8.    
  9.     for para in raw_paragraphs:
  10.         para = para.strip()
  11.         if not para:
  12.             continue
  13.             
  14.         # 如果当前段落加上新段落不超过最大长度,就合并
  15.         if len(current_chunk) + len(para) <= self.processing_config['max_chunk_size']:
  16.             if current_chunk:
  17.                 current_chunk += "\n\n" + para
  18.             else:
  19.                 current_chunk = para
  20.         else:
  21.             # 当前块已满,保存并开始新块
  22.             if current_chunk:
  23.                 merged_paragraphs.append(current_chunk)
  24.             current_chunk = para
  25.    
  26.     # 添加最后一个块
  27.     if current_chunk:
  28.         merged_paragraphs.append(current_chunk)
  29.    
  30.     self._log(f"文本分割为 {len(merged_paragraphs)} 个段落")
  31.     return merged_paragraphs
复制代码


### 2. 更新处理流程
在 `process_text_sentences` 方法中替换分割方式:


  1. def process_text_paragraphs(self, text):
  2.     preprocessed_text = self.noun_manager.apply_corrections(text)
  3.    
  4.     # 使用段落分割而不是句子分割
  5.     paragraphs = self.paragraph_segmentation(preprocessed_text)
  6.     self._log(f"检测到段落数: {len(paragraphs)}")
  7.    
  8.     if len(paragraphs) == 0:
  9.         return text
  10.    
  11.     start_time = time.time()
  12.     corrected_paragraphs = self.batch_call_api(paragraphs)  # 这里处理的是段落
  13.     processing_time = time.time() - start_time
  14.    
  15.     self._log(f"段落处理完成,耗时: {processing_time:.2f}秒")
  16.    
  17.     final_paragraphs = []
  18.     for i, (original, corrected) in enumerate(zip(paragraphs, corrected_paragraphs)):
  19.         if corrected is None:
  20.             final_paragraphs.append(original)
  21.         else:
  22.             final_paragraphs.append(corrected)
  23.    
  24.     # 用两个换行符重新连接段落
  25.     result_text = '\n\n'.join(final_paragraphs)
  26.     result_text = self.remove_duplicate_paragraphs(result_text)
  27.    
  28.     return result_text
复制代码


### 3. 调整配置参数
在 config.ini 中相应调整:

[Processing]
max_chunk_size = 3000  # 增加到段落级别
long_text_threshold = 2000


## 优化后的提示词

为了确保段落校对质量,需要强化系统提示词:


  1. system_prompt = """你是一位专业的小说文本校对专家。请严格按照以下要求进行段落级校对:

  2. ## 校对要求
  3. 1. 对整个段落进行连贯性校对,保持原文意思、风格和语气不变
  4. 2. 只修正错别字、标点符号错误、语法错误
  5. 3. 保持段落的自然流畅,不要改变段落结构
  6. 4. 绝对禁止添加任何说明、注释、标记或解释
  7. 5. 输出必须是纯净的校对后段落文本

  8. ## 重要原则
  9. - 如果原文没有明显错误,直接返回原文
  10. - 保持人物对话的自然性和口语化特征
  11. - 保留原文的文学修辞和表达风格

  12. 直接输出校对后的纯净段落文本,不要添加任何额外内容。"""
复制代码


## 预期效果

  • 效率提升:通常能减少50-80%的API调用次数,总体处理时间大幅缩短
  • 质量改进:段落级校对能更好地保持上下文连贯性,减少"瞎改"现象
  • 成本降低:API调用次数减少,相应的使用成本也会降低


## 注意事项

  • 段落大小要适中:建议 max_chunk_size 在2000-4000字符之间,避免超出模型处理能力
  • 监控质量变化:初期建议对比测试,确保段落校对不会引入新的问题
  • 保留逐句选项:对于特别重要的文本,可以保留逐句校对作为备选方案


建议你先在小规模文本上测试这种段落校对方式,确认效果后再全面应用。如果发现某些段落被过度修改,可以进一步调整提示词或减小段落大小。

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
小甲鱼最新课程 -> https://ilovefishc.com
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2025-11-16 12:14

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表