yemanet 发表于 4 天前

[短视频降低噪音]再给大家分享一个视频降噪代码希望能帮助到大家

这个作品,还不是很完善,因为速度太慢。大家能够完善这个代码。巩固自己的知识体系。
如果用的过程中有需要改进的请提出来,给大家更新!


import cv2
import numpy as np
import argparse
from scipy import ndimage
from tqdm import tqdm
import os

class VideoDenoiser:
    def __init__(self):
      self.noise_levels = {
            'low': {'h': 3, 'hColor': 3, 'templateWindowSize': 7, 'searchWindowSize': 21},
            'medium': {'h': 5, 'hColor': 5, 'templateWindowSize': 7, 'searchWindowSize': 21},
            'high': {'h': 10, 'hColor': 10, 'templateWindowSize': 7, 'searchWindowSize': 35}
      }
   
    def estimate_noise_level(self, frame):
      """
      自动估计帧的噪声水平
      """
      gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
      
      # 计算图像的梯度幅度
      grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
      grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
      gradient_magnitude = np.sqrt(grad_x**2 + grad_y**2)
      
      # 计算噪声估计
      noise_estimate = np.std(gradient_magnitude)
      
      if noise_estimate < 5:
            return 'low'
      elif noise_estimate < 15:
            return 'medium'
      else:
            return 'high'
   
    def temporal_denoise(self, frames):
      """
      时域降噪 - 使用帧间平均
      """
      if len(frames) < 3:
            return frames if len(frames) > 1 else frames
      
      # 加权平均,当前帧权重最高
      weights =
      denoised = np.zeros_like(frames, dtype=np.float32)
      
      for i, weight in enumerate(weights):
            denoised += frames.astype(np.float32) * weight
      
      return denoised.astype(np.uint8)
   
    def spatial_denoise(self, frame, noise_level):
      """
      空域降噪 - 使用非局部均值降噪
      """
      params = self.noise_levels
      
      # 应用非局部均值降噪
      denoised = cv2.fastNlMeansDenoisingColored(
            frame,
            h=params['h'],
            hColor=params['hColor'],
            templateWindowSize=params['templateWindowSize'],
            searchWindowSize=params['searchWindowSize']
      )
      
      return denoised
   
    def wavelet_denoise(self, frame, strength=0.1):
      """
      小波降噪(简化版)
      """
      # 转换为YUV颜色空间
      yuv = cv2.cvtColor(frame, cv2.COLOR_BGR2YUV)
      y, u, v = cv2.split(yuv)
      
      # 对亮度通道应用高斯模糊作为简化的小波降噪
      y_denoised = cv2.GaussianBlur(y, (3, 3), strength * 2)
      
      # 合并通道
      yuv_denoised = cv2.merge()
      denoised = cv2.cvtColor(yuv_denoised, cv2.COLOR_YUV2BGR)
      
      return denoised
   
    def adaptive_denoise(self, frame, previous_frames=None):
      """
      自适应降噪 - 结合多种技术
      """
      # 估计噪声水平
      noise_level = self.estimate_noise_level(frame)
      print(f"检测到噪声水平: {noise_level}")
      
      # 应用空域降噪
      spatially_denoised = self.spatial_denoise(frame, noise_level)
      
      # 应用小波降噪
      wavelet_denoised = self.wavelet_denoise(spatially_denoised)
      
      # 如果有多帧,应用时域降噪
      if previous_frames and len(previous_frames) >= 2:
            temporal_frames = previous_frames +
            final_denoised = self.temporal_denoise(temporal_frames[-3:])
      else:
            final_denoised = wavelet_denoised
      
      return final_denoised, noise_level

def process_video(input_path, output_path, max_frames=None):
    """
    处理视频的主函数
    """
    # 检查输入文件
    if not os.path.exists(input_path):
      print(f"错误: 输入文件 {input_path} 不存在")
      return
   
    # 初始化视频捕获
    cap = cv2.VideoCapture(input_path)
   
    if not cap.isOpened():
      print("错误: 无法打开视频文件")
      return
   
    # 获取视频信息
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
   
    if max_frames:
      total_frames = min(total_frames, max_frames)
   
    print(f"视频信息: {width}x{height}, {fps} FPS, 总帧数: {total_frames}")
   
    # 初始化视频写入器
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
   
    # 初始化降噪器
    denoiser = VideoDenoiser()
   
    # 存储前几帧用于时域降噪
    previous_frames = []
   
    print("开始处理视频...")
   
    for frame_count in tqdm(range(total_frames)):
      ret, frame = cap.read()
      
      if not ret:
            break
      
      # 应用自适应降噪
      denoised_frame, noise_level = denoiser.adaptive_denoise(frame, previous_frames)
      
      # 更新前帧缓冲区
      previous_frames.append(denoised_frame)
      if len(previous_frames) > 3:
            previous_frames.pop(0)
      
      # 写入处理后的帧
      out.write(denoised_frame)
      
      # 每100帧显示进度信息
      if frame_count % 100 == 0:
            print(f"已处理 {frame_count}/{total_frames} 帧, 当前噪声水平: {noise_level}")
   
    # 释放资源
    cap.release()
    out.release()
    cv2.destroyAllWindows()
   
    print(f"处理完成! 输出文件: {output_path}")

def batch_process_videos(input_folder, output_folder):
    """
    批量处理文件夹中的所有视频
    """
    if not os.path.exists(output_folder):
      os.makedirs(output_folder)
   
    video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.wmv']
   
    for filename in os.listdir(input_folder):
      if any(filename.lower().endswith(ext) for ext in video_extensions):
            input_path = os.path.join(input_folder, filename)
            output_path = os.path.join(output_folder, f"denoised_{filename}")
            
            print(f"处理: {filename}")
            process_video(input_path, output_path)

def main():
    parser = argparse.ArgumentParser(description='视频自动降噪工具')
    parser.add_argument('--input', '-i', required=True, help='输入视频文件路径')
    parser.add_argument('--output', '-o', required=True, help='输出视频文件路径')
    parser.add_argument('--max-frames', '-m', type=int, help='最大处理帧数(用于测试)')
    parser.add_argument('--batch', '-b', action='store_true', help='批量处理模式')
    parser.add_argument('--input-folder', help='批量处理的输入文件夹')
    parser.add_argument('--output-folder', help='批量处理的输出文件夹')
   
    args = parser.parse_args()
   
    if args.batch:
      if not args.input_folder or not args.output_folder:
            print("批量处理模式需要指定输入和输出文件夹")
            return
      batch_process_videos(args.input_folder, args.output_folder)
    else:
      process_video(args.input_path, args.output_path, args.max_frames)

if __name__ == "__main__":
    # 如果直接运行,使用示例
    if len(os.sys.argv) == 1:
      print("视频降噪工具 - 使用示例:")
      print("单个文件处理: python video_denoiser.py -i input.mp4 -o output.mp4")
      print("批量处理: python video_denoiser.py -b --input-folder ./input --output-folder ./output")
      print("测试处理(前100帧): python video_denoiser.py -i input.mp4 -o output.mp4 -m 100")
    else:
      main()

FishC 发表于 4 天前

感谢分享这个视频降噪代码!代码结构清晰,结合了多种降噪技术,非常有潜力。不过,正如您提到的,速度较慢,并且代码中存在一些错误需要修复。下面我将指出主要问题并提供改进建议,帮助大家完善代码。

主要问题及修复:


[*] 变量未定义错误: 代码中多处变量未初始化或赋值不完整。
   - 在 temporal_denoise 方法中,weights 未定义。建议使用固定权重,例如:weights = (假设处理3帧)。
   - 在 spatial_denoise 方法中,参数应从字典中提取:params = self.noise_levels。
   - 在 wavelet_denoise 方法中,cv2.merge() 缺少参数,应改为:cv2.merge()。
   - 在 adaptive_denoise 方法中,temporal_frames = previous_frames + 不完整,应改为:temporal_frames = previous_frames + 。
   - 在 process_video 函数中,previous_frames = 未初始化,应改为:previous_frames = [] 。
   - 在 batch_process_videos 函数中,video_extensions = 未定义,建议添加:video_extensions = ['.mp4', '.avi', '.mov']。
[*] 参数错误: 在 main 函数中,非批量模式使用了 args.input_path 和 args.output_path,但参数名是 --input 和 --output,应改为 args.input 和 args.output。


优化建议(提高速度):


[*] 减少降噪复杂度: 非局部均值降噪(fastNlMeansDenoisingColored)非常耗时,可尝试降低 searchWindowSize 或使用更快的空域滤波(如高斯模糊)。
[*] 限制时域降噪帧数: 当前使用3帧进行时域平均,可减少到2帧或根据噪声水平动态调整。
[*] 使用多线程处理: 将视频帧处理并行化,例如使用 concurrent.futures 库。
[*] 硬件加速: 利用OpenCV的GPU模块(如CUDA)来加速降噪操作。
[*] 分辨率缩放: 对高分辨率视频,先缩放处理再还原,以减少计算量。


修复后的示例代码片段:
以下是修复了部分错误的 adaptive_denoise 方法示例:


def adaptive_denoise(self, frame, previous_frames=None):
    """
    自适应降噪 - 结合多种技术
    """
    noise_level = self.estimate_noise_level(frame)
    print(f"检测到噪声水平: {noise_level}")
   
    # 应用空域降噪
    params = self.noise_levels# 修复:从字典获取参数
    spatially_denoised = self.spatial_denoise(frame, noise_level)
   
    # 应用小波降噪
    wavelet_denoised = self.wavelet_denoise(spatially_denoised)
   
    # 如果有多帧,应用时域降噪
    if previous_frames and len(previous_frames) >= 2:
      temporal_frames = previous_frames + # 修复:添加当前帧
      final_denoised = self.temporal_denoise(temporal_frames)
    else:
      final_denoised = wavelet_denoised
   
    return final_denoised, noise_level


总结:
通过修复错误并优化算法,代码的稳定性和速度会显著提升。建议大家尝试上述改进,并分享结果。如果有更多具体问题(如特定视频处理慢),欢迎提出,我们一起讨论解决方案!

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
页: [1]
查看完整版本: [短视频降低噪音]再给大家分享一个视频降噪代码希望能帮助到大家