Merge pull request #32 from freemocap/openpose
Openpose
aaroncherian authored Apr 23, 2024
2 parents a02ab6c + 0929547 commit 5fce29d
Showing 7 changed files with 413 additions and 8 deletions.
48 changes: 44 additions & 4 deletions skellytracker/process_folder_of_videos.py
@@ -21,16 +21,23 @@
from skellytracker.trackers.mediapipe_tracker.mediapipe_model_info import (
MediapipeTrackingParams,
)


from skellytracker.trackers.openpose_tracker.openpose_tracker import OpenPoseTracker
from skellytracker.trackers.openpose_tracker.openpose_model_info import OpenPoseTrackingParams

from skellytracker.utilities.get_video_paths import get_video_paths

logger = logging.getLogger(__name__)

# TODO: figure out how we want to handle prefixes or suffixes here.
file_name_dictionary = {

"MediapipeHolisticTracker": "2dData_numCams_numFrames_numTrackedPoints_pixelXY.npy",
"YOLOMediapipeComboTracker": "2dData_numCams_numFrames_numTrackedPoints_pixelXY.npy",
"YOLOPoseTracker": "2dData_numCams_numFrames_numTrackedPoints_pixelXY.npy",
"BrightestPointTracker": "2dData_numCams_numFrames_numTrackedPoints_pixelXY.npy",

}


@@ -170,22 +177,55 @@ def get_tracker(tracker_name: str, tracking_params: BaseModel) -> BaseTracker:
elif tracker_name == "BrightestPointTracker":
tracker = BrightestPointTracker()

elif tracker_name == 'OpenPoseTracker':
tracker = OpenPoseTracker(
openpose_exe_path=tracking_params.openpose_exe_path,
output_json_path=tracking_params.output_json_path,
net_resolution=tracking_params.net_resolution,
number_people_max=tracking_params.number_people_max,
)

else:
raise ValueError("Invalid tracker type")

return tracker


if __name__ == "__main__":

synchronized_video_path = Path(
"/Users/philipqueen/freemocap_data/recording_sessions/freemocap_sample_data/synchronized_videos"
)
- tracker_name = "YOLOMediapipeComboTracker"
- num_processes = None
# tracker_name = "YOLOMediapipeComboTracker"
# num_processes = None

# process_folder_of_videos(
# tracker_name=tracker_name,
# tracking_params=MediapipeTrackingParams(),
# synchronized_video_path=synchronized_video_path,
# num_processes=num_processes,
# )

tracker_name = "OpenPoseTracker"
num_processes = 1

input_video_folder = Path(r'C:\Users\aaron\FreeMocap_Data\recording_sessions\freemocap_sample_data')
input_video_filepath = input_video_folder/'synchronized_videos'

output_video_folder = input_video_folder/'openpose_annotated_videos'
output_video_folder.mkdir(parents=True, exist_ok=True)

output_json_path = input_video_folder/'output_data'/'raw_data'/'openpose_jsons'
output_json_path.mkdir(parents=True, exist_ok=True)

openpose_exe_path = r'C:\openpose'

process_folder_of_videos(
tracker_name=tracker_name,
- tracking_params=MediapipeTrackingParams(),
- synchronized_video_path=synchronized_video_path,
tracking_params=OpenPoseTrackingParams(
openpose_exe_path=str(openpose_exe_path),
output_json_path=str(output_json_path),
),
synchronized_video_path=input_video_filepath,
num_processes=num_processes,
)
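For reference, the OpenPose route through `process_folder_of_videos` condenses to the sketch below; the paths are placeholders and the folder layout mirrors the `__main__` block above.

from pathlib import Path

from skellytracker.process_folder_of_videos import process_folder_of_videos
from skellytracker.trackers.openpose_tracker.openpose_model_info import OpenPoseTrackingParams

recording_folder = Path(r"C:\path\to\recording_session")  # placeholder recording folder
json_output_path = recording_folder / "output_data" / "raw_data" / "openpose_jsons"
json_output_path.mkdir(parents=True, exist_ok=True)

process_folder_of_videos(
    tracker_name="OpenPoseTracker",
    tracking_params=OpenPoseTrackingParams(
        openpose_exe_path=r"C:\openpose",  # root folder of the OpenPose install
        output_json_path=str(json_output_path),
    ),
    synchronized_video_path=recording_folder / "synchronized_videos",
    num_processes=1,  # mirrors the __main__ example above
)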
25 changes: 21 additions & 4 deletions skellytracker/trackers/base_tracker/base_recorder.py
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
import logging
- from typing import Dict
from pathlib import Path
from typing import Dict, Union

import numpy as np

@@ -20,13 +21,13 @@ def __init__(self):

@abstractmethod
def record(
- self, tracked_objects: Dict[str, TrackedObject], annotated_image: np.ndarray
self,
tracked_objects: Dict[str, TrackedObject],
) -> None:
"""
Record the tracked objects as they are created by the tracker.
:param tracked_objects: A tracked objects dictionary.
- :param annotated_image: Image array with tracking results annotated.
:return: None
"""
pass
@@ -45,7 +46,7 @@ def clear_recorded_objects(self):
self.recorded_objects = []
self.recorded_objects_array = None

- def save(self, file_path: str) -> None:
def save(self, file_path: Union[str, Path]) -> None:
"""
Save the recorded objects to a file.
@@ -56,3 +57,19 @@ def save(self, file_path: str) -> None:
self.process_tracked_objects()
logger.info(f"Saving recorded objects to {file_path}")
np.save(file_path, self.recorded_objects_array)


class BaseCumulativeRecorder(BaseRecorder):
"""
A base class for recording data from cumulative trackers.
Throws a descriptive error for methods that do not apply to recording data from this type of tracker.
Recorders implementing this will only use the process_tracked_objects method to get data in the proper format.
"""

def __init__(self):
super().__init__()

def record(self, tracked_objects: Dict[str, TrackedObject]) -> None:
raise NotImplementedError(
"This tracker does not support frame-by-frame recording, please use process_tracked_objects instead"
)
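To show how `BaseCumulativeRecorder` is meant to be used: a subclass implements only `process_tracked_objects`, and the inherited `save` then calls it before writing the array with `np.save`. A minimal hypothetical sketch (the class name and the `.npy` source are invented for illustration):

import numpy as np

from skellytracker.trackers.base_tracker.base_recorder import BaseCumulativeRecorder


class PrecomputedArrayRecorder(BaseCumulativeRecorder):  # hypothetical example class
    def __init__(self, npy_path: str):
        super().__init__()
        self.npy_path = npy_path

    def process_tracked_objects(self, **kwargs) -> np.ndarray:
        # Cumulative recorders skip per-frame record() calls and build the
        # full (num_cams, num_frames, num_markers, 3) array in one pass.
        self.recorded_objects_array = np.load(self.npy_path)
        return self.recorded_objects_array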
53 changes: 53 additions & 0 deletions skellytracker/trackers/base_tracker/base_tracker.py
@@ -159,3 +159,56 @@ def image_demo(self, image_path: Path) -> None:

image_viewer = ImageDemoViewer(self, self.__class__.__name__)
image_viewer.run(image_path=image_path)


class BaseCumulativeTracker(BaseTracker):
"""
A base class for tracking algorithms that run cumulatively, i.e. are not able to process videos frame by frame.
Throws a descriptive error for the abstract methods of BaseTracker that do not apply to this type of tracker.
Trackers inheriting from this will need to override the `process_video` method.
"""

def __init__(
self,
tracked_object_names: List[str] = None,
recorder: BaseRecorder = None,
**data: Any,
):
super().__init__(
tracked_object_names=tracked_object_names, recorder=recorder, **data
)

def process_image(self, **kwargs) -> None:
raise NotImplementedError(
"This tracker does not support processing individual images, please use process_video instead."
)

def annotate_image(self, **kwargs) -> None:
raise NotImplementedError(
"This tracker does not support processing individual images, please use process_video instead."
)

@abstractmethod
def process_video(
self,
input_video_filepath: Union[str, Path],
output_video_filepath: Optional[Union[str, Path]] = None,
save_data_bool: bool = False,
use_tqdm: bool = True,
**kwargs,
) -> Union[np.ndarray, None]:
"""
Run the tracker on a video.
:param input_video_filepath: Path to video file.
:param output_video_filepath: Path to save the annotated video to; the video is not saved if None.
:param save_data_bool: Whether to save the data to a file.
:param use_tqdm: Whether to use tqdm to show a progress bar.
:return: Array of tracked keypoint data.
"""
pass

def image_demo(self, image_path: Path) -> None:
raise NotImplementedError(
"This tracker does not support processing individual images, please use process_video instead."
)
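A hypothetical sketch of a subclass: only `process_video` is overridden, and the per-image methods remain disabled by the base class. The class and its placeholder output are invented for illustration, assuming BaseTracker accepts the `tracked_object_names` and `recorder` keywords shown in the signature above; the real OpenPoseTracker added in this PR instead drives the OpenPose executable and parses its JSON output.

from pathlib import Path
from typing import Optional, Union

import numpy as np

from skellytracker.trackers.base_tracker.base_tracker import BaseCumulativeTracker


class DummyCumulativeTracker(BaseCumulativeTracker):  # hypothetical example class
    def __init__(self, **data):
        super().__init__(tracked_object_names=["person"], recorder=None, **data)

    def process_video(
        self,
        input_video_filepath: Union[str, Path],
        output_video_filepath: Optional[Union[str, Path]] = None,
        save_data_bool: bool = False,
        use_tqdm: bool = True,
        **kwargs,
    ) -> Union[np.ndarray, None]:
        # A real cumulative tracker processes the whole video in one external
        # pass; this stub just returns a NaN-filled placeholder array.
        num_frames, num_markers = 100, 137
        return np.full((num_frames, num_markers, 3), np.nan)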
10 changes: 10 additions & 0 deletions skellytracker/trackers/openpose_tracker/openpose_model_info.py
@@ -0,0 +1,10 @@
from skellytracker.trackers.base_tracker.base_tracking_params import BaseTrackingParams


class OpenPoseTrackingParams(BaseTrackingParams):
openpose_exe_path: str
output_json_path: str
net_resolution: str = "-1x320"
number_people_max: int = 1
write_video: bool = True
openpose_output_resolution: str = "-1x-1"
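`get_tracker` above types its `tracking_params` as a pydantic `BaseModel`, so the new settings object is constructed by keyword; the two path fields declared here have no defaults and must be supplied. A minimal sketch with placeholder paths:

from skellytracker.trackers.openpose_tracker.openpose_model_info import OpenPoseTrackingParams

params = OpenPoseTrackingParams(
    openpose_exe_path=r"C:\openpose",            # placeholder: OpenPose install folder
    output_json_path=r"C:\data\openpose_jsons",  # placeholder: destination for per-frame keypoint JSONs
    net_resolution="-1x368",                     # costs more GPU memory/time than the "-1x320" default
    number_people_max=1,
)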
68 changes: 68 additions & 0 deletions skellytracker/trackers/openpose_tracker/openpose_parser.py
@@ -0,0 +1,68 @@
import json
import numpy as np
from pathlib import Path
from tqdm import tqdm
import re

def extract_frame_index(filename):
"""Extract the numeric part indicating the frame index from the filename."""
match = re.search(r"_(\d{12})_keypoints", filename)
return int(match.group(1)) if match else None

def parse_openpose_jsons(main_directory):
"""Parse OpenPose JSON files from subdirectories within the main directory."""
main_directory = Path(main_directory)
subdirectories = [d for d in main_directory.iterdir() if d.is_dir()]
num_cams = len(subdirectories)

# Check the first subdirectory to determine the number of frames
sample_files = list(subdirectories[0].glob("*.json"))
num_frames = len(sample_files)
frame_indices = [extract_frame_index(f.name) for f in sample_files]
frame_indices.sort()

# Assuming standard OpenPose output
body_markers = 25
hand_markers = 21 # Per hand
face_markers = 70
num_markers = body_markers + 2 * hand_markers + face_markers

data_array = np.full((num_cams, num_frames, num_markers, 3), np.nan)

for cam_index, subdir in enumerate(subdirectories):
json_files = sorted(subdir.glob("*.json"), key=lambda x: extract_frame_index(x.stem))

for file_index, json_file in enumerate(tqdm(json_files, desc=f'Processing {subdir.name} JSONs')):
with open(json_file) as f:
data = json.load(f)

if data["people"]:
keypoints = extract_keypoints(data["people"][0], body_markers, hand_markers, face_markers)
data_array[cam_index, frame_indices[file_index], :, :] = keypoints

return data_array

def extract_keypoints(person_data, body_markers, hand_markers, face_markers):
"""Extract and organize keypoints from person data."""
# Initialize a full array of NaNs for keypoints
keypoints_array = np.full((body_markers + 2 * hand_markers + face_markers, 3), np.nan)

# Populate the array with available data
if "pose_keypoints_2d" in person_data:
keypoints_array[:body_markers, :] = np.reshape(person_data["pose_keypoints_2d"], (-1, 3))[:body_markers, :]
if "hand_left_keypoints_2d" in person_data and "hand_right_keypoints_2d" in person_data:
keypoints_array[body_markers:body_markers + hand_markers, :] = np.reshape(person_data["hand_left_keypoints_2d"], (-1, 3))[:hand_markers, :]
keypoints_array[body_markers + hand_markers:body_markers + 2*hand_markers, :] = np.reshape(person_data["hand_right_keypoints_2d"], (-1, 3))[:hand_markers, :]
if "face_keypoints_2d" in person_data:
keypoints_array[body_markers + 2*hand_markers:, :] = np.reshape(person_data["face_keypoints_2d"], (-1, 3))[:face_markers, :]

return keypoints_array


if __name__ == "__main__":
    # Example usage on a local recording (paths are machine-specific)
    path_to_recording_folder = Path(r'D:\steen_pantsOn_gait_3_cameras')
    path_to_json_folder = path_to_recording_folder / 'output_data' / 'raw_data' / 'openpose_json'
    path_to_save_raw_data = path_to_recording_folder / 'output_data' / 'raw_data' / 'openpose2dData_numCams_numFrames_numTrackedPoints_pixelXY.npy'

    data = parse_openpose_jsons(path_to_json_folder)
    np.save(path_to_save_raw_data, data)
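The saved array is shaped (num_cams, num_frames, 137, 3), with the marker axis ordered body (25), left hand (21), right hand (21), face (70) and each keypoint stored as OpenPose's (x, y, confidence) triplet. A short sketch of slicing the saved file back apart (the filename is a placeholder):

import numpy as np

data = np.load("openpose2dData_numCams_numFrames_numTrackedPoints_pixelXY.npy")  # placeholder path

body = data[:, :, :25, :]          # BODY_25 keypoints
left_hand = data[:, :, 25:46, :]   # 21 left-hand keypoints
right_hand = data[:, :, 46:67, :]  # 21 right-hand keypoints
face = data[:, :, 67:, :]          # 70 face keypoints
confidence = data[..., 2]          # OpenPose per-keypoint confidence scores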
101 changes: 101 additions & 0 deletions skellytracker/trackers/openpose_tracker/openpose_recorder.py
@@ -0,0 +1,101 @@
import json
from typing import Union
import numpy as np
from pathlib import Path
from skellytracker.trackers.base_tracker.base_recorder import BaseCumulativeRecorder
import re
from tqdm import tqdm


class OpenPoseRecorder(BaseCumulativeRecorder):
def __init__(self, json_directory_path: Union[Path, str]):
super().__init__()
self.json_directory_path = Path(json_directory_path)

def extract_frame_index(self, filename: str) -> Union[int, None]:
"""Extract the numeric part indicating the frame index from the filename."""
match = re.search(r"_(\d{12})_keypoints", filename)
return int(match.group(1)) if match else None

def parse_openpose_jsons(self, main_directory: Union[Path, str]) -> np.ndarray:
subdirectories = [d for d in Path(main_directory).iterdir() if d.is_dir()]
num_cams = len(subdirectories)

# Check the first subdirectory to determine the number of frames
sample_files = list(subdirectories[0].glob("*.json"))
num_frames = len(sample_files)
frame_indices = [self.extract_frame_index(f.name) for f in sample_files]
frame_indices.sort()

# Assuming standard OpenPose output
# TODO: move these definitions to openpose model info
body_markers, hand_markers, face_markers = 25, 21, 70
num_markers = body_markers + 2 * hand_markers + face_markers

data_array = np.full((num_cams, num_frames, num_markers, 3), np.nan)

for cam_index, subdir in enumerate(subdirectories):
json_files = sorted(
subdir.glob("*.json"), key=lambda x: self.extract_frame_index(x.stem)
)

for file_index, json_file in enumerate(
tqdm(json_files, desc=f"Processing {subdir.name} JSONs")
):
with open(json_file) as f:
data = json.load(f)

if data["people"]:
keypoints = self.extract_keypoints(
data["people"][0], body_markers, hand_markers, face_markers
)
data_array[cam_index, frame_indices[file_index], :, :] = keypoints

return data_array

def extract_keypoints(
self, person_data, body_markers, hand_markers, face_markers
) -> (
np.ndarray
): # TODO: type hint person_data - is this an ndarray yet or something else?
# TODO: marker numbers don't need to be passed into this function, they can just be referred to as OpenPoseModelInfo.xyz
"""Extract and organize keypoints from person data."""
# Initialize a full array of NaNs for keypoints
keypoints_array = np.full(
(body_markers + 2 * hand_markers + face_markers, 3), np.nan
)

# Populate the array with available data
if "pose_keypoints_2d" in person_data:
keypoints_array[:body_markers, :] = np.reshape(
person_data["pose_keypoints_2d"], (-1, 3)
)[:body_markers, :]
if (
"hand_left_keypoints_2d" in person_data
and "hand_right_keypoints_2d" in person_data
):
keypoints_array[body_markers : body_markers + hand_markers, :] = np.reshape(
person_data["hand_left_keypoints_2d"], (-1, 3)
)[:hand_markers, :]
keypoints_array[
body_markers + hand_markers : body_markers + 2 * hand_markers, :
] = np.reshape(person_data["hand_right_keypoints_2d"], (-1, 3))[
:hand_markers, :
]
if "face_keypoints_2d" in person_data:
keypoints_array[body_markers + 2 * hand_markers :, :] = np.reshape(
person_data["face_keypoints_2d"], (-1, 3)
)[:face_markers, :]

return keypoints_array

def process_tracked_objects(self, **kwargs) -> np.ndarray:
"""
Convert the recorded JSON data into the structured numpy array format.
"""
# The per-frame JSON files on disk are parsed directly into the structured
# array, so there is no per-frame recorded data to post-process here.
self.recorded_objects_array = self.parse_openpose_jsons(
self.json_directory_path
)
return self.recorded_objects_array
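Because the recorder reads everything from disk, it can also be used on its own once OpenPose has written its JSON output (one subdirectory of JSON files per camera, as `parse_openpose_jsons` expects). A minimal sketch with placeholder paths:

from skellytracker.trackers.openpose_tracker.openpose_recorder import OpenPoseRecorder

recorder = OpenPoseRecorder(json_directory_path=r"C:\data\openpose_jsons")  # placeholder path
data = recorder.process_tracked_objects()  # shape: (num_cams, num_frames, 137, 3)
recorder.save(r"C:\data\openpose_2dData.npy")  # BaseRecorder.save re-parses and calls np.save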