做视频领域的研究时,常常有两种提取视频模态的方法:在线提取(训练时实时从视频中解码)和离线提取(预先提取并保存到磁盘)。

本文主要讲解离线提取的方式:离线提取虽然对存储空间要求较高,但是具有一次提取、多次复用的优点。

多进程提取视频模态特征 - 飞桨AI Studio

项目依赖

conda install numpy==1.21.2 sk_video==1.1.10 tqdm==4.62.3 scikit-video
conda install opencv_python_headless==4.5.5.62  # 计算光流需要

# 还需要一个 ffmpeg 的可执行文件,如果不知道怎么安装可以使用conda来安装
conda config --add channels conda-forge
conda install ffmpeg

多进程-简单示例

注:Python 因为有 GIL(全局解释器锁),多线程无法并行执行 CPU 密集型任务,效率会降低;多进程则不受此限制。

import concurrent.futures
import subprocess
import time
from tqdm import tqdm

def echo(x):
    """Print ``x``, pause for one second, and return its square with a flag.

    Stands in for a slow task so the process-pool example has something to
    schedule. The result is the tuple ``(x * x, True)``.
    """
    print(x)
    time.sleep(1)  # simulate a task that takes a while
    squared = x * x
    return squared, True

# Keep the pool behind the main-module guard: with the "spawn" start method
# the worker processes import this module again, and an unguarded pool would
# be re-created inside every worker.
if __name__ == '__main__':
    # Create an executor that manages 4 worker processes.
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        # Submit every task up front.
        futures = [executor.submit(echo, x) for x in range(20)]
        with tqdm(total=len(futures)) as t_bar:
            # Advance the progress bar each time a task finishes.
            for future in concurrent.futures.as_completed(futures):
                # result() re-raises any exception raised inside the worker.
                result, success = future.result()
                t_bar.update()

解析RGB模态

程序伪代码:

如果你想直接用这个代码,复制就可以了。使用方法:`python 程序名.py 视频目录 输出目录`,比如 `python main.py videos/ rgbs/`。

#!/usr/bin/env python3

import argparse
import os
import skvideo.io
import concurrent.futures
import subprocess
import glob
from tqdm import tqdm

def video_to_images(video, targetdir, short_side=256):
    """Decode a video into JPEG frames whose shorter side is ``short_side`` pixels.

    Frames are written as ``00001.jpg``, ``00002.jpg``, ... into a sub-folder
    of ``targetdir`` named after the video file (its base name up to the
    first dot).

    Args:
        video: Path of the input video file.
        targetdir: Directory under which the per-video frame folder is created.
        short_side: Target length in pixels of the shorter frame side.

    Returns:
        A ``(video, success)`` tuple. ``success`` is ``False`` when the file
        is missing, its metadata cannot be read, or ffmpeg cannot be run or
        exits with an error.
    """
    filename = video
    output_foldername = os.path.join(targetdir, os.path.basename(video).split(".")[0])
    if not os.path.exists(filename):
        print(f"{filename} is not existed.")
        return video, False

    # Probe the video metadata to learn which side is the shorter one.
    try:
        video_meta = skvideo.io.ffprobe(filename)
        height = int(video_meta['video']['@height'])
        width = int(video_meta['video']['@width'])
    except Exception as e:
        print(f"Can not get video info: {filename}, error {e}")
        return video, False

    # Pin the shorter side to short_side; -1 asks ffmpeg to derive the other
    # side so that the aspect ratio is kept.
    if width > height:
        scale = "scale=-1:{}".format(short_side)
    else:
        scale = "scale={}:-1".format(short_side)

    # exist_ok avoids a failure when another worker creates the folder first.
    os.makedirs(output_foldername, exist_ok=True)

    # Pass the arguments as a list (no shell) so that paths containing
    # spaces, quotes or shell metacharacters reach ffmpeg unchanged.
    command = ['ffmpeg',
               '-i', filename,
               '-vf', scale,
               '-threads', '1',
               '-loglevel', 'panic',
               '-q:v', '2',
               os.path.join(output_foldername, '%05d.jpg')]
    try:
        subprocess.check_output(command, stderr=subprocess.STDOUT)
    except (subprocess.CalledProcessError, OSError) as e:
        # CalledProcessError: ffmpeg exited non-zero; OSError: ffmpeg could
        # not be started (for example, it is not installed).
        print(f"fail to convert {filename}, error: {e}")
        return video, False
    return video, True

if __name__ == '__main__':
    # Command-line interface: input directory, output directory, pool size.
    parser = argparse.ArgumentParser()
    parser.add_argument('videos_dir', help='Input directory of videos with audio')
    parser.add_argument('output_dir', help='Output directory to store JPEG files')
    parser.add_argument('--num_workers', help='Number of workers', default=8, type=int)
    args = parser.parse_args()

    # Recursively collect every path below videos_dir whose name contains a dot.
    search_pattern = args.videos_dir + '/**/*.*'
    video_list = glob.glob(search_pattern, recursive=True)

    # Run video_to_images over the list with args.num_workers processes.
    with concurrent.futures.ProcessPoolExecutor(max_workers=args.num_workers) as pool:
        pending = []
        for video_path in video_list:
            pending.append(pool.submit(video_to_images, video_path, args.output_dir, 256))

        with tqdm(total=len(pending)) as progress:
            # Handle tasks in completion order and report the failed ones.
            for finished in concurrent.futures.as_completed(pending):
                video_id, success = finished.result()
                if not success:
                    print(f"Something wrong for {video_id}")
                progress.update()
    print("Completed")

解析声音模态

import argparse
import subprocess
import os
import glob
from tqdm import tqdm

def ffmpeg_extraction(input_video, output_sound, sample_rate):
    """Extract the audio track of a video into a mono 16-bit PCM file with ffmpeg.

    Args:
        input_video: Path of the source video.
        output_sound: Path of the audio file to write.
        sample_rate: Output sampling rate in Hz, as an int or a numeric string.

    Returns:
        ``True`` if ffmpeg ran and exited with status 0, ``False`` otherwise.
        A failure is also reported on stdout.
    """
    # Arguments are passed as a list (no shell); every item must be a string,
    # hence the explicit str() around sample_rate.
    ffmpeg_command = ['ffmpeg', '-i', input_video,
                      '-vn', '-acodec', 'pcm_s16le',
                      '-loglevel', 'panic',
                      '-ac', '1', '-ar', str(sample_rate),
                      output_sound]
    try:
        return_code = subprocess.call(ffmpeg_command)
    except OSError as e:
        # ffmpeg could not be started (for example, it is not installed).
        print(f"fail to run ffmpeg for {input_video}, error: {e}")
        return False
    if return_code != 0:
        print(f"fail to extract audio from {input_video}, ffmpeg exit code: {return_code}")
        return False
    return True

if __name__ == '__main__':
    # Command-line interface: input directory, output directory, resampling
    # rate and the file extensions that count as videos.
    parser = argparse.ArgumentParser()
    parser.add_argument('videos_dir', help='Input directory of videos with audio')
    parser.add_argument('output_dir', help='Output directory to store .wav files')
    parser.add_argument('--sample_rate', default='24000', help='Rate to resample audio')
    parser.add_argument('--ext', default=['.mp4'], nargs='+', help='The extension of videos')

    args = parser.parse_args()

    # Honor --ext: keep only files whose extension was requested. Extensions
    # are compared case-insensitively and may be given with or without the dot.
    wanted_exts = {('.' + ext.lstrip('.')).lower() for ext in args.ext}
    video_list = [path
                  for path in glob.glob(args.videos_dir + '/**/*.*', recursive=True)
                  if os.path.splitext(path)[1].lower() in wanted_exts]

    # makedirs also creates missing parent directories and tolerates an
    # already existing output directory.
    os.makedirs(args.output_dir, exist_ok=True)

    with tqdm(total=len(video_list)) as t_bar:
        for video in video_list:
            # Output file: base name of the video up to its first dot + ".wav".
            sound_name = os.path.basename(video).split(".")[0] + ".wav"
            ffmpeg_extraction(video, os.path.join(args.output_dir, sound_name),
                              args.sample_rate)
            t_bar.update()

解析光流模态

光流采用 Gunnar Farneback 算法计算稠密光流。

#!/usr/bin/env python3

import argparse
import os
import skvideo.io
import concurrent.futures
import subprocess
import glob
from tqdm import tqdm
import cv2
import numpy as np

def optical_flow(file_path, targetdir, short_side):
    """Write Farneback optical-flow images for every pair of consecutive frames.

    Each frame is converted to grayscale and resized to a
    ``short_side`` x ``short_side`` square before the flow is computed. For
    the i-th frame pair (counting from 1) the two flow channels are min-max
    normalized to the range [0, 255] and saved as ``x_{i:05d}.jpg`` and
    ``y_{i:05d}.jpg`` inside ``targetdir``.

    Args:
        file_path: Path of the input video.
        targetdir: Existing directory that receives the flow images.
        short_side: Edge length in pixels of the square the frames are
            resized to.

    Raises:
        RuntimeError: If the first frame of the video cannot be read.
    """
    frame_size = (short_side, short_side)
    cap = cv2.VideoCapture(file_path)
    try:
        ret, first_frame = cap.read()
        if not ret:
            raise RuntimeError(f"Can not read the first frame of {file_path}")
        prvs = cv2.resize(cv2.cvtColor(first_frame, cv2.COLOR_BGR2GRAY), frame_size)

        frame_count = 1
        while True:
            ret, frame = cap.read()
            if not ret:
                # No more frames to decode.
                break
            gray = cv2.resize(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), frame_size)

            # Positional arguments after the two frames: initial flow (None),
            # pyr_scale, levels, winsize, iterations, poly_n, poly_sigma, flags.
            flow = cv2.calcOpticalFlowFarneback(prvs, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
            flow_x = cv2.normalize(flow[..., 0], None, 0, 255, cv2.NORM_MINMAX)
            flow_y = cv2.normalize(flow[..., 1], None, 0, 255, cv2.NORM_MINMAX)

            cv2.imwrite(f'{targetdir}/x_{frame_count:05d}.jpg', flow_x)
            cv2.imwrite(f'{targetdir}/y_{frame_count:05d}.jpg', flow_y)
            prvs = gray
            frame_count += 1
    finally:
        # Release the capture even when reading or flow computation fails.
        cap.release()

def video_to_flow(video, targetdir, short_side=256):
    """Compute optical-flow images for one video into its own sub-folder.

    The output folder is ``targetdir`` joined with the base name of the video
    up to its first dot.

    Args:
        video: Path of the input video file.
        targetdir: Directory under which the per-video flow folder is created.
        short_side: Size forwarded to ``optical_flow``.

    Returns:
        A ``(video, success)`` tuple. ``success`` is ``False`` when the file
        is missing, its metadata cannot be read, or the flow computation
        raises.
    """
    filename = video
    output_foldername = os.path.join(targetdir, os.path.basename(video).split(".")[0])

    if not os.path.exists(filename):
        print(f"{filename} is not existed.")
        return video, False

    # The metadata is read only to confirm that the file has a video stream
    # with parseable dimensions; the values themselves are not needed here.
    try:
        video_meta = skvideo.io.ffprobe(filename)
        for key in ('@height', '@width'):
            int(video_meta['video'][key])
    except Exception as e:
        print(f"Can not get video info: {filename}, error {e}")
        return video, False

    # exist_ok avoids a failure when another worker creates the folder first.
    os.makedirs(output_foldername, exist_ok=True)

    # Report a failing video instead of letting the exception escape the
    # worker, where it would abort the caller's whole result loop.
    try:
        optical_flow(video, output_foldername, short_side)
    except Exception as e:
        print(f"fail to compute optical flow for {filename}, error: {e}")
        return video, False
    return video, True

if __name__ == '__main__':
    # Command-line interface: input directory, output directory, pool size.
    parser = argparse.ArgumentParser()
    parser.add_argument('videos_dir', help='Input directory of videos with audio')
    parser.add_argument('output_dir', help='Output directory to store JPEG files')
    parser.add_argument('--num_workers', help='Number of workers', default=8, type=int)
    args = parser.parse_args()

    # Recursively collect every path below videos_dir whose name contains a dot.
    search_pattern = args.videos_dir + '/**/*.*'
    video_list = glob.glob(search_pattern, recursive=True)

    # Run video_to_flow over the list with args.num_workers processes.
    with concurrent.futures.ProcessPoolExecutor(max_workers=args.num_workers) as pool:
        pending = []
        for video_path in video_list:
            pending.append(pool.submit(video_to_flow, video_path, args.output_dir, 256))

        with tqdm(total=len(pending)) as progress:
            # Handle tasks in completion order and report the failed ones.
            for finished in concurrent.futures.as_completed(pending):
                video_id, success = finished.result()
                if not success:
                    print(f"Something wrong for {video_id}")
                progress.update()
    print("Completed")