做视频领域的研究时,常常有两种提取视频模态的方法:在线提取(训练时实时解码)和离线提取(预先提取并保存到磁盘)。
本文主要讲解离线提取
的方式:离线提取虽然对存储空间要求较高,但是具有一次提取、多次复用
的优点。
conda install numpy==1.21.2 sk_video==1.1.10 tqdm==4.62.3 scikit-video
conda install opencv_python_headless==4.5.5.62  # 计算光流需要
# 还需要一个 ffmpeg 的可执行文件,如果不知道怎么安装,可以使用 conda 来安装
conda config --add channels conda-forge
conda install ffmpeg
注:Python 因为有 GIL(全局解释器锁),多线程在 CPU 密集型任务上无法真正并行,效率较低;多进程不受 GIL 影响,因此下面使用多进程。
import concurrent.futures
import subprocess
import time
from tqdm import tqdm
def echo(x, delay=1):
    """Print ``x``, simulate a slow task by sleeping, and return its square.

    Args:
        x: The number to square.
        delay: Seconds to sleep before returning. Defaults to 1, the value
            that used to be hard-coded; pass 0 to skip the pause.

    Returns:
        A ``(x * x, True)`` tuple. The second item is a success flag that
        is always ``True`` for this demo task.
    """
    print(x)
    time.sleep(delay)
    return x * x, True
# Worker processes may re-import this module (always under the "spawn" start
# method used on Windows and macOS). Without this guard each worker would
# try to create its own pool while importing, so the demo must only run
# when the file is executed as a script.
if __name__ == '__main__':
    # Create an executor that manages 4 worker processes.
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        # Submit every task up front.
        futures = [executor.submit(echo, x) for x in range(20)]
        with tqdm(total=len(futures)) as t_bar:
            # Advance the progress bar each time a task finishes.
            for future in concurrent.futures.as_completed(futures):
                result, success = future.result()
                t_bar.update()
程序伪代码:
如果你想直接用这个代码,复制就可以了。使用方法:`python 程序名.py 视频目录 输出目录`
,比如 `python main.py videos/ rgbs/`。
#!/usr/bin/env python3
import argparse
import os
import skvideo.io
import concurrent.futures
import subprocess
import glob
from tqdm import tqdm
def video_to_images(video, targetdir, short_side=256):
    """Decode ``video`` into JPEG frames whose shorter side is ``short_side``.

    Frames are written by ffmpeg as ``00001.jpg``, ``00002.jpg``, ... inside
    a sub-folder of ``targetdir`` named after the video.

    Args:
        video: Path of the video file to decode.
        targetdir: Directory under which the per-video frame folder is made.
        short_side: Target length in pixels of the shorter frame side; the
            other side is derived by ffmpeg to keep the aspect ratio.

    Returns:
        ``(video, success)`` where ``success`` is ``False`` if the file is
        missing, cannot be probed, or ffmpeg fails.
    """
    filename = video
    # NOTE: everything after the FIRST dot of the base name is dropped, so
    # "a.b.mp4" is stored in folder "a".
    output_foldername = os.path.join(targetdir, os.path.basename(video).split(".")[0])
    if not os.path.exists(filename):
        print(f"{filename} is not existed.")
        return video, False

    # Probe the video metadata to learn which side is the shorter one.
    try:
        video_meta = skvideo.io.ffprobe(filename)
        height = int(video_meta['video']['@height'])
        width = int(video_meta['video']['@width'])
    except Exception as e:
        print(f"Can not get video info: {filename}, error {e}")
        return video, False

    # "-1" lets ffmpeg compute that dimension from the aspect ratio.
    if width > height:
        scale = f"scale=-1:{short_side}"
    else:
        scale = f"scale={short_side}:-1"

    # Pass the arguments as a list (no shell) so that file names containing
    # spaces, quotes, "$" or backticks are handed to ffmpeg verbatim instead
    # of being interpreted by a shell.
    command = ['ffmpeg',
               '-i', filename,
               '-vf', scale,
               '-threads', '1',
               '-loglevel', 'panic',
               '-q:v', '2',
               f'{output_foldername}/%05d.jpg']
    try:
        # exist_ok avoids a race when several workers create the same folder.
        os.makedirs(output_foldername, exist_ok=True)
        subprocess.check_output(command, stderr=subprocess.STDOUT)
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"fail to convert {filename}, error: {e}")
        return video, False

    return video, True
if __name__ == '__main__':
    # Command-line interface: input directory, output directory, pool size.
    cli = argparse.ArgumentParser()
    cli.add_argument('videos_dir', help='Input directory of videos with audio')
    cli.add_argument('output_dir', help='Output directory to store JPEG files')
    cli.add_argument('--num_workers', help='Number of workers', default=8, type=int)
    args = cli.parse_args()

    # Every file with an extension below videos_dir, searched recursively.
    video_list = glob.glob(f"{args.videos_dir}/**/*.*", recursive=True)

    # Run video_to_images across args.num_workers worker processes.
    with concurrent.futures.ProcessPoolExecutor(max_workers=args.num_workers) as executor:
        pending = []
        for video in video_list:
            pending.append(executor.submit(video_to_images, video, args.output_dir, 256))

        with tqdm(total=len(pending)) as progress:
            for finished in concurrent.futures.as_completed(pending):
                video_id, ok = finished.result()
                if not ok:
                    print(f"Something wrong for {video_id}")
                progress.update()

    print("Completed")
import argparse
import subprocess
import os
import glob
from tqdm import tqdm
def ffmpeg_extraction(input_video, output_sound, sample_rate):
    """Extract the audio track of a video into a mono 16-bit PCM file.

    Args:
        input_video: Path of the source video.
        output_sound: Path of the audio file to write.
        sample_rate: Target sampling rate in Hz; a string or an integer.

    Returns:
        ``True`` if ffmpeg exited with status 0, ``False`` otherwise.

    Raises:
        OSError: If the ``ffmpeg`` executable cannot be started.
    """
    ffmpeg_command = ['ffmpeg', '-i', input_video,
                      '-vn', '-acodec', 'pcm_s16le',
                      '-loglevel', 'panic',
                      # subprocess only accepts strings, so an int rate
                      # (e.g. 24000) is converted here.
                      '-ac', '1', '-ar', str(sample_rate),
                      output_sound]
    return_code = subprocess.call(ffmpeg_command)
    if return_code != 0:
        # ffmpeg is silenced by "-loglevel panic", so report failures here
        # instead of dropping them.
        print(f"fail to extract audio from {input_video}, ffmpeg exit code: {return_code}")
    return return_code == 0
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('videos_dir', help='Input directory of videos with audio')
    parser.add_argument('output_dir', help='Output directory to store .wav files')
    parser.add_argument('--sample_rate', default='24000', help='Rate to resample audio')
    parser.add_argument('--ext', default=['.mp4'], nargs='+', help='The extension of videos')
    args = parser.parse_args()

    # Honour --ext (it used to be parsed but ignored): keep only files whose
    # extension is requested. Matching is case-insensitive and accepts the
    # extension with or without the leading dot.
    wanted_exts = {
        ext.lower() if ext.startswith('.') else '.' + ext.lower()
        for ext in args.ext
    }
    video_list = [
        path
        for path in glob.glob(args.videos_dir + '/**/*.*', recursive=True)
        if os.path.splitext(path)[1].lower() in wanted_exts
    ]

    # makedirs also creates missing parent directories and tolerates an
    # already existing output directory.
    os.makedirs(args.output_dir, exist_ok=True)

    with tqdm(total=len(video_list)) as t_bar:
        for video in video_list:
            # The .wav file is named after the part of the video's base name
            # that precedes its first dot.
            wav_name = os.path.basename(video).split(".")[0] + ".wav"
            ffmpeg_extraction(video, os.path.join(args.output_dir, wav_name),
                              args.sample_rate)
            t_bar.update()
光流采用 Gunnar Farneback 算法,
计算全局(稠密)光流,即对图像中的每个像素都估计运动向量。
#!/usr/bin/env python3
import argparse
import os
import skvideo.io
import concurrent.futures
import subprocess
import glob
from tqdm import tqdm
import cv2
import numpy as np
def optical_flow(file_path, targetdir, short_side):
    """Compute Farneback optical flow between consecutive frames of a video.

    Each frame is converted to grayscale and resized to a
    ``short_side`` x ``short_side`` square (the aspect ratio is not kept).
    For every pair of consecutive frames, the two flow channels are min-max
    normalised to the 0-255 range and saved as
    ``x_00001.jpg`` / ``y_00001.jpg``, ``x_00002.jpg`` / ... in ``targetdir``.

    Args:
        file_path: Path of the video to read.
        targetdir: Existing directory that receives the flow images.
        short_side: Side length in pixels of the square frames the flow is
            computed on.

    Raises:
        RuntimeError: If not even the first frame can be read.
    """
    cap = cv2.VideoCapture(file_path)
    try:
        ret, frame1 = cap.read()
        if not ret:
            # Fail with a clear message instead of an opaque OpenCV error
            # from cvtColor on an empty frame.
            raise RuntimeError(f"Can not read first frame: {file_path}")

        size = (short_side, short_side)
        prvs = cv2.resize(cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY), size)
        frame_count = 1
        while True:
            ret, frame2 = cap.read()
            if not ret:
                break
            next_gray = cv2.resize(cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY), size)
            # Positional arguments: pyr_scale=0.5, levels=3, winsize=15,
            # iterations=3, poly_n=5, poly_sigma=1.2, flags=0.
            flow = cv2.calcOpticalFlowFarneback(prvs, next_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
            # Stretch each channel to 0-255 per frame so it can be stored as
            # an image.
            flow[..., 0] = cv2.normalize(flow[..., 0], None, 0, 255, cv2.NORM_MINMAX)
            flow[..., 1] = cv2.normalize(flow[..., 1], None, 0, 255, cv2.NORM_MINMAX)
            cv2.imwrite(f'{targetdir}/x_{frame_count:05d}.jpg', flow[..., 0])
            cv2.imwrite(f'{targetdir}/y_{frame_count:05d}.jpg', flow[..., 1])
            prvs = next_gray
            frame_count += 1
    finally:
        # Always free the capture handle, even when decoding fails midway.
        cap.release()
def video_to_flow(video, targetdir, short_side=256):
    """Extract optical-flow images for one video into its own folder.

    The flow images are written by ``optical_flow`` into a sub-folder of
    ``targetdir`` named after the video.

    Args:
        video: Path of the video file.
        targetdir: Directory under which the per-video flow folder is made.
        short_side: Frame size forwarded to ``optical_flow``.

    Returns:
        ``(video, success)`` where ``success`` is ``False`` if the file is
        missing, cannot be probed, or the flow computation fails.
    """
    filename = video
    # NOTE: everything after the FIRST dot of the base name is dropped, so
    # "a.b.mp4" is stored in folder "a".
    output_foldername = os.path.join(targetdir, os.path.basename(video).split(".")[0])
    if not os.path.exists(filename):
        print(f"{filename} is not existed.")
        return video, False

    # Probe the file only as a sanity check that it has a video stream with
    # a readable size; the values themselves are not needed afterwards.
    try:
        video_meta = skvideo.io.ffprobe(filename)
        int(video_meta['video']['@height'])
        int(video_meta['video']['@width'])
    except Exception as e:
        print(f"Can not get video info: {filename}, error {e}")
        return video, False

    try:
        # exist_ok avoids a race when several workers create the same folder.
        os.makedirs(output_foldername, exist_ok=True)
        optical_flow(video, output_foldername, short_side)
    except Exception as e:
        # Report the failure through the return value; an exception escaping
        # the worker would abort the whole batch at future.result().
        print(f"fail to compute optical flow for {filename}, error: {e}")
        return video, False

    return video, True
if __name__ == '__main__':
    # Command-line interface: input directory, output directory, pool size.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('videos_dir', help='Input directory of videos with audio')
    arg_parser.add_argument('output_dir', help='Output directory to store JPEG files')
    arg_parser.add_argument('--num_workers', help='Number of workers', default=8, type=int)
    args = arg_parser.parse_args()

    # Every file with an extension below videos_dir, searched recursively.
    search_pattern = f"{args.videos_dir}/**/*.*"
    video_list = glob.glob(search_pattern, recursive=True)

    # Run video_to_flow across args.num_workers worker processes.
    with concurrent.futures.ProcessPoolExecutor(max_workers=args.num_workers) as executor:
        jobs = []
        for video in video_list:
            jobs.append(executor.submit(video_to_flow, video, args.output_dir, 256))

        with tqdm(total=len(jobs)) as progress:
            for job in concurrent.futures.as_completed(jobs):
                video_id, ok = job.result()
                if not ok:
                    print(f"Something wrong for {video_id}")
                progress.update()

    print("Completed")