Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Openpose #32

Merged
merged 12 commits into from
Apr 23, 2024
48 changes: 44 additions & 4 deletions skellytracker/process_folder_of_videos.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,23 @@
from skellytracker.trackers.mediapipe_tracker.mediapipe_model_info import (
MediapipeTrackingParams,
)


from skellytracker.trackers.openpose_tracker.openpose_tracker import OpenPoseTracker
from skellytracker.trackers.openpose_tracker.openpose_model_info import OpenPoseTrackingParams

from skellytracker.utilities.get_video_paths import get_video_paths

logger = logging.getLogger(__name__)

# TODO: figure out how we want to handle prefixes or suffixes here.
file_name_dictionary = {

"MediapipeHolisticTracker": "2dData_numCams_numFrames_numTrackedPoints_pixelXY.npy",
"YOLOMediapipeComboTracker": "2dData_numCams_numFrames_numTrackedPoints_pixelXY.npy",
"YOLOPoseTracker": "2dData_numCams_numFrames_numTrackedPoints_pixelXY.npy",
"BrightestPointTracker": "2dData_numCams_numFrames_numTrackedPoints_pixelXY.npy",

}


Expand Down Expand Up @@ -170,22 +177,55 @@ def get_tracker(tracker_name: str, tracking_params: BaseModel) -> BaseTracker:
elif tracker_name == "BrightestPointTracker":
tracker = BrightestPointTracker()

elif tracker_name == 'OpenPoseTracker':
tracker = OpenPoseTracker(
openpose_exe_path=tracking_params.openpose_exe_path,
output_json_path=tracking_params.output_json_path,
net_resolution=tracking_params.net_resolution,
number_people_max=tracking_params.number_people_max,
)

else:
raise ValueError("Invalid tracker type")

return tracker


if __name__ == "__main__":

synchronized_video_path = Path(
"/Users/philipqueen/freemocap_data/recording_sessions/freemocap_sample_data/synchronized_videos"
)
tracker_name = "YOLOMediapipeComboTracker"
num_processes = None
# tracker_name = "YOLOMediapipeComboTracker"
# num_processes = None

# process_folder_of_videos(
# tracker_name=tracker_name,
# tracking_params=MediapipeTrackingParams(),
# synchronized_video_path=synchronized_video_path,
# num_processes=num_processes,
# )

tracker_name = "OpenPoseTracker"
num_processes = 1

input_video_folder = Path(r'C:\Users\aaron\FreeMocap_Data\recording_sessions\freemocap_sample_data')
input_video_filepath = input_video_folder/'synchronized_videos'

output_video_folder = input_video_folder/'openpose_annotated_videos'
output_video_folder.mkdir(parents=True, exist_ok=True)

output_json_path = input_video_folder/'output_data'/'raw_data'/'openpose_jsons'
output_json_path.mkdir(parents=True, exist_ok=True)

openpose_exe_path = r'C:\openpose'

process_folder_of_videos(
tracker_name=tracker_name,
tracking_params=MediapipeTrackingParams(),
synchronized_video_path=synchronized_video_path,
tracking_params=OpenPoseTrackingParams(
openpose_exe_path=str(openpose_exe_path),
output_json_path=str(output_json_path),
),
synchronized_video_path=input_video_filepath,
num_processes=num_processes,
)
25 changes: 21 additions & 4 deletions skellytracker/trackers/base_tracker/base_recorder.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
import logging
from typing import Dict
from pathlib import Path
from typing import Dict, Union

import numpy as np

Expand All @@ -20,13 +21,13 @@ def __init__(self):

@abstractmethod
def record(
self, tracked_objects: Dict[str, TrackedObject], annotated_image: np.ndarray
self,
tracked_objects: Dict[str, TrackedObject],
) -> None:
"""
Record the tracked objects as they are created by the tracker.

:param tracked_object: A tracked objects dictionary.
:param annotated_image: Image array with tracking results annotated.
:return: None
"""
pass
Expand All @@ -45,7 +46,7 @@ def clear_recorded_objects(self):
self.recorded_objects = []
self.recorded_objects_array = None

def save(self, file_path: str) -> None:
def save(self, file_path: Union[str, Path]) -> None:
"""
Save the recorded objects to a file.

Expand All @@ -56,3 +57,19 @@ def save(self, file_path: str) -> None:
self.process_tracked_objects()
logger.info(f"Saving recorded objects to {file_path}")
np.save(file_path, self.recorded_objects_array)


class BaseCumulativeRecorder(BaseRecorder):
"""
A base class for recording data from cumulative trackers.
Throws a descriptive error for methods that do not apply to recording data from this type of tracker.
Trackers implementing this will only use the process_tracked_objects method to get data in the proper format.
"""

def __init__(self):
super().__init__()

def record(self, tracked_objects: Dict[str, TrackedObject]) -> None:
raise NotImplementedError(
"This tracker does not support by frame recording, please use process_tracked_objects instead"
)
53 changes: 53 additions & 0 deletions skellytracker/trackers/base_tracker/base_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,56 @@ def image_demo(self, image_path: Path) -> None:

image_viewer = ImageDemoViewer(self, self.__class__.__name__)
image_viewer.run(image_path=image_path)


class BaseCumulativeTracker(BaseTracker):
"""
A base class for tracking algorithms that run cumulatively, i.e are not able to process videos frame by frame.
Throws a descriptive error for the abstract methods of BaseTracker that do not apply to this type of tracker.
Trackers inheriting from this will need to overwrite the `process_video` method.
"""

def __init__(
self,
tracked_object_names: List[str] = None,
recorder: BaseRecorder = None,
**data: Any,
):
super().__init__(
tracked_object_names=tracked_object_names, recorder=recorder, **data
)

def process_image(self, **kwargs) -> None:
raise NotImplementedError(
"This tracker does not support processing individual images, please use process_video instead."
)

def annotate_image(self, **kwargs) -> None:
raise NotImplementedError(
"This tracker does not support processing individual images, please use process_video instead."
)

@abstractmethod
def process_video(
self,
input_video_filepath: Union[str, Path],
output_video_filepath: Optional[Union[str, Path]] = None,
save_data_bool: bool = False,
use_tqdm: bool = True,
**kwargs,
) -> Union[np.ndarray, None]:
"""
Run the tracker on a video.

:param input_video_filepath: Path to video file.
:param output_video_filepath: Path to save annotated video to, does not save video if None.
:param save_data_bool: Whether to save the data to a file.
:param use_tqdm: Whether to use tqdm to show a progress bar
:return: Array of tracked keypoint data
"""
pass

def image_demo(self, image_path: Path) -> None:
raise NotImplementedError(
"This tracker does not support processing individual images, please use process_video instead."
)
10 changes: 10 additions & 0 deletions skellytracker/trackers/openpose_tracker/openpose_model_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from skellytracker.trackers.base_tracker.base_tracking_params import BaseTrackingParams


class OpenPoseTrackingParams(BaseTrackingParams):
openpose_exe_path: str
output_json_path: str
net_resolution: str = "-1x320"
number_people_max: int = 1
write_video: bool = True
openpose_output_resolution: str = "-1x-1"
68 changes: 68 additions & 0 deletions skellytracker/trackers/openpose_tracker/openpose_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import json
import numpy as np
from pathlib import Path
from tqdm import tqdm
import re

def extract_frame_index(filename):
"""Extract the numeric part indicating the frame index from the filename."""
match = re.search(r"_(\d{12})_keypoints", filename)
return int(match.group(1)) if match else None

def parse_openpose_jsons(main_directory):
"""Parse OpenPose JSON files from subdirectories within the main directory."""
main_directory = Path(main_directory)
subdirectories = [d for d in main_directory.iterdir() if d.is_dir()]
num_cams = len(subdirectories)

# Check the first subdirectory to determine the number of frames
sample_files = list(subdirectories[0].glob("*.json"))
num_frames = len(sample_files)
frame_indices = [extract_frame_index(f.name) for f in sample_files]
frame_indices.sort()

# Assuming standard OpenPose output
body_markers = 25
hand_markers = 21 # Per hand
face_markers = 70
num_markers = body_markers + 2 * hand_markers + face_markers

data_array = np.full((num_cams, num_frames, num_markers, 3), np.nan)

for cam_index, subdir in enumerate(subdirectories):
json_files = sorted(subdir.glob("*.json"), key=lambda x: extract_frame_index(x.stem))

for file_index, json_file in tqdm(enumerate(json_files), desc = f'Processing {subdir.name} JSONS'):
with open(json_file) as f:
data = json.load(f)

if data["people"]:
keypoints = extract_keypoints(data["people"][0], body_markers, hand_markers, face_markers)
data_array[cam_index, frame_indices[file_index], :, :] = keypoints

return data_array

def extract_keypoints(person_data, body_markers, hand_markers, face_markers):
"""Extract and organize keypoints from person data."""
# Initialize a full array of NaNs for keypoints
keypoints_array = np.full((body_markers + 2 * hand_markers + face_markers, 3), np.nan)

# Populate the array with available data
if "pose_keypoints_2d" in person_data:
keypoints_array[:body_markers, :] = np.reshape(person_data["pose_keypoints_2d"], (-1, 3))[:body_markers, :]
if "hand_left_keypoints_2d" in person_data and "hand_right_keypoints_2d" in person_data:
keypoints_array[body_markers:body_markers + hand_markers, :] = np.reshape(person_data["hand_left_keypoints_2d"], (-1, 3))[:hand_markers, :]
keypoints_array[body_markers + hand_markers:body_markers + 2*hand_markers, :] = np.reshape(person_data["hand_right_keypoints_2d"], (-1, 3))[:hand_markers, :]
if "face_keypoints_2d" in person_data:
keypoints_array[body_markers + 2*hand_markers:, :] = np.reshape(person_data["face_keypoints_2d"], (-1, 3))[:face_markers, :]

return keypoints_array


path_to_recording_folder = Path(r'D:\steen_pantsOn_gait_3_cameras')
path_to_json_folder = path_to_recording_folder/'output_data'/'raw_data'/'openpose_json'
path_to_save_raw_data = path_to_recording_folder/'output_data'/'raw_data'/'openpose2dData_numCams_numFrames_numTrackedPoints_pixelXY.npy'

data = parse_openpose_jsons(path_to_json_folder)
np.save(path_to_save_raw_data, data)
f = 2
101 changes: 101 additions & 0 deletions skellytracker/trackers/openpose_tracker/openpose_recorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import json
from typing import Union
import numpy as np
from pathlib import Path
from skellytracker.trackers.base_tracker.base_recorder import BaseCumulativeRecorder
import re
from tqdm import tqdm


class OpenPoseRecorder(BaseCumulativeRecorder):
def __init__(self, json_directory_path: Union[Path, str]):
super().__init__()
self.json_directory_path = Path(json_directory_path)

def extract_frame_index(self, filename: str) -> Union[int, None]:
"""Extract the numeric part indicating the frame index from the filename."""
match = re.search(r"_(\d{12})_keypoints", filename)
return int(match.group(1)) if match else None

def parse_openpose_jsons(self, main_directory: Union[Path, str]) -> np.ndarray:
subdirectories = [d for d in Path(main_directory).iterdir() if d.is_dir()]
num_cams = len(subdirectories)

# Assuming the first subdirectory to determine the number of frames
sample_files = list(subdirectories[0].glob("*.json"))
num_frames = len(sample_files)
frame_indices = [self.extract_frame_index(f.name) for f in sample_files]
frame_indices.sort()

# Assuming standard OpenPose output
# TODO: move these definitions to openpose model info
body_markers, hand_markers, face_markers = 25, 21, 70
num_markers = body_markers + 2 * hand_markers + face_markers

data_array = np.full((num_cams, num_frames, num_markers, 3), np.nan)

for cam_index, subdir in enumerate(subdirectories):
json_files = sorted(
subdir.glob("*.json"), key=lambda x: self.extract_frame_index(x.stem)
)

for file_index, json_file in enumerate(
tqdm(json_files, desc=f"Processing {subdir.name} JSONs")
):
with open(json_file) as f:
data = json.load(f)

if data["people"]:
keypoints = self.extract_keypoints(
data["people"][0], body_markers, hand_markers, face_markers
)
data_array[cam_index, frame_indices[file_index], :, :] = keypoints

return data_array

def extract_keypoints(
self, person_data, body_markers, hand_markers, face_markers
) -> (
np.ndarray
): # TODO: type hint person_data - is this an ndarray yet or something else?
# TODO: marker numbers don't need to be passed into this function, they can just be referred to as OpenPoseModelInfo.xyz
"""Extract and organize keypoints from person data."""
# Initialize a full array of NaNs for keypoints
keypoints_array = np.full(
(body_markers + 2 * hand_markers + face_markers, 3), np.nan
)

# Populate the array with available data
if "pose_keypoints_2d" in person_data:
keypoints_array[:body_markers, :] = np.reshape(
person_data["pose_keypoints_2d"], (-1, 3)
)[:body_markers, :]
if (
"hand_left_keypoints_2d" in person_data
and "hand_right_keypoints_2d" in person_data
):
keypoints_array[body_markers : body_markers + hand_markers, :] = np.reshape(
person_data["hand_left_keypoints_2d"], (-1, 3)
)[:hand_markers, :]
keypoints_array[
body_markers + hand_markers : body_markers + 2 * hand_markers, :
] = np.reshape(person_data["hand_right_keypoints_2d"], (-1, 3))[
:hand_markers, :
]
if "face_keypoints_2d" in person_data:
keypoints_array[body_markers + 2 * hand_markers :, :] = np.reshape(
person_data["face_keypoints_2d"], (-1, 3)
)[:face_markers, :]

return keypoints_array

def process_tracked_objects(self, **kwargs) -> np.ndarray:
"""
Convert the recorded JSON data into the structured numpy array format.
"""
# In this case, the recorded_objects are already in the desired format,
# so we simply return them.
self.recorded_objects_array = self.parse_openpose_jsons(
self.json_directory_path
)
return self.recorded_objects_array
Loading