[短视频降低噪音]再给大家分享一个视频降噪代码希望能帮助到大家
这个作品,还不是很完善,因为速度太慢。大家能够完善这个代码。巩固自己的知识体系。如果用的过程中有需要改进的请提出来,给大家更新!
import cv2
import numpy as np
import argparse
from scipy import ndimage
from tqdm import tqdm
import os
class VideoDenoiser:
def __init__(self):
self.noise_levels = {
'low': {'h': 3, 'hColor': 3, 'templateWindowSize': 7, 'searchWindowSize': 21},
'medium': {'h': 5, 'hColor': 5, 'templateWindowSize': 7, 'searchWindowSize': 21},
'high': {'h': 10, 'hColor': 10, 'templateWindowSize': 7, 'searchWindowSize': 35}
}
def estimate_noise_level(self, frame):
"""
自动估计帧的噪声水平
"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 计算图像的梯度幅度
grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
gradient_magnitude = np.sqrt(grad_x**2 + grad_y**2)
# 计算噪声估计
noise_estimate = np.std(gradient_magnitude)
if noise_estimate < 5:
return 'low'
elif noise_estimate < 15:
return 'medium'
else:
return 'high'
def temporal_denoise(self, frames):
"""
时域降噪 - 使用帧间平均
"""
if len(frames) < 3:
return frames if len(frames) > 1 else frames
# 加权平均,当前帧权重最高
weights =
denoised = np.zeros_like(frames, dtype=np.float32)
for i, weight in enumerate(weights):
denoised += frames.astype(np.float32) * weight
return denoised.astype(np.uint8)
def spatial_denoise(self, frame, noise_level):
"""
空域降噪 - 使用非局部均值降噪
"""
params = self.noise_levels
# 应用非局部均值降噪
denoised = cv2.fastNlMeansDenoisingColored(
frame,
h=params['h'],
hColor=params['hColor'],
templateWindowSize=params['templateWindowSize'],
searchWindowSize=params['searchWindowSize']
)
return denoised
def wavelet_denoise(self, frame, strength=0.1):
"""
小波降噪(简化版)
"""
# 转换为YUV颜色空间
yuv = cv2.cvtColor(frame, cv2.COLOR_BGR2YUV)
y, u, v = cv2.split(yuv)
# 对亮度通道应用高斯模糊作为简化的小波降噪
y_denoised = cv2.GaussianBlur(y, (3, 3), strength * 2)
# 合并通道
yuv_denoised = cv2.merge()
denoised = cv2.cvtColor(yuv_denoised, cv2.COLOR_YUV2BGR)
return denoised
def adaptive_denoise(self, frame, previous_frames=None):
"""
自适应降噪 - 结合多种技术
"""
# 估计噪声水平
noise_level = self.estimate_noise_level(frame)
print(f"检测到噪声水平: {noise_level}")
# 应用空域降噪
spatially_denoised = self.spatial_denoise(frame, noise_level)
# 应用小波降噪
wavelet_denoised = self.wavelet_denoise(spatially_denoised)
# 如果有多帧,应用时域降噪
if previous_frames and len(previous_frames) >= 2:
temporal_frames = previous_frames +
final_denoised = self.temporal_denoise(temporal_frames[-3:])
else:
final_denoised = wavelet_denoised
return final_denoised, noise_level
def process_video(input_path, output_path, max_frames=None):
"""
处理视频的主函数
"""
# 检查输入文件
if not os.path.exists(input_path):
print(f"错误: 输入文件 {input_path} 不存在")
return
# 初始化视频捕获
cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
print("错误: 无法打开视频文件")
return
# 获取视频信息
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if max_frames:
total_frames = min(total_frames, max_frames)
print(f"视频信息: {width}x{height}, {fps} FPS, 总帧数: {total_frames}")
# 初始化视频写入器
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
# 初始化降噪器
denoiser = VideoDenoiser()
# 存储前几帧用于时域降噪
previous_frames = []
print("开始处理视频...")
for frame_count in tqdm(range(total_frames)):
ret, frame = cap.read()
if not ret:
break
# 应用自适应降噪
denoised_frame, noise_level = denoiser.adaptive_denoise(frame, previous_frames)
# 更新前帧缓冲区
previous_frames.append(denoised_frame)
if len(previous_frames) > 3:
previous_frames.pop(0)
# 写入处理后的帧
out.write(denoised_frame)
# 每100帧显示进度信息
if frame_count % 100 == 0:
print(f"已处理 {frame_count}/{total_frames} 帧, 当前噪声水平: {noise_level}")
# 释放资源
cap.release()
out.release()
cv2.destroyAllWindows()
print(f"处理完成! 输出文件: {output_path}")
def batch_process_videos(input_folder, output_folder):
"""
批量处理文件夹中的所有视频
"""
if not os.path.exists(output_folder):
os.makedirs(output_folder)
video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.wmv']
for filename in os.listdir(input_folder):
if any(filename.lower().endswith(ext) for ext in video_extensions):
input_path = os.path.join(input_folder, filename)
output_path = os.path.join(output_folder, f"denoised_{filename}")
print(f"处理: {filename}")
process_video(input_path, output_path)
def main():
parser = argparse.ArgumentParser(description='视频自动降噪工具')
parser.add_argument('--input', '-i', required=True, help='输入视频文件路径')
parser.add_argument('--output', '-o', required=True, help='输出视频文件路径')
parser.add_argument('--max-frames', '-m', type=int, help='最大处理帧数(用于测试)')
parser.add_argument('--batch', '-b', action='store_true', help='批量处理模式')
parser.add_argument('--input-folder', help='批量处理的输入文件夹')
parser.add_argument('--output-folder', help='批量处理的输出文件夹')
args = parser.parse_args()
if args.batch:
if not args.input_folder or not args.output_folder:
print("批量处理模式需要指定输入和输出文件夹")
return
batch_process_videos(args.input_folder, args.output_folder)
else:
process_video(args.input_path, args.output_path, args.max_frames)
if __name__ == "__main__":
# 如果直接运行,使用示例
if len(os.sys.argv) == 1:
print("视频降噪工具 - 使用示例:")
print("单个文件处理: python video_denoiser.py -i input.mp4 -o output.mp4")
print("批量处理: python video_denoiser.py -b --input-folder ./input --output-folder ./output")
print("测试处理(前100帧): python video_denoiser.py -i input.mp4 -o output.mp4 -m 100")
else:
main()
感谢分享这个视频降噪代码!代码结构清晰,结合了多种降噪技术,非常有潜力。不过,正如您提到的,速度较慢,并且代码中存在一些错误需要修复。下面我将指出主要问题并提供改进建议,帮助大家完善代码。
主要问题及修复:
[*] 变量未定义错误: 代码中多处变量未初始化或赋值不完整。
- 在 temporal_denoise 方法中,weights 未定义。建议使用固定权重,例如:weights = (假设处理3帧)。
- 在 spatial_denoise 方法中,参数应从字典中提取:params = self.noise_levels。
- 在 wavelet_denoise 方法中,cv2.merge() 缺少参数,应改为:cv2.merge()。
- 在 adaptive_denoise 方法中,temporal_frames = previous_frames + 不完整,应改为:temporal_frames = previous_frames + 。
- 在 process_video 函数中,previous_frames = 未初始化,应改为:previous_frames = [] 。
- 在 batch_process_videos 函数中,video_extensions = 未定义,建议添加:video_extensions = ['.mp4', '.avi', '.mov']。
[*] 参数错误: 在 main 函数中,非批量模式使用了 args.input_path 和 args.output_path,但参数名是 --input 和 --output,应改为 args.input 和 args.output。
优化建议(提高速度):
[*] 减少降噪复杂度: 非局部均值降噪(fastNlMeansDenoisingColored)非常耗时,可尝试降低 searchWindowSize 或使用更快的空域滤波(如高斯模糊)。
[*] 限制时域降噪帧数: 当前使用3帧进行时域平均,可减少到2帧或根据噪声水平动态调整。
[*] 使用多线程处理: 将视频帧处理并行化,例如使用 concurrent.futures 库。
[*] 硬件加速: 利用OpenCV的GPU模块(如CUDA)来加速降噪操作。
[*] 分辨率缩放: 对高分辨率视频,先缩放处理再还原,以减少计算量。
修复后的示例代码片段:
以下是修复了部分错误的 adaptive_denoise 方法示例:
def adaptive_denoise(self, frame, previous_frames=None):
"""
自适应降噪 - 结合多种技术
"""
noise_level = self.estimate_noise_level(frame)
print(f"检测到噪声水平: {noise_level}")
# 应用空域降噪
params = self.noise_levels# 修复:从字典获取参数
spatially_denoised = self.spatial_denoise(frame, noise_level)
# 应用小波降噪
wavelet_denoised = self.wavelet_denoise(spatially_denoised)
# 如果有多帧,应用时域降噪
if previous_frames and len(previous_frames) >= 2:
temporal_frames = previous_frames + # 修复:添加当前帧
final_denoised = self.temporal_denoise(temporal_frames)
else:
final_denoised = wavelet_denoised
return final_denoised, noise_level
总结:
通过修复错误并优化算法,代码的稳定性和速度会显著提升。建议大家尝试上述改进,并分享结果。如果有更多具体问题(如特定视频处理慢),欢迎提出,我们一起讨论解决方案!
以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
页:
[1]