鱼C论坛

 找回密码
 立即注册
查看: 75|回复: 1

[作品展示] [短视频降低噪音]再给大家分享一个视频降噪代码希望能帮助到大家

[复制链接]
发表于 4 天前 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
这个作品,还不是很完善,因为速度太慢。大家能够完善这个代码。巩固自己的知识体系。
如果用的过程中有需要改进的请提出来,给大家更新!


import cv2
import numpy as np
import argparse
from scipy import ndimage
from tqdm import tqdm
import os

class VideoDenoiser:
    def __init__(self):
        self.noise_levels = {
            'low': {'h': 3, 'hColor': 3, 'templateWindowSize': 7, 'searchWindowSize': 21},
            'medium': {'h': 5, 'hColor': 5, 'templateWindowSize': 7, 'searchWindowSize': 21},
            'high': {'h': 10, 'hColor': 10, 'templateWindowSize': 7, 'searchWindowSize': 35}
        }
   
    def estimate_noise_level(self, frame):
        """
        自动估计帧的噪声水平
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # 计算图像的梯度幅度
        grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
        grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
        gradient_magnitude = np.sqrt(grad_x**2 + grad_y**2)
        
        # 计算噪声估计
        noise_estimate = np.std(gradient_magnitude)
        
        if noise_estimate < 5:
            return 'low'
        elif noise_estimate < 15:
            return 'medium'
        else:
            return 'high'
   
    def temporal_denoise(self, frames):
        """
        时域降噪 - 使用帧间平均
        """
        if len(frames) < 3:
            return frames[1] if len(frames) > 1 else frames[0]
        
        # 加权平均,当前帧权重最高
        weights = [0.25, 0.5, 0.25]
        denoised = np.zeros_like(frames[1], dtype=np.float32)
        
        for i, weight in enumerate(weights):
            denoised += frames[i].astype(np.float32) * weight
        
        return denoised.astype(np.uint8)
   
    def spatial_denoise(self, frame, noise_level):
        """
        空域降噪 - 使用非局部均值降噪
        """
        params = self.noise_levels[noise_level]
        
        # 应用非局部均值降噪
        denoised = cv2.fastNlMeansDenoisingColored(
            frame,
            h=params['h'],
            hColor=params['hColor'],
            templateWindowSize=params['templateWindowSize'],
            searchWindowSize=params['searchWindowSize']
        )
        
        return denoised
   
    def wavelet_denoise(self, frame, strength=0.1):
        """
        小波降噪(简化版)
        """
        # 转换为YUV颜色空间
        yuv = cv2.cvtColor(frame, cv2.COLOR_BGR2YUV)
        y, u, v = cv2.split(yuv)
        
        # 对亮度通道应用高斯模糊作为简化的小波降噪
        y_denoised = cv2.GaussianBlur(y, (3, 3), strength * 2)
        
        # 合并通道
        yuv_denoised = cv2.merge([y_denoised, u, v])
        denoised = cv2.cvtColor(yuv_denoised, cv2.COLOR_YUV2BGR)
        
        return denoised
   
    def adaptive_denoise(self, frame, previous_frames=None):
        """
        自适应降噪 - 结合多种技术
        """
        # 估计噪声水平
        noise_level = self.estimate_noise_level(frame)
        print(f"检测到噪声水平: {noise_level}")
        
        # 应用空域降噪
        spatially_denoised = self.spatial_denoise(frame, noise_level)
        
        # 应用小波降噪
        wavelet_denoised = self.wavelet_denoise(spatially_denoised)
        
        # 如果有多帧,应用时域降噪
        if previous_frames and len(previous_frames) >= 2:
            temporal_frames = previous_frames + [wavelet_denoised]
            final_denoised = self.temporal_denoise(temporal_frames[-3:])
        else:
            final_denoised = wavelet_denoised
        
        return final_denoised, noise_level

def process_video(input_path, output_path, max_frames=None):
    """
    处理视频的主函数
    """
    # 检查输入文件
    if not os.path.exists(input_path):
        print(f"错误: 输入文件 {input_path} 不存在")
        return
   
    # 初始化视频捕获
    cap = cv2.VideoCapture(input_path)
   
    if not cap.isOpened():
        print("错误: 无法打开视频文件")
        return
   
    # 获取视频信息
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
   
    if max_frames:
        total_frames = min(total_frames, max_frames)
   
    print(f"视频信息: {width}x{height}, {fps} FPS, 总帧数: {total_frames}")
   
    # 初始化视频写入器
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
   
    # 初始化降噪器
    denoiser = VideoDenoiser()
   
    # 存储前几帧用于时域降噪
    previous_frames = []
   
    print("开始处理视频...")
   
    for frame_count in tqdm(range(total_frames)):
        ret, frame = cap.read()
        
        if not ret:
            break
        
        # 应用自适应降噪
        denoised_frame, noise_level = denoiser.adaptive_denoise(frame, previous_frames)
        
        # 更新前帧缓冲区
        previous_frames.append(denoised_frame)
        if len(previous_frames) > 3:
            previous_frames.pop(0)
        
        # 写入处理后的帧
        out.write(denoised_frame)
        
        # 每100帧显示进度信息
        if frame_count % 100 == 0:
            print(f"已处理 {frame_count}/{total_frames} 帧, 当前噪声水平: {noise_level}")
   
    # 释放资源
    cap.release()
    out.release()
    cv2.destroyAllWindows()
   
    print(f"处理完成! 输出文件: {output_path}")

def batch_process_videos(input_folder, output_folder):
    """
    批量处理文件夹中的所有视频
    """
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
   
    video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.wmv']
   
    for filename in os.listdir(input_folder):
        if any(filename.lower().endswith(ext) for ext in video_extensions):
            input_path = os.path.join(input_folder, filename)
            output_path = os.path.join(output_folder, f"denoised_{filename}")
            
            print(f"处理: {filename}")
            process_video(input_path, output_path)

def main():
    parser = argparse.ArgumentParser(description='视频自动降噪工具')
    parser.add_argument('--input', '-i', required=True, help='输入视频文件路径')
    parser.add_argument('--output', '-o', required=True, help='输出视频文件路径')
    parser.add_argument('--max-frames', '-m', type=int, help='最大处理帧数(用于测试)')
    parser.add_argument('--batch', '-b', action='store_true', help='批量处理模式')
    parser.add_argument('--input-folder', help='批量处理的输入文件夹')
    parser.add_argument('--output-folder', help='批量处理的输出文件夹')
   
    args = parser.parse_args()
   
    if args.batch:
        if not args.input_folder or not args.output_folder:
            print("批量处理模式需要指定输入和输出文件夹")
            return
        batch_process_videos(args.input_folder, args.output_folder)
    else:
        process_video(args.input_path, args.output_path, args.max_frames)

if __name__ == "__main__":
    # 如果直接运行,使用示例
    if len(os.sys.argv) == 1:
        print("视频降噪工具 - 使用示例:")
        print("单个文件处理: python video_denoiser.py -i input.mp4 -o output.mp4")
        print("批量处理: python video_denoiser.py -b --input-folder ./input --output-folder ./output")
        print("测试处理(前100帧): python video_denoiser.py -i input.mp4 -o output.mp4 -m 100")
    else:
        main()
小甲鱼最新课程 -> https://ilovefishc.com
回复

使用道具 举报

发表于 4 天前 | 显示全部楼层
感谢分享这个视频降噪代码!代码结构清晰,结合了多种降噪技术,非常有潜力。不过,正如您提到的,速度较慢,并且代码中存在一些错误需要修复。下面我将指出主要问题并提供改进建议,帮助大家完善代码。

主要问题及修复:

  • 变量未定义错误: 代码中多处变量未初始化或赋值不完整。
       - 在 temporal_denoise 方法中,weights 未定义。建议使用固定权重,例如:weights = [0.1, 0.8, 0.1](假设处理3帧)。
       - 在 spatial_denoise 方法中,参数应从字典中提取:params = self.noise_levels[noise_level]
       - 在 wavelet_denoise 方法中,cv2.merge() 缺少参数,应改为:cv2.merge([y_denoised, u, v])
       - 在 adaptive_denoise 方法中,temporal_frames = previous_frames + 不完整,应改为:temporal_frames = previous_frames + [wavelet_denoised]
       - 在 process_video 函数中,previous_frames = 未初始化,应改为:previous_frames = []
       - 在 batch_process_videos 函数中,video_extensions = 未定义,建议添加:video_extensions = ['.mp4', '.avi', '.mov']
  • 参数错误:main 函数中,非批量模式使用了 args.input_pathargs.output_path,但参数名是 --input--output,应改为 args.inputargs.output


优化建议(提高速度):

  • 减少降噪复杂度: 非局部均值降噪(fastNlMeansDenoisingColored)非常耗时,可尝试降低 searchWindowSize 或使用更快的空域滤波(如高斯模糊)。
  • 限制时域降噪帧数: 当前使用3帧进行时域平均,可减少到2帧或根据噪声水平动态调整。
  • 使用多线程处理: 将视频帧处理并行化,例如使用 concurrent.futures 库。
  • 硬件加速: 利用OpenCV的GPU模块(如CUDA)来加速降噪操作。
  • 分辨率缩放: 对高分辨率视频,先缩放处理再还原,以减少计算量。


修复后的示例代码片段:
以下是修复了部分错误的 adaptive_denoise 方法示例:


  1. def adaptive_denoise(self, frame, previous_frames=None):
  2.     """
  3.     自适应降噪 - 结合多种技术
  4.     """
  5.     noise_level = self.estimate_noise_level(frame)
  6.     print(f"检测到噪声水平: {noise_level}")
  7.    
  8.     # 应用空域降噪
  9.     params = self.noise_levels[noise_level]  # 修复:从字典获取参数
  10.     spatially_denoised = self.spatial_denoise(frame, noise_level)
  11.    
  12.     # 应用小波降噪
  13.     wavelet_denoised = self.wavelet_denoise(spatially_denoised)
  14.    
  15.     # 如果有多帧,应用时域降噪
  16.     if previous_frames and len(previous_frames) >= 2:
  17.         temporal_frames = previous_frames + [wavelet_denoised]  # 修复:添加当前帧
  18.         final_denoised = self.temporal_denoise(temporal_frames)
  19.     else:
  20.         final_denoised = wavelet_denoised
  21.    
  22.     return final_denoised, noise_level
复制代码


总结:
通过修复错误并优化算法,代码的稳定性和速度会显著提升。建议大家尝试上述改进,并分享结果。如果有更多具体问题(如特定视频处理慢),欢迎提出,我们一起讨论解决方案!

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
小甲鱼最新课程 -> https://ilovefishc.com
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2025-11-19 11:12

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表