|
|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
这个作品,还不是很完善,因为速度太慢。大家能够完善这个代码。巩固自己的知识体系。
如果用的过程中有需要改进的请提出来,给大家更新!
import cv2
import numpy as np
import argparse
from scipy import ndimage
from tqdm import tqdm
import os
class VideoDenoiser:
def __init__(self):
self.noise_levels = {
'low': {'h': 3, 'hColor': 3, 'templateWindowSize': 7, 'searchWindowSize': 21},
'medium': {'h': 5, 'hColor': 5, 'templateWindowSize': 7, 'searchWindowSize': 21},
'high': {'h': 10, 'hColor': 10, 'templateWindowSize': 7, 'searchWindowSize': 35}
}
def estimate_noise_level(self, frame):
"""
自动估计帧的噪声水平
"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 计算图像的梯度幅度
grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
gradient_magnitude = np.sqrt(grad_x**2 + grad_y**2)
# 计算噪声估计
noise_estimate = np.std(gradient_magnitude)
if noise_estimate < 5:
return 'low'
elif noise_estimate < 15:
return 'medium'
else:
return 'high'
def temporal_denoise(self, frames):
"""
时域降噪 - 使用帧间平均
"""
if len(frames) < 3:
return frames[1] if len(frames) > 1 else frames[0]
# 加权平均,当前帧权重最高
weights = [0.25, 0.5, 0.25]
denoised = np.zeros_like(frames[1], dtype=np.float32)
for i, weight in enumerate(weights):
denoised += frames[i].astype(np.float32) * weight
return denoised.astype(np.uint8)
def spatial_denoise(self, frame, noise_level):
"""
空域降噪 - 使用非局部均值降噪
"""
params = self.noise_levels[noise_level]
# 应用非局部均值降噪
denoised = cv2.fastNlMeansDenoisingColored(
frame,
h=params['h'],
hColor=params['hColor'],
templateWindowSize=params['templateWindowSize'],
searchWindowSize=params['searchWindowSize']
)
return denoised
def wavelet_denoise(self, frame, strength=0.1):
"""
小波降噪(简化版)
"""
# 转换为YUV颜色空间
yuv = cv2.cvtColor(frame, cv2.COLOR_BGR2YUV)
y, u, v = cv2.split(yuv)
# 对亮度通道应用高斯模糊作为简化的小波降噪
y_denoised = cv2.GaussianBlur(y, (3, 3), strength * 2)
# 合并通道
yuv_denoised = cv2.merge([y_denoised, u, v])
denoised = cv2.cvtColor(yuv_denoised, cv2.COLOR_YUV2BGR)
return denoised
def adaptive_denoise(self, frame, previous_frames=None):
"""
自适应降噪 - 结合多种技术
"""
# 估计噪声水平
noise_level = self.estimate_noise_level(frame)
print(f"检测到噪声水平: {noise_level}")
# 应用空域降噪
spatially_denoised = self.spatial_denoise(frame, noise_level)
# 应用小波降噪
wavelet_denoised = self.wavelet_denoise(spatially_denoised)
# 如果有多帧,应用时域降噪
if previous_frames and len(previous_frames) >= 2:
temporal_frames = previous_frames + [wavelet_denoised]
final_denoised = self.temporal_denoise(temporal_frames[-3:])
else:
final_denoised = wavelet_denoised
return final_denoised, noise_level
def process_video(input_path, output_path, max_frames=None):
"""
处理视频的主函数
"""
# 检查输入文件
if not os.path.exists(input_path):
print(f"错误: 输入文件 {input_path} 不存在")
return
# 初始化视频捕获
cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
print("错误: 无法打开视频文件")
return
# 获取视频信息
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if max_frames:
total_frames = min(total_frames, max_frames)
print(f"视频信息: {width}x{height}, {fps} FPS, 总帧数: {total_frames}")
# 初始化视频写入器
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
# 初始化降噪器
denoiser = VideoDenoiser()
# 存储前几帧用于时域降噪
previous_frames = []
print("开始处理视频...")
for frame_count in tqdm(range(total_frames)):
ret, frame = cap.read()
if not ret:
break
# 应用自适应降噪
denoised_frame, noise_level = denoiser.adaptive_denoise(frame, previous_frames)
# 更新前帧缓冲区
previous_frames.append(denoised_frame)
if len(previous_frames) > 3:
previous_frames.pop(0)
# 写入处理后的帧
out.write(denoised_frame)
# 每100帧显示进度信息
if frame_count % 100 == 0:
print(f"已处理 {frame_count}/{total_frames} 帧, 当前噪声水平: {noise_level}")
# 释放资源
cap.release()
out.release()
cv2.destroyAllWindows()
print(f"处理完成! 输出文件: {output_path}")
def batch_process_videos(input_folder, output_folder):
"""
批量处理文件夹中的所有视频
"""
if not os.path.exists(output_folder):
os.makedirs(output_folder)
video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.wmv']
for filename in os.listdir(input_folder):
if any(filename.lower().endswith(ext) for ext in video_extensions):
input_path = os.path.join(input_folder, filename)
output_path = os.path.join(output_folder, f"denoised_{filename}")
print(f"处理: {filename}")
process_video(input_path, output_path)
def main():
parser = argparse.ArgumentParser(description='视频自动降噪工具')
parser.add_argument('--input', '-i', required=True, help='输入视频文件路径')
parser.add_argument('--output', '-o', required=True, help='输出视频文件路径')
parser.add_argument('--max-frames', '-m', type=int, help='最大处理帧数(用于测试)')
parser.add_argument('--batch', '-b', action='store_true', help='批量处理模式')
parser.add_argument('--input-folder', help='批量处理的输入文件夹')
parser.add_argument('--output-folder', help='批量处理的输出文件夹')
args = parser.parse_args()
if args.batch:
if not args.input_folder or not args.output_folder:
print("批量处理模式需要指定输入和输出文件夹")
return
batch_process_videos(args.input_folder, args.output_folder)
else:
process_video(args.input_path, args.output_path, args.max_frames)
if __name__ == "__main__":
# 如果直接运行,使用示例
if len(os.sys.argv) == 1:
print("视频降噪工具 - 使用示例:")
print("单个文件处理: python video_denoiser.py -i input.mp4 -o output.mp4")
print("批量处理: python video_denoiser.py -b --input-folder ./input --output-folder ./output")
print("测试处理(前100帧): python video_denoiser.py -i input.mp4 -o output.mp4 -m 100")
else:
main()
|
|