# Copyright (c) OpenMMLab. All rights reserved. import argparse import os import pickle import shutil from os.path import join import cv2 import h5py import mmcv import numpy as np from scipy.io import loadmat train_subjects = [i for i in range(1, 9)] test_subjects = [i for i in range(1, 7)] train_seqs = [1, 2] train_cams = [0, 1, 2, 4, 5, 6, 7, 8] train_frame_nums = { (1, 1): 6416, (1, 2): 12430, (2, 1): 6502, (2, 2): 6081, (3, 1): 12488, (3, 2): 12283, (4, 1): 6171, (4, 2): 6675, (5, 1): 12820, (5, 2): 12312, (6, 1): 6188, (6, 2): 6145, (7, 1): 6239, (7, 2): 6320, (8, 1): 6468, (8, 2): 6054 } test_frame_nums = {1: 6151, 2: 6080, 3: 5838, 4: 6007, 5: 320, 6: 492} train_img_size = (2048, 2048) root_index = 14 joints_17 = [7, 5, 14, 15, 16, 9, 10, 11, 23, 24, 25, 18, 19, 20, 4, 3, 6] def get_pose_stats(kps): """Get statistic information `mean` and `std` of pose data. Args: kps (ndarray): keypoints in shape [..., K, D] where K and D is the keypoint category number and dimension. Returns: mean (ndarray): [K, D] """ assert kps.ndim > 2 K, D = kps.shape[-2:] kps = kps.reshape(-1, K, D) mean = kps.mean(axis=0) std = kps.std(axis=0) return mean, std def get_annotations(joints_2d, joints_3d, scale_factor=1.2): """Get annotations, including centers, scales, joints_2d and joints_3d. Args: joints_2d: 2D joint coordinates in shape [N, K, 2], where N is the frame number, K is the joint number. joints_3d: 3D joint coordinates in shape [N, K, 3], where N is the frame number, K is the joint number. scale_factor: Scale factor of bounding box. Default: 1.2. Returns: centers (ndarray): [N, 2] scales (ndarray): [N,] joints_2d (ndarray): [N, K, 3] joints_3d (ndarray): [N, K, 4] """ # calculate joint visibility visibility = (joints_2d[:, :, 0] >= 0) * \ (joints_2d[:, :, 0] < train_img_size[0]) * \ (joints_2d[:, :, 1] >= 0) * \ (joints_2d[:, :, 1] < train_img_size[1]) visibility = np.array(visibility, dtype=np.float32)[:, :, None] joints_2d = np.concatenate([joints_2d, visibility], axis=-1) joints_3d = np.concatenate([joints_3d, visibility], axis=-1) # calculate bounding boxes bboxes = np.stack([ np.min(joints_2d[:, :, 0], axis=1), np.min(joints_2d[:, :, 1], axis=1), np.max(joints_2d[:, :, 0], axis=1), np.max(joints_2d[:, :, 1], axis=1) ], axis=1) centers = np.stack([(bboxes[:, 0] + bboxes[:, 2]) / 2, (bboxes[:, 1] + bboxes[:, 3]) / 2], axis=1) scales = scale_factor * np.max(bboxes[:, 2:] - bboxes[:, :2], axis=1) / 200 return centers, scales, joints_2d, joints_3d def load_trainset(data_root, out_dir): """Load training data, create annotation file and camera file. Args: data_root: Directory of dataset, which is organized in the following hierarchy: data_root |-- train |-- S1 |-- Seq1 |-- Seq2 |-- S2 |-- ... |-- test |-- TS1 |-- TS2 |-- ... out_dir: Directory to save annotation file. """ _imgnames = [] _centers = [] _scales = [] _joints_2d = [] _joints_3d = [] cameras = {} img_dir = join(out_dir, 'images') os.makedirs(img_dir, exist_ok=True) annot_dir = join(out_dir, 'annotations') os.makedirs(annot_dir, exist_ok=True) for subj in train_subjects: for seq in train_seqs: seq_path = join(data_root, 'train', f'S{subj}', f'Seq{seq}') num_frames = train_frame_nums[(subj, seq)] # load camera parametres camera_file = join(seq_path, 'camera.calibration') with open(camera_file, 'r') as fin: lines = fin.readlines() for cam in train_cams: K = [float(s) for s in lines[cam * 7 + 5][11:-2].split()] f = np.array([[K[0]], [K[5]]]) c = np.array([[K[2]], [K[6]]]) RT = np.array( [float(s) for s in lines[cam * 7 + 6][11:-2].split()]) RT = np.reshape(RT, (4, 4)) R = RT[:3, :3] # convert unit from millimeter to meter T = RT[:3, 3:] * 0.001 size = [int(s) for s in lines[cam * 7 + 3][14:].split()] w, h = size cam_param = dict( R=R, T=T, c=c, f=f, w=w, h=h, name=f'train_cam_{cam}') cameras[f'S{subj}_Seq{seq}_Cam{cam}'] = cam_param # load annotations annot_file = os.path.join(seq_path, 'annot.mat') annot2 = loadmat(annot_file)['annot2'] annot3 = loadmat(annot_file)['annot3'] for cam in train_cams: # load 2D and 3D annotations joints_2d = np.reshape(annot2[cam][0][:num_frames], (num_frames, 28, 2))[:, joints_17] joints_3d = np.reshape(annot3[cam][0][:num_frames], (num_frames, 28, 3))[:, joints_17] joints_3d = joints_3d * 0.001 centers, scales, joints_2d, joints_3d = get_annotations( joints_2d, joints_3d) _centers.append(centers) _scales.append(scales) _joints_2d.append(joints_2d) _joints_3d.append(joints_3d) # extract frames from video video_path = join(seq_path, 'imageSequence', f'video_{cam}.avi') video = mmcv.VideoReader(video_path) for i in mmcv.track_iter_progress(range(num_frames)): img = video.read() if img is None: break imgname = f'S{subj}_Seq{seq}_Cam{cam}_{i+1:06d}.jpg' _imgnames.append(imgname) cv2.imwrite(join(img_dir, imgname), img) _imgnames = np.array(_imgnames) _centers = np.concatenate(_centers) _scales = np.concatenate(_scales) _joints_2d = np.concatenate(_joints_2d) _joints_3d = np.concatenate(_joints_3d) out_file = join(annot_dir, 'mpi_inf_3dhp_train.npz') np.savez( out_file, imgname=_imgnames, center=_centers, scale=_scales, part=_joints_2d, S=_joints_3d) print(f'Create annotation file for trainset: {out_file}. ' f'{len(_imgnames)} samples in total.') out_file = join(annot_dir, 'cameras_train.pkl') with open(out_file, 'wb') as fout: pickle.dump(cameras, fout) print(f'Create camera file for trainset: {out_file}.') # get `mean` and `std` of pose data _joints_3d = _joints_3d[..., :3] # remove visibility mean_3d, std_3d = get_pose_stats(_joints_3d) _joints_2d = _joints_2d[..., :2] # remove visibility mean_2d, std_2d = get_pose_stats(_joints_2d) # centered around root _joints_3d_rel = _joints_3d - _joints_3d[..., root_index:root_index + 1, :] mean_3d_rel, std_3d_rel = get_pose_stats(_joints_3d_rel) mean_3d_rel[root_index] = mean_3d[root_index] std_3d_rel[root_index] = std_3d[root_index] _joints_2d_rel = _joints_2d - _joints_2d[..., root_index:root_index + 1, :] mean_2d_rel, std_2d_rel = get_pose_stats(_joints_2d_rel) mean_2d_rel[root_index] = mean_2d[root_index] std_2d_rel[root_index] = std_2d[root_index] stats = { 'joint3d_stats': { 'mean': mean_3d, 'std': std_3d }, 'joint2d_stats': { 'mean': mean_2d, 'std': std_2d }, 'joint3d_rel_stats': { 'mean': mean_3d_rel, 'std': std_3d_rel }, 'joint2d_rel_stats': { 'mean': mean_2d_rel, 'std': std_2d_rel } } for name, stat_dict in stats.items(): out_file = join(annot_dir, f'{name}.pkl') with open(out_file, 'wb') as f: pickle.dump(stat_dict, f) print(f'Create statistic data file: {out_file}') def load_testset(data_root, out_dir, valid_only=True): """Load testing data, create annotation file and camera file. Args: data_root: Directory of dataset. out_dir: Directory to save annotation file. valid_only: Only keep frames with valid_label == 1. """ _imgnames = [] _centers = [] _scales = [] _joints_2d = [] _joints_3d = [] cameras = {} img_dir = join(out_dir, 'images') os.makedirs(img_dir, exist_ok=True) annot_dir = join(out_dir, 'annotations') os.makedirs(annot_dir, exist_ok=True) for subj in test_subjects: subj_path = join(data_root, 'test', f'TS{subj}') num_frames = test_frame_nums[subj] # load annotations annot_file = os.path.join(subj_path, 'annot_data.mat') with h5py.File(annot_file, 'r') as fin: annot2 = np.array(fin['annot2']).reshape((-1, 17, 2)) annot3 = np.array(fin['annot3']).reshape((-1, 17, 3)) valid = np.array(fin['valid_frame']).reshape(-1) # manually estimate camera intrinsics fx, cx = np.linalg.lstsq( annot3[:, :, [0, 2]].reshape((-1, 2)), (annot2[:, :, 0] * annot3[:, :, 2]).reshape(-1, 1), rcond=None)[0].flatten() fy, cy = np.linalg.lstsq( annot3[:, :, [1, 2]].reshape((-1, 2)), (annot2[:, :, 1] * annot3[:, :, 2]).reshape(-1, 1), rcond=None)[0].flatten() if subj <= 4: w, h = 2048, 2048 else: w, h = 1920, 1080 cameras[f'TS{subj}'] = dict( c=np.array([[cx], [cy]]), f=np.array([[fx], [fy]]), w=w, h=h, name=f'test_cam_{subj}') # get annotations if valid_only: valid_frames = np.nonzero(valid)[0] else: valid_frames = np.arange(num_frames) joints_2d = annot2[valid_frames, :, :] joints_3d = annot3[valid_frames, :, :] * 0.001 centers, scales, joints_2d, joints_3d = get_annotations( joints_2d, joints_3d) _centers.append(centers) _scales.append(scales) _joints_2d.append(joints_2d) _joints_3d.append(joints_3d) # copy and rename images for i in valid_frames: imgname = f'TS{subj}_{i+1:06d}.jpg' shutil.copyfile( join(subj_path, 'imageSequence', f'img_{i+1:06d}.jpg'), join(img_dir, imgname)) _imgnames.append(imgname) _imgnames = np.array(_imgnames) _centers = np.concatenate(_centers) _scales = np.concatenate(_scales) _joints_2d = np.concatenate(_joints_2d) _joints_3d = np.concatenate(_joints_3d) if valid_only: out_file = join(annot_dir, 'mpi_inf_3dhp_test_valid.npz') else: out_file = join(annot_dir, 'mpi_inf_3dhp_test_all.npz') np.savez( out_file, imgname=_imgnames, center=_centers, scale=_scales, part=_joints_2d, S=_joints_3d) print(f'Create annotation file for testset: {out_file}. ' f'{len(_imgnames)} samples in total.') out_file = join(annot_dir, 'cameras_test.pkl') with open(out_file, 'wb') as fout: pickle.dump(cameras, fout) print(f'Create camera file for testset: {out_file}.') if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('data_root', type=str, help='data root') parser.add_argument( 'out_dir', type=str, help='directory to save annotation files.') args = parser.parse_args() data_root = args.data_root out_dir = args.out_dir load_trainset(data_root, out_dir) load_testset(data_root, out_dir, valid_only=True)