From 778640ee26509e75d1894bff87f2289f536d41c0 Mon Sep 17 00:00:00 2001 From: Antoine Falisse Date: Tue, 13 Feb 2024 14:56:00 -0800 Subject: [PATCH 1/6] select right leg by default and remove mtp angles --- gait_analysis/function/gait_analysis.py | 181 ++++++++++++++++++++---- gait_analysis/function/handler.py | 11 +- gait_analysis/function/utils.py | 9 ++ 3 files changed, 165 insertions(+), 36 deletions(-) diff --git a/gait_analysis/function/gait_analysis.py b/gait_analysis/function/gait_analysis.py index 246e456..96e7cea 100644 --- a/gait_analysis/function/gait_analysis.py +++ b/gait_analysis/function/gait_analysis.py @@ -33,27 +33,52 @@ class gait_analysis(kinematics): def __init__(self, session_dir, trial_name, leg='auto', lowpass_cutoff_frequency_for_coordinate_values=-1, - n_gait_cycles=-1): + n_gait_cycles=-1, gait_style='auto', trimming_start=0, + trimming_end=0): # Inherit init from kinematics class. super().__init__( session_dir, trial_name, lowpass_cutoff_frequency_for_coordinate_values=lowpass_cutoff_frequency_for_coordinate_values) + + # We might want to trim the start/end of the trial to remove bad data. + # For example, this might be needed with HRNet during overground + # walking, where, at the end, the subject is leaving the field of view + # but HRNet returns relatively high confidence values. As a result, + # the trial is not well trimmed. Here, we provide the option to + # manually trim the start and end of the trial. + self.trimming_start = trimming_start + self.trimming_end = trimming_end # Marker data load and filter. - self.markerDict= self.get_marker_dict(session_dir, trial_name, - lowpass_cutoff_frequency = lowpass_cutoff_frequency_for_coordinate_values) + self.markerDict = self.get_marker_dict(session_dir, trial_name, + lowpass_cutoff_frequency = lowpass_cutoff_frequency_for_coordinate_values) # Coordinate values. self.coordinateValues = self.get_coordinate_values() + # Trim marker data and coordinate values. + if self.trimming_start > 0: + self.idx_trim_start = np.where(np.round(self.markerDict['time'] - self.trimming_start,6) <= 0)[0][-1] + self.markerDict['time'] = self.markerDict['time'][self.idx_trim_start:,] + for marker in self.markerDict['markers']: + self.markerDict['markers'][marker] = self.markerDict['markers'][marker][self.idx_trim_start:,:] + self.coordinateValues = self.coordinateValues.iloc[self.idx_trim_start:] + + if self.trimming_end > 0: + self.idx_trim_end = np.where(np.round(self.markerDict['time'],6) <= np.round(self.markerDict['time'][-1] - self.trimming_end,6))[0][-1] + 1 + self.markerDict['time'] = self.markerDict['time'][:self.idx_trim_end,] + for marker in self.markerDict['markers']: + self.markerDict['markers'][marker] = self.markerDict['markers'][marker][:self.idx_trim_end,:] + self.coordinateValues = self.coordinateValues.iloc[:self.idx_trim_end] + # Segment gait cycles. self.gaitEvents = self.segment_walking(n_gait_cycles=n_gait_cycles,leg=leg) self.nGaitCycles = np.shape(self.gaitEvents['ipsilateralIdx'])[0] # Determine treadmill speed (0 if overground). - self.treadmillSpeed,_ = self.compute_treadmill_speed() + self.treadmillSpeed,_ = self.compute_treadmill_speed(gait_style=gait_style) # Initialize variables to be lazy loaded. 
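        # (The lazy loading below is simple memoization: comValues() and
        # R_world_to_gait() compute their value on first call and cache it,
        # so the center-of-mass and gait-frame computations run at most once.)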
self._comValues = None @@ -63,6 +88,10 @@ def __init__(self, session_dir, trial_name, leg='auto', def comValues(self): if self._comValues is None: self._comValues = self.get_center_of_mass_values() + if self.trimming_start > 0: + self._comValues = self._comValues.iloc[self.idx_trim_start:] + if self.trimming_end > 0: + self._comValues = self._comValues.iloc[:self.idx_trim_end] return self._comValues # Compute gait frame. @@ -205,29 +234,37 @@ def compute_cadence(self,return_all=False): else: return cadence, units - def compute_treadmill_speed(self, overground_speed_threshold=0.3,return_all=False): - - leg,_ = self.get_leg() + def compute_treadmill_speed(self, overground_speed_threshold=0.3, + gait_style='auto', return_all=False): - foot_position = self.markerDict['markers'][leg + '_ankle_study'] - - stanceTimeLength = np.round(np.diff(self.gaitEvents['ipsilateralIdx'][:,:2])) - startIdx = np.round(self.gaitEvents['ipsilateralIdx'][:,:1]+.1*stanceTimeLength).astype(int) - endIdx = np.round(self.gaitEvents['ipsilateralIdx'][:,1:2]-.3*stanceTimeLength).astype(int) + # Heuristic to determine if overground or treadmill. + if gait_style == 'auto' or gait_style == 'treadmill': + leg,_ = self.get_leg() - # Average instantaneous velocities. - dt = np.diff(self.markerDict['time'][:2])[0] - for i in range(self.nGaitCycles): - footVel = np.linalg.norm(np.mean(np.diff( - foot_position[startIdx[i,0]:endIdx[i,0],:],axis=0),axis=0)/dt) - - treadmillSpeeds = footVel - treadmillSpeed = np.mean(treadmillSpeeds) + foot_position = self.markerDict['markers'][leg + '_ankle_study'] + + stanceTimeLength = np.round(np.diff(self.gaitEvents['ipsilateralIdx'][:,:2])) + startIdx = np.round(self.gaitEvents['ipsilateralIdx'][:,:1]+.1*stanceTimeLength).astype(int) + endIdx = np.round(self.gaitEvents['ipsilateralIdx'][:,1:2]-.3*stanceTimeLength).astype(int) + + # Average instantaneous velocities. + dt = np.diff(self.markerDict['time'][:2])[0] + treadmillSpeeds = np.zeros((self.nGaitCycles,)) + for i in range(self.nGaitCycles): + treadmillSpeeds[i,] = np.linalg.norm(np.mean(np.diff( + foot_position[startIdx[i,0]:endIdx[i,0],:],axis=0),axis=0)/dt) + + treadmillSpeed = np.mean(treadmillSpeeds) + + # Overground if treadmill speed is below threshold and gait style not set to treadmill. + if treadmillSpeed < overground_speed_threshold and not gait_style == 'treadmill': + treadmillSpeed = 0 + treadmillSpeeds = np.zeros(self.nGaitCycles) - # Overground. - if treadmillSpeed < overground_speed_threshold: + # Overground if gait style set to overground. + elif gait_style == 'overground': treadmillSpeed = 0 - treadmillSpeeds = np.zeros(treadmillSpeeds.shape) + treadmillSpeeds = np.zeros(self.nGaitCycles) # Define units. units = 'm/s' @@ -622,7 +659,7 @@ def get_coordinates_normalized_time(self): data = self.coordinateValues.to_numpy(copy=True) coordValuesNorm = [] for i in range(self.nGaitCycles): - coordValues = data[self.gaitEvents['ipsilateralIdx'][i,0]:self.gaitEvents['ipsilateralIdx'][i,2]] + coordValues = data[self.gaitEvents['ipsilateralIdx'][i,0]:self.gaitEvents['ipsilateralIdx'][i,2]+1] coordValuesNorm.append(np.stack([np.interp(np.linspace(0,100,101), np.linspace(0,100,len(coordValues)),coordValues[:,i]) \ for i in range(coordValues.shape[1])],axis=1)) @@ -648,6 +685,65 @@ def segment_walking(self, n_gait_cycles=-1, leg='auto', visualize=False): # n_gait_cycles = -1 finds all accessible gait cycles. Otherwise, it # finds that many gait cycles, working backwards from end of trial. 
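+        # (The helpers below detect heel-strike (HS) and toe-off (TO) peaks
+        # and verify that events alternate in the physiological order
+        # rHS -> lTO -> lHS -> rTO -> rHS before they are accepted.)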
+ + # Helper functions + def detect_gait_peaks(r_calc_rel_x, + l_calc_rel_x, + r_toe_rel_x, + l_toe_rel_x, + prominence = 0.3): + # Find HS. + rHS, _ = find_peaks(r_calc_rel_x, prominence=prominence) + lHS, _ = find_peaks(l_calc_rel_x, prominence=prominence) + + # Find TO. + rTO, _ = find_peaks(-r_toe_rel_x, prominence=prominence) + lTO, _ = find_peaks(-l_toe_rel_x, prominence=prominence) + + return rHS,lHS,rTO,lTO + + def detect_correct_order(rHS, rTO, lHS, lTO): + # checks if the peaks are in the right order + + expectedOrder = {'rHS': 'lTO', + 'lTO': 'lHS', + 'lHS': 'rTO', + 'rTO': 'rHS'} + + # Identify vector that has the smallest value in it. Put this vector name + # in vName1 + vectors = {'rHS': rHS, 'rTO': rTO, 'lHS': lHS, 'lTO': lTO} + non_empty_vectors = {k: v for k, v in vectors.items() if len(v) > 0} + + # Check if there are any non-empty vectors + if not non_empty_vectors: + return True # All vectors are empty, consider it correct order + + vName1 = min(non_empty_vectors, key=lambda k: non_empty_vectors[k][0]) + + # While there are any values in any of the vectors (rHS, rTO, lHS, or lTO) + while any([len(vName) > 0 for vName in vectors.values()]): + # Delete the smallest value from the vName1 + vectors[vName1] = np.delete(vectors[vName1], 0) + + # Then find the vector with the next smallest value. Define vName2 as the + # name of this vector + non_empty_vectors = {k: v for k, v in vectors.items() if len(v) > 0} + + # Check if there are any non-empty vectors + if not non_empty_vectors: + break # All vectors are empty, consider it correct order + + vName2 = min(non_empty_vectors, key=lambda k: non_empty_vectors[k][0]) + + # If vName2 != expectedOrder[vName1], return False + if vName2 != expectedOrder[vName1]: + return False + + # Set vName1 equal to vName2 and clear vName2 + vName1, vName2 = vName2, '' + + return True # Subtract sacrum from foot. # It looks like the position-based approach will be more robust. @@ -666,13 +762,36 @@ def segment_walking(self, n_gait_cycles=-1, leg='auto', visualize=False): self.markerDict['markers']['L_toe_study'] - self.markerDict['markers']['L.PSIS_study'])[:,0] - # Find HS. - rHS, _ = find_peaks(r_calc_rel_x, prominence=0.3) - lHS, _ = find_peaks(l_calc_rel_x, prominence=0.3) - - # Find TO. - rTO, _ = find_peaks(-r_toe_rel_x, prominence=0.3) - lTO, _ = find_peaks(-l_toe_rel_x, prominence=0.3) + # Identify which direction the subject is walking. + r_psis_x = self.markerDict['markers']['r.PSIS_study'][:,0] + r_asis_x = self.markerDict['markers']['r.ASIS_study'][:,0] + r_dir_x = r_asis_x-r_psis_x + position_approach_scaling = np.where(r_dir_x > 0, 1, -1) + # Adjust relative positions accordingly. + r_calc_rel_x *= position_approach_scaling + r_toe_rel_x *= position_approach_scaling + l_calc_rel_x *= position_approach_scaling + l_toe_rel_x *= position_approach_scaling + + # Detect peaks, check if they're in the right order, if not reduce prominence. + # the peaks can be less prominent with pathological or slower gait patterns + prominences = [0.3, 0.25, 0.2] + + for i,prom in enumerate(prominences): + rHS,lHS,rTO,lTO = detect_gait_peaks(r_calc_rel_x=r_calc_rel_x, + l_calc_rel_x=l_calc_rel_x, + r_toe_rel_x=r_toe_rel_x, + l_toe_rel_x=l_toe_rel_x, + prominence=prom) + if not detect_correct_order(rHS=rHS, rTO=rTO, lHS=lHS, lTO=lTO): + if prom == prominences[-1]: + raise ValueError('The ordering of gait events is not correct. 
Consider trimming your trial using the trimming_start and trimming_end options.') + else: + print('The gait events were not in the correct order. Trying peak detection again ' + + 'with prominence = ' + str(prominences[i+1]) + '.') + else: + # everything was in the correct order. continue. + break if visualize: import matplotlib.pyplot as plt diff --git a/gait_analysis/function/handler.py b/gait_analysis/function/handler.py index 578f30f..9fa53d5 100644 --- a/gait_analysis/function/handler.py +++ b/gait_analysis/function/handler.py @@ -82,18 +82,17 @@ def handler(event, context): # %% Process data. # Init gait analysis and get gait events. - legs = ['r','l'] - gait, gait_events, ipsilateral = {}, {}, {} + legs = ['r'] + gait, gait_events = {}, {} for leg in legs: gait[leg] = gait_analysis( sessionDir, trial_name, leg=leg, lowpass_cutoff_frequency_for_coordinate_values=filter_frequency, n_gait_cycles=n_gait_cycles) gait_events[leg] = gait[leg].get_gait_events() - ipsilateral[leg] = gait_events[leg]['ipsilateralTime'][0,-1] # Select last leg. - last_leg = 'r' if ipsilateral['r'] > ipsilateral['l'] else 'l' + last_leg = 'r' # Compute scalars. gait_scalars = gait[last_leg].compute_scalars(scalar_names) @@ -180,7 +179,7 @@ def handler(event, context): datasets.append({}) for j in range(coordValues.shape[1]): # Exclude knee_angle_r_beta and knee_angle_l_beta - if 'beta' in colNames[j]: + if 'beta' in colNames[j] or 'mtp' in colNames[j]: continue datasets[i][colNames[j]] = coordValues[i,j] @@ -189,6 +188,8 @@ def handler(event, context): y_axes.remove('time') y_axes.remove('knee_angle_r_beta') y_axes.remove('knee_angle_l_beta') + y_axes.remove('mtp_angle_r') + y_axes.remove('mtp_angle_l') # Create results dictionnary. results = { diff --git a/gait_analysis/function/utils.py b/gait_analysis/function/utils.py index b6221f2..a693e89 100644 --- a/gait_analysis/function/utils.py +++ b/gait_analysis/function/utils.py @@ -71,6 +71,15 @@ def get_user_sessions(): return sessions +# Returns a list of all sessions of the user. +# TODO: this also contains public sessions of other users. +def get_user_sessions_all(user_token=API_TOKEN): + sessions = requests.get( + API_URL + "sessions/", + headers = {"Authorization": "Token {}".format(user_token)}).json() + + return sessions + def get_trial_json(trial_id): trialJson = requests.get( API_URL + "trials/{}/".format(trial_id), From 9a5a424825db855db63844145581aad8c1a6e7f7 Mon Sep 17 00:00:00 2001 From: Antoine Falisse Date: Thu, 15 Feb 2024 11:26:49 -0800 Subject: [PATCH 2/6] updates overground gait analysis --- gait_analysis/function/gait_analysis.py | 55 +++++++++++++++---------- gait_analysis/function/handler.py | 3 +- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/gait_analysis/function/gait_analysis.py b/gait_analysis/function/gait_analysis.py index 96e7cea..eccac87 100644 --- a/gait_analysis/function/gait_analysis.py +++ b/gait_analysis/function/gait_analysis.py @@ -746,32 +746,46 @@ def detect_correct_order(rHS, rTO, lHS, lTO): return True # Subtract sacrum from foot. - # It looks like the position-based approach will be more robust. - r_calc_rel_x = ( + # It looks like the position-based approach will be more robust. 
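+        # (Rationale: the foot's fore-aft position relative to the pelvis is
+        # largest at heel strike and smallest at toe-off, so peaks in the
+        # calcaneus-PSIS signal give HS and troughs in the toe-PSIS signal
+        # give TO; this echoes the coordinate-based event detection of
+        # Zeni et al. (2008).)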
+ r_calc_rel = ( self.markerDict['markers']['r_calc_study'] - - self.markerDict['markers']['r.PSIS_study'])[:,0] - r_toe_rel_x = ( + self.markerDict['markers']['r.PSIS_study']) + + r_toe_rel = ( self.markerDict['markers']['r_toe_study'] - - self.markerDict['markers']['r.PSIS_study'])[:,0] - + self.markerDict['markers']['r.PSIS_study']) + r_toe_rel_x = r_toe_rel[:,0] # Repeat for left. - l_calc_rel_x = ( + l_calc_rel = ( self.markerDict['markers']['L_calc_study'] - - self.markerDict['markers']['L.PSIS_study'])[:,0] - l_toe_rel_x = ( + self.markerDict['markers']['L.PSIS_study']) + l_toe_rel = ( self.markerDict['markers']['L_toe_study'] - - self.markerDict['markers']['L.PSIS_study'])[:,0] + self.markerDict['markers']['L.PSIS_study']) # Identify which direction the subject is walking. - r_psis_x = self.markerDict['markers']['r.PSIS_study'][:,0] - r_asis_x = self.markerDict['markers']['r.ASIS_study'][:,0] - r_dir_x = r_asis_x-r_psis_x - position_approach_scaling = np.where(r_dir_x > 0, 1, -1) - # Adjust relative positions accordingly. - r_calc_rel_x *= position_approach_scaling - r_toe_rel_x *= position_approach_scaling - l_calc_rel_x *= position_approach_scaling - l_toe_rel_x *= position_approach_scaling + mid_psis = (self.markerDict['markers']['r.PSIS_study'] + self.markerDict['markers']['L.PSIS_study'])/2 + mid_asis = (self.markerDict['markers']['r.ASIS_study'] + self.markerDict['markers']['L.ASIS_study'])/2 + mid_dir = mid_asis - mid_psis + mid_dir_floor = np.copy(mid_dir) + mid_dir_floor[:,1] = 0 + mid_dir_floor = mid_dir_floor / np.linalg.norm(mid_dir_floor,axis=1,keepdims=True) + + # Dot product projections + r_calc_rel_x = np.einsum('ij,ij->i', mid_dir_floor,r_calc_rel) + l_calc_rel_x = np.einsum('ij,ij->i', mid_dir_floor,l_calc_rel) + r_toe_rel_x = np.einsum('ij,ij->i', mid_dir_floor,r_toe_rel) + l_toe_rel_x = np.einsum('ij,ij->i', mid_dir_floor,l_toe_rel) + + # Old Approach that does not take the heading direction into account. + # r_psis_x = self.markerDict['markers']['r.PSIS_study'][:,0] + # r_asis_x = self.markerDict['markers']['r.ASIS_study'][:,0] + # r_dir_x = r_asis_x-r_psis_x + # position_approach_scaling = np.where(r_dir_x > 0, 1, -1) + # r_calc_rel_x = r_calc_rel[:,0] * position_approach_scaling + # r_toe_rel_x = r_toe_rel[:,0] * position_approach_scaling + # l_calc_rel_x = l_calc_rel[:,0] * position_approach_scaling + # l_toe_rel_x = l_toe_rel[:,0] * position_approach_scaling # Detect peaks, check if they're in the right order, if not reduce prominence. # the peaks can be less prominent with pathological or slower gait patterns @@ -901,5 +915,4 @@ def detect_correct_order(rHS, rTO, lHS, lTO): 'eventNamesContralateral':['TO','HS'], 'ipsilateralLeg':leg} - return gaitEvents - + return gaitEvents \ No newline at end of file diff --git a/gait_analysis/function/handler.py b/gait_analysis/function/handler.py index 9fa53d5..026555c 100644 --- a/gait_analysis/function/handler.py +++ b/gait_analysis/function/handler.py @@ -88,7 +88,8 @@ def handler(event, context): gait[leg] = gait_analysis( sessionDir, trial_name, leg=leg, lowpass_cutoff_frequency_for_coordinate_values=filter_frequency, - n_gait_cycles=n_gait_cycles) + n_gait_cycles=n_gait_cycles, gait_style='overground', + trimming_start=0, trimming_end=0.5) gait_events[leg] = gait[leg].get_gait_events() # Select last leg. 
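To make the new constructor options concrete, here is a minimal usage sketch
(not part of the patch): it assumes a session directory already populated with
download_trial and a trial named 'walk'; the path and trial name are
placeholders, all other names come from the code above.

    from gait_analysis import gait_analysis

    # Placeholder paths for illustration; any downloaded OpenCap session works.
    session_dir = '/tmp/Data/<session_id>'
    trial_name = 'walk'

    # Force overground treatment (skipping the speed heuristic) and drop the
    # last 0.5 s of the trial, as the handler above now does for HRNet data.
    gait = gait_analysis(
        session_dir, trial_name, leg='r',
        lowpass_cutoff_frequency_for_coordinate_values=6,
        n_gait_cycles=-1, gait_style='overground',
        trimming_start=0, trimming_end=0.5)

    events = gait.get_gait_events()
    scalars = gait.compute_scalars({'gait_speed', 'stride_length', 'cadence'})
    print(scalars['gait_speed']['value'], scalars['gait_speed']['units'])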
From b667b28bb95fe9052650b0db45bf9b05f9cf67f3 Mon Sep 17 00:00:00 2001 From: Antoine Falisse Date: Thu, 15 Feb 2024 11:30:40 -0800 Subject: [PATCH 3/6] copying overground gait analysis folder --- treadmill_gait_analysis/.dockerignore | 4 + treadmill_gait_analysis/.gitignore | 7 + treadmill_gait_analysis/Dockerfile | 30 + treadmill_gait_analysis/docker-compose.yml | 12 + treadmill_gait_analysis/function/entrypoint | 6 + .../function/gait_analysis.py | 918 ++++++++++++++++++ treadmill_gait_analysis/function/handler.py | 207 ++++ treadmill_gait_analysis/function/utils.py | 747 ++++++++++++++ treadmill_gait_analysis/function/utilsAPI.py | 33 + .../function/utilsAuthentication.py | 77 ++ .../function/utilsKinematics.py | 409 ++++++++ .../function/utilsProcessing.py | 620 ++++++++++++ treadmill_gait_analysis/function/utilsTRC.py | 298 ++++++ treadmill_gait_analysis/requirements.txt | 8 + 14 files changed, 3376 insertions(+) create mode 100644 treadmill_gait_analysis/.dockerignore create mode 100644 treadmill_gait_analysis/.gitignore create mode 100644 treadmill_gait_analysis/Dockerfile create mode 100644 treadmill_gait_analysis/docker-compose.yml create mode 100644 treadmill_gait_analysis/function/entrypoint create mode 100644 treadmill_gait_analysis/function/gait_analysis.py create mode 100644 treadmill_gait_analysis/function/handler.py create mode 100644 treadmill_gait_analysis/function/utils.py create mode 100644 treadmill_gait_analysis/function/utilsAPI.py create mode 100644 treadmill_gait_analysis/function/utilsAuthentication.py create mode 100644 treadmill_gait_analysis/function/utilsKinematics.py create mode 100644 treadmill_gait_analysis/function/utilsProcessing.py create mode 100644 treadmill_gait_analysis/function/utilsTRC.py create mode 100644 treadmill_gait_analysis/requirements.txt diff --git a/treadmill_gait_analysis/.dockerignore b/treadmill_gait_analysis/.dockerignore new file mode 100644 index 0000000..8cbee42 --- /dev/null +++ b/treadmill_gait_analysis/.dockerignore @@ -0,0 +1,4 @@ +.git +Data/ +.env +docker \ No newline at end of file diff --git a/treadmill_gait_analysis/.gitignore b/treadmill_gait_analysis/.gitignore new file mode 100644 index 0000000..97bbf1a --- /dev/null +++ b/treadmill_gait_analysis/.gitignore @@ -0,0 +1,7 @@ +__pycache__ +.env +*.log +*.ipynb_checkpoints +Data/* + +*DS_Store \ No newline at end of file diff --git a/treadmill_gait_analysis/Dockerfile b/treadmill_gait_analysis/Dockerfile new file mode 100644 index 0000000..db6ba60 --- /dev/null +++ b/treadmill_gait_analysis/Dockerfile @@ -0,0 +1,30 @@ +FROM stanfordnmbl/opensim-python:4.3 + +ARG FUNCTION_DIR="/function" + +RUN apt-get update && \ + apt-get install -y \ + build-essential \ + python3-dev \ + g++ \ + make \ + cmake \ + unzip \ + libcurl4-openssl-dev + +# Copy function code +RUN mkdir -p ${FUNCTION_DIR} +COPY ./requirements.txt /requirements.txt +# Install the requirements.txt +RUN python3.8 -m pip install --no-cache-dir -r /requirements.txt +RUN python3.8 -m pip install --target ${FUNCTION_DIR} awslambdaric + +ADD https://github.com/aws/aws-lambda-runtime-interface-emulator/releases/latest/download/aws-lambda-rie /usr/bin/aws-lambda-rie +RUN chmod +x /usr/bin/aws-lambda-rie + +COPY ./function ${FUNCTION_DIR} +RUN chmod +x ${FUNCTION_DIR}/entrypoint +WORKDIR ${FUNCTION_DIR} + +ENTRYPOINT ["./entrypoint"] +CMD ["handler.handler"] diff --git a/treadmill_gait_analysis/docker-compose.yml b/treadmill_gait_analysis/docker-compose.yml new file mode 100644 index 0000000..954f292 --- /dev/null +++ 
b/treadmill_gait_analysis/docker-compose.yml @@ -0,0 +1,12 @@ +version: '3.8' + +services: + gait_analysis: +# platform: linux/amd64 + build: + context: . + dockerfile: ./Dockerfile + ports: + - 9005:8080 + env_file: + - ./.env diff --git a/treadmill_gait_analysis/function/entrypoint b/treadmill_gait_analysis/function/entrypoint new file mode 100644 index 0000000..10b1405 --- /dev/null +++ b/treadmill_gait_analysis/function/entrypoint @@ -0,0 +1,6 @@ +#!/bin/bash +if [ -z "${AWS_LAMBDA_RUNTIME_API}" ]; then + exec /usr/bin/aws-lambda-rie /usr/bin/python3.8 -m awslambdaric $@ +else + exec /usr/bin/python3.8 -m awslambdaric $@ +fi diff --git a/treadmill_gait_analysis/function/gait_analysis.py b/treadmill_gait_analysis/function/gait_analysis.py new file mode 100644 index 0000000..eccac87 --- /dev/null +++ b/treadmill_gait_analysis/function/gait_analysis.py @@ -0,0 +1,918 @@ +""" + --------------------------------------------------------------------------- + OpenCap processing: gaitAnalysis.py + --------------------------------------------------------------------------- + + Copyright 2023 Stanford University and the Authors + + Author(s): Antoine Falisse, Scott Uhlrich + + Licensed under the Apache License, Version 2.0 (the "License"); you may not + use this file except in compliance with the License. You may obtain a copy + of the License at http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + +import sys +sys.path.append('../') + +import numpy as np +import pandas as pd +from scipy.signal import find_peaks +from matplotlib import pyplot as plt + +from utilsKinematics import kinematics + + +class gait_analysis(kinematics): + + def __init__(self, session_dir, trial_name, leg='auto', + lowpass_cutoff_frequency_for_coordinate_values=-1, + n_gait_cycles=-1, gait_style='auto', trimming_start=0, + trimming_end=0): + + # Inherit init from kinematics class. + super().__init__( + session_dir, + trial_name, + lowpass_cutoff_frequency_for_coordinate_values=lowpass_cutoff_frequency_for_coordinate_values) + + # We might want to trim the start/end of the trial to remove bad data. + # For example, this might be needed with HRNet during overground + # walking, where, at the end, the subject is leaving the field of view + # but HRNet returns relatively high confidence values. As a result, + # the trial is not well trimmed. Here, we provide the option to + # manually trim the start and end of the trial. + self.trimming_start = trimming_start + self.trimming_end = trimming_end + + # Marker data load and filter. + self.markerDict = self.get_marker_dict(session_dir, trial_name, + lowpass_cutoff_frequency = lowpass_cutoff_frequency_for_coordinate_values) + + # Coordinate values. + self.coordinateValues = self.get_coordinate_values() + + # Trim marker data and coordinate values. 
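+        # (idx_trim_start is the last sample at or before trimming_start
+        # seconds into the trial; idx_trim_end cuts off the final
+        # trimming_end seconds. The same indices are reused later when the
+        # lazily loaded center-of-mass values are trimmed in comValues().)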
+ if self.trimming_start > 0: + self.idx_trim_start = np.where(np.round(self.markerDict['time'] - self.trimming_start,6) <= 0)[0][-1] + self.markerDict['time'] = self.markerDict['time'][self.idx_trim_start:,] + for marker in self.markerDict['markers']: + self.markerDict['markers'][marker] = self.markerDict['markers'][marker][self.idx_trim_start:,:] + self.coordinateValues = self.coordinateValues.iloc[self.idx_trim_start:] + + if self.trimming_end > 0: + self.idx_trim_end = np.where(np.round(self.markerDict['time'],6) <= np.round(self.markerDict['time'][-1] - self.trimming_end,6))[0][-1] + 1 + self.markerDict['time'] = self.markerDict['time'][:self.idx_trim_end,] + for marker in self.markerDict['markers']: + self.markerDict['markers'][marker] = self.markerDict['markers'][marker][:self.idx_trim_end,:] + self.coordinateValues = self.coordinateValues.iloc[:self.idx_trim_end] + + # Segment gait cycles. + self.gaitEvents = self.segment_walking(n_gait_cycles=n_gait_cycles,leg=leg) + self.nGaitCycles = np.shape(self.gaitEvents['ipsilateralIdx'])[0] + + # Determine treadmill speed (0 if overground). + self.treadmillSpeed,_ = self.compute_treadmill_speed(gait_style=gait_style) + + # Initialize variables to be lazy loaded. + self._comValues = None + self._R_world_to_gait = None + + # Compute COM trajectory. + def comValues(self): + if self._comValues is None: + self._comValues = self.get_center_of_mass_values() + if self.trimming_start > 0: + self._comValues = self._comValues.iloc[self.idx_trim_start:] + if self.trimming_end > 0: + self._comValues = self._comValues.iloc[:self.idx_trim_end] + return self._comValues + + # Compute gait frame. + def R_world_to_gait(self): + if self._R_world_to_gait is None: + self._R_world_to_gait = self.compute_gait_frame() + return self._R_world_to_gait + + def get_gait_events(self): + + return self.gaitEvents + + def compute_scalars(self,scalarNames,return_all=False): + + # Verify that scalarNames are methods in gait_analysis. + method_names = [func for func in dir(self) if callable(getattr(self, func))] + possibleMethods = [entry for entry in method_names if 'compute_' in entry] + + if scalarNames is None: + print('No scalars defined, these methods are available:') + print(*possibleMethods) + return + + nonexistant_methods = [entry for entry in scalarNames if 'compute_' + entry not in method_names] + + if len(nonexistant_methods) > 0: + raise Exception(str(['compute_' + a for a in nonexistant_methods]) + ' does not exist in gait_analysis class.') + + scalarDict = {} + for scalarName in scalarNames: + thisFunction = getattr(self, 'compute_' + scalarName) + scalarDict[scalarName] = {} + (scalarDict[scalarName]['value'], + scalarDict[scalarName]['units']) = thisFunction(return_all=return_all) + + return scalarDict + + def compute_stride_length(self,return_all=False): + + leg,_ = self.get_leg() + + calc_position = self.markerDict['markers'][leg + '_calc_study'] + + # On treadmill, the stride length is the difference in ipsilateral + # calcaneus position at heel strike + treadmill speed * time. + strideLengths = ( + np.linalg.norm( + calc_position[self.gaitEvents['ipsilateralIdx'][:,:1]] - + calc_position[self.gaitEvents['ipsilateralIdx'][:,2:3]], axis=2) + + self.treadmillSpeed * np.diff(self.gaitEvents['ipsilateralTime'][:,(0,2)])) + + # Average across all strides. + strideLength = np.mean(strideLengths) + + # Define units. 
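+        # (Illustrative example: a calcaneus displacement of 0.1 m between
+        # consecutive heel strikes 1.2 s apart on a 1.0 m/s belt gives a
+        # stride length of about 0.1 + 1.2*1.0 = 1.3 m.)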
+ units = 'm' + + if return_all: + return strideLengths,units + else: + return strideLength, units + + def compute_step_length(self,return_all=False): + leg, contLeg = self.get_leg() + step_lengths = {} + + step_lengths[contLeg.lower()] = (np.linalg.norm( + self.markerDict['markers'][leg + '_calc_study'][self.gaitEvents['ipsilateralIdx'][:,:1]] - + self.markerDict['markers'][contLeg + '_calc_study'][self.gaitEvents['contralateralIdx'][:,1:2]], axis=2) + + self.treadmillSpeed * (self.gaitEvents['contralateralTime'][:,1:2] - + self.gaitEvents['ipsilateralTime'][:,:1])) + + step_lengths[leg.lower()] = (np.linalg.norm( + self.markerDict['markers'][leg + '_calc_study'][self.gaitEvents['ipsilateralIdx'][:,2:]] - + self.markerDict['markers'][contLeg + '_calc_study'][self.gaitEvents['contralateralIdx'][:,1:2]], axis=2) + + self.treadmillSpeed * (-self.gaitEvents['contralateralTime'][:,1:2] + + self.gaitEvents['ipsilateralTime'][:,2:])) + + # Average across all strides. + step_length = {key: np.mean(values) for key, values in step_lengths.items()} + + # Define units. + units = 'm' + + # some functions depend on having values for each step, otherwise return average + if return_all: + return step_lengths, units + else: + return step_length, units + + def compute_step_length_symmetry(self,return_all=False): + step_lengths,units = self.compute_step_length(return_all=True) + + step_length_symmetry_all = step_lengths['r'] / step_lengths['l'] * 100 + + # Average across strides + step_length_symmetry = np.mean(step_length_symmetry_all) + + # define units + units = '% (R/L)' + + if return_all: + return step_length_symmetry_all, units + else: + return step_length_symmetry, units + + def compute_gait_speed(self,return_all=False): + + comValuesArray = np.vstack((self.comValues()['x'],self.comValues()['y'],self.comValues()['z'])).T + gait_speeds = ( + np.linalg.norm( + comValuesArray[self.gaitEvents['ipsilateralIdx'][:,:1]] - + comValuesArray[self.gaitEvents['ipsilateralIdx'][:,2:3]], axis=2) / + np.diff(self.gaitEvents['ipsilateralTime'][:,(0,2)]) + self.treadmillSpeed) + + # Average across all strides. + gait_speed = np.mean(gait_speeds) + + # Define units. + units = 'm/s' + + if return_all: + return gait_speeds,units + else: + return gait_speed, units + + def compute_cadence(self,return_all=False): + + # In steps per minute. + cadence_all = 60*2/np.diff(self.gaitEvents['ipsilateralTime'][:,(0,2)]) + + # Average across all strides. + cadence = np.mean(cadence_all) + + # Define units. + units = 'steps/min' + + if return_all: + return cadence_all,units + else: + return cadence, units + + def compute_treadmill_speed(self, overground_speed_threshold=0.3, + gait_style='auto', return_all=False): + + # Heuristic to determine if overground or treadmill. + if gait_style == 'auto' or gait_style == 'treadmill': + leg,_ = self.get_leg() + + foot_position = self.markerDict['markers'][leg + '_ankle_study'] + + stanceTimeLength = np.round(np.diff(self.gaitEvents['ipsilateralIdx'][:,:2])) + startIdx = np.round(self.gaitEvents['ipsilateralIdx'][:,:1]+.1*stanceTimeLength).astype(int) + endIdx = np.round(self.gaitEvents['ipsilateralIdx'][:,1:2]-.3*stanceTimeLength).astype(int) + + # Average instantaneous velocities. 
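+            # (The belt speed is estimated from the ankle marker while the
+            # foot is assumed stationary on the belt: the startIdx/endIdx
+            # offsets above restrict the window to roughly 10-70% of stance.)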
+ dt = np.diff(self.markerDict['time'][:2])[0] + treadmillSpeeds = np.zeros((self.nGaitCycles,)) + for i in range(self.nGaitCycles): + treadmillSpeeds[i,] = np.linalg.norm(np.mean(np.diff( + foot_position[startIdx[i,0]:endIdx[i,0],:],axis=0),axis=0)/dt) + + treadmillSpeed = np.mean(treadmillSpeeds) + + # Overground if treadmill speed is below threshold and gait style not set to treadmill. + if treadmillSpeed < overground_speed_threshold and not gait_style == 'treadmill': + treadmillSpeed = 0 + treadmillSpeeds = np.zeros(self.nGaitCycles) + + # Overground if gait style set to overground. + elif gait_style == 'overground': + treadmillSpeed = 0 + treadmillSpeeds = np.zeros(self.nGaitCycles) + + # Define units. + units = 'm/s' + + if return_all: + return treadmillSpeeds,units + else: + return treadmillSpeed, units + + def compute_step_width(self,return_all=False): + + leg,contLeg = self.get_leg() + + # Get ankle joint center positions. + ankle_position_ips = ( + self.markerDict['markers'][leg + '_ankle_study'] + + self.markerDict['markers'][leg + '_mankle_study'])/2 + ankle_position_cont = ( + self.markerDict['markers'][contLeg + '_ankle_study'] + + self.markerDict['markers'][contLeg + '_mankle_study'])/2 + + # Find indices of 40-60% of the stance phase + ips_stance_length = np.diff(self.gaitEvents['ipsilateralIdx'][:,(0,1)]) + cont_stance_length = (self.gaitEvents['contralateralIdx'][:,0] - + self.gaitEvents['ipsilateralIdx'][:,0] + + self.gaitEvents['ipsilateralIdx'][:,2]- + self.gaitEvents['contralateralIdx'][:,1]) + + midstanceIdx_ips = [range(self.gaitEvents['ipsilateralIdx'][i,0] + + int(np.round(.4*ips_stance_length[i])), + self.gaitEvents['ipsilateralIdx'][i,0] + + int(np.round(.6*ips_stance_length[i]))) + for i in range(self.nGaitCycles)] + + midstanceIdx_cont = [range(np.min((self.gaitEvents['contralateralIdx'][i,1] + + int(np.round(.4*cont_stance_length[i])), + self.gaitEvents['ipsilateralIdx'][i,2]-1)), + np.min((self.gaitEvents['contralateralIdx'][i,1] + + int(np.round(.6*cont_stance_length[i])), + self.gaitEvents['ipsilateralIdx'][i,2]))) + for i in range(self.nGaitCycles)] + + ankleVector = np.zeros((self.nGaitCycles,3)) + for i in range(self.nGaitCycles): + ankleVector[i,:] = ( + np.mean(ankle_position_cont[midstanceIdx_cont[i],:],axis=0) - + np.mean(ankle_position_ips[midstanceIdx_ips[i],:],axis=0)) + + ankleVector_inGaitFrame = np.array( + [np.dot(ankleVector[i,:], self.R_world_to_gait()[i,:,:]) + for i in range(self.nGaitCycles)]) + + # Step width is z distance. + stepWidths = np.abs(ankleVector_inGaitFrame[:,2]) + + # Average across all strides. + stepWidth = np.mean(stepWidths) + + # Define units. + units = 'm' + + if return_all: + return stepWidths, units + else: + return stepWidth, units + + def compute_stance_time(self, return_all=False): + + stanceTimes = np.diff(self.gaitEvents['ipsilateralTime'][:,:2]) + + # Average across all strides. + stanceTime = np.mean(stanceTimes) + + # Define units. + units = 's' + + if return_all: + return stanceTimes, units + else: + return stanceTime, units + + def compute_swing_time(self, return_all=False): + + swingTimes = np.diff(self.gaitEvents['ipsilateralTime'][:,1:]) + + # Average across all strides. + swingTime = np.mean(swingTimes) + + # Define units. 
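+        # (Swing spans ipsilateral toe-off to the next ipsilateral heel
+        # strike, i.e. the diff of ipsilateralTime columns 1 and 2.)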
+ units = 's' + + if return_all: + return swingTimes, units + else: + return swingTime, units + + def compute_single_support_time(self,return_all=False): + + double_support_time,_ = self.compute_double_support_time(return_all=True) + + singleSupportTimes = 100 - double_support_time + + # Average across all strides. + singleSupportTime = np.mean(singleSupportTimes) + + # Define units. + units = '%' + + if return_all: + return singleSupportTimes,units + else: + return singleSupportTime, units + + def compute_double_support_time(self,return_all=False): + + # Ipsilateral stance time - contralateral swing time. + doubleSupportTimes = ( + (np.diff(self.gaitEvents['ipsilateralTime'][:,:2]) - + np.diff(self.gaitEvents['contralateralTime'][:,:2])) / + np.diff(self.gaitEvents['ipsilateralTime'][:,(0,2)])) * 100 + + # Average across all strides. + doubleSupportTime = np.mean(doubleSupportTimes) + + # Define units. + units = '%' + + if return_all: + return doubleSupportTimes, units + else: + return doubleSupportTime, units + + def compute_midswing_dorsiflexion_angle(self,return_all=False): + # compute ankle dorsiflexion angle during midstance + to_1_idx = self.gaitEvents['ipsilateralIdx'][:,1] + hs_2_idx = self.gaitEvents['ipsilateralIdx'][:,2] + + # ankle markers + leg,contLeg = self.get_leg() + ankleVector = (self.markerDict['markers'][leg + '_ankle_study'] - + self.markerDict['markers'][contLeg + '_ankle_study']) + ankleVector_inGaitFrame = np.array( + [np.dot(ankleVector, self.R_world_to_gait()[i,:,:]) + for i in range(self.nGaitCycles)]) + + swingDfAngles = np.zeros((to_1_idx.shape)) + + for i in range(self.nGaitCycles): + # find index within a swing phase with the smallest z distance between ankles + idx_midSwing = np.argmin(np.abs(ankleVector_inGaitFrame[ + i,to_1_idx[i]:hs_2_idx[i],0]))+to_1_idx[i] + + swingDfAngles[i] = np.mean(self.coordinateValues['ankle_angle_' + + self.gaitEvents['ipsilateralLeg']].to_numpy()[idx_midSwing]) + + # Average across all strides. + swingDfAngle = np.mean(swingDfAngles) + + # Define units. + units = 'deg' + + if return_all: + return swingDfAngles, units + else: + return swingDfAngle, units + + def compute_midswing_ankle_heigh_dif(self,return_all=False): + # compute vertical clearance of the swing ankle above the stance ankle + # at the time when the ankles pass by one another + to_1_idx = self.gaitEvents['ipsilateralIdx'][:,1] + hs_2_idx = self.gaitEvents['ipsilateralIdx'][:,2] + + # ankle markers + leg,contLeg = self.get_leg() + ankleVector = (self.markerDict['markers'][leg + '_ankle_study'] - + self.markerDict['markers'][contLeg + '_ankle_study']) + ankleVector_inGaitFrame = np.array( + [np.dot(ankleVector, self.R_world_to_gait()[i,:,:]) + for i in range(self.nGaitCycles)]) + + swingAnkleHeighDiffs = np.zeros((to_1_idx.shape)) + + for i in range(self.nGaitCycles): + # find index within a swing phase with the smallest z distance between ankles + idx_midSwing = np.argmin(np.abs(ankleVector_inGaitFrame[ + i,to_1_idx[i]:hs_2_idx[i],0]))+to_1_idx[i] + + swingAnkleHeighDiffs[i] = ankleVector_inGaitFrame[i,idx_midSwing,1] + + # Average across all strides. + swingAnkleHeighDiff = np.mean(swingAnkleHeighDiffs) + + # Define units. 
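+        # (Midswing is the instant the swing ankle passes the stance ankle,
+        # found as the minimum fore-aft ankle separation in the gait frame;
+        # the reported clearance is the vertical component at that sample.)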
+ units = 'm' + + if return_all: + return swingAnkleHeighDiffs, units + else: + return swingAnkleHeighDiff, units + + def compute_peak_angle(self,dof,start_idx,end_idx,return_all=False): + # start_idx and end_idx are 1xnGaitCycles + + peakAngles = np.zeros((self.nGaitCycles)) + + for i in range(self.nGaitCycles): + peakAngles[i] = np.max(self.coordinateValues[dof + '_' + + self.gaitEvents['ipsilateralLeg']][start_idx[i]:end_idx[i]]) + + # Average across all strides. + peakAngle = np.mean(peakAngles) + + # Define units. + units = 'deg' + + if return_all: + return peakAngles, units + else: + return peakAngle, units + + def compute_rom(self,dof,start_idx,end_idx,return_all=False): + # start_idx and end_idx are 1xnGaitCycles + + roms = np.zeros((self.nGaitCycles)) + + for i in range(self.nGaitCycles): + roms[i] = np.ptp(self.coordinateValues[dof + '_' + + self.gaitEvents['ipsilateralLeg']][start_idx[i]:end_idx[i]]) + + # Average across all strides. + rom = np.mean(roms) + + # Define units. + units = 'deg' + + if return_all: + return roms, units + else: + return rom, units + + def compute_correlations(self, cols_to_compare=None, visualize=False, + return_all=False): + # this computes a weighted correlation between either side's dofs. + # the weighting is based on mean absolute percent error. In effect, + # this penalizes both shape and magnitude differences. + + leg,contLeg = self.get_leg(lower=True) + + correlations_all_cycles = [] + mean_correlation_all_cycles = np.zeros((self.nGaitCycles,1)) + + for i in range(self.nGaitCycles): + + + hs_ind_1 = self.gaitEvents['ipsilateralIdx'][i,0] + hs_ind_cont = self.gaitEvents['contralateralIdx'][i,1] + hs_ind_2 = self.gaitEvents['ipsilateralIdx'][i,2] + + df1 = pd.DataFrame() + df2 = pd.DataFrame() + + if cols_to_compare is None: + cols_to_compare = df1.columns + + # create a dataframe of coords for this gait cycle + for col in self.coordinateValues.columns: + if col.endswith('_' + leg): + df1[col] = self.coordinateValues[col][hs_ind_1:hs_ind_2] + elif col.endswith('_' + contLeg): + df2[col] = np.concatenate((self.coordinateValues[col][hs_ind_cont:hs_ind_2], + self.coordinateValues[col][hs_ind_1:hs_ind_cont])) + df1 = df1.reset_index(drop=True) + df2 = df2.reset_index(drop=True) + + # Interpolating both dataframes to have 101 rows for each column + df1_interpolated = df1.interpolate(method='linear', limit_direction='both', limit_area='inside', limit=100) + df2_interpolated = df2.interpolate(method='linear', limit_direction='both', limit_area='inside', limit=100) + + # Computing the correlation between appropriate columns in both dataframes + correlations = {} + total_weighted_correlation = 0 + # total_weight = 0 + + for col1 in df1_interpolated.columns: + if any(col1.startswith(col_compare) for col_compare in cols_to_compare): + if col1.endswith('_r'): + corresponding_col = col1[:-2] + '_l' + elif col1.endswith('_l'): + corresponding_col = col1[:-2] + '_r' + + if corresponding_col in df2_interpolated.columns: + signal1 = df1_interpolated[col1] + signal2 = df2_interpolated[corresponding_col] + + max_range_signal1 = np.ptp(signal1) + max_range_signal2 = np.ptp(signal2) + max_range = max(max_range_signal1, max_range_signal2) + + mean_abs_error = np.mean(np.abs(signal1 - signal2)) / max_range + + correlation = signal1.corr(signal2) + weight = 1 - mean_abs_error + + weighted_correlation = correlation * weight + correlations[col1] = weighted_correlation + + total_weighted_correlation += weighted_correlation + + # Plotting the signals if visualize is True + 
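+                        # (The weighting above: weight = 1 - mean|s1 - s2| /
+                        # max range, so the reported value is the Pearson
+                        # correlation scaled down by magnitude differences
+                        # that correlation alone would ignore.)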
if visualize: + plt.figure(figsize=(8, 5)) + plt.plot(signal1, label='df1') + plt.plot(signal2, label='df2') + plt.title(f"Comparison between {col1} and {corresponding_col} with weighted correlation {weighted_correlation}") + plt.legend() + plt.show() + + mean_correlation_all_cycles[i] = total_weighted_correlation / len(correlations) + correlations_all_cycles.append(correlations) + + if not return_all: + mean_correlation_all_cycles = np.mean(mean_correlation_all_cycles) + correlations_all_cycles = {key: sum(d[key] for d in correlations_all_cycles) / + len(correlations_all_cycles) for key in correlations_all_cycles[0]} + + return correlations_all_cycles, mean_correlation_all_cycles + + def compute_gait_frame(self): + + # Create frame for each gait cycle with x: pelvis heading, + # z: average vector between ASIS during gait cycle, y: cross. + + # Pelvis center trajectory (for overground heading vector). + pelvisMarkerNames = ['r.ASIS_study','L.ASIS_study','r.PSIS_study','L.PSIS_study'] + pelvisMarkers = [self.markerDict['markers'][mkr] for mkr in pelvisMarkerNames] + pelvisCenter = np.mean(np.array(pelvisMarkers),axis=0) + + # Ankle trajectory (for treadmill heading vector). + leg = self.gaitEvents['ipsilateralLeg'] + if leg == 'l': leg='L' + anklePos = self.markerDict['markers'][leg + '_ankle_study'] + + # Vector from left ASIS to right ASIS (for mediolateral direction). + asisMarkerNames = ['L.ASIS_study','r.ASIS_study'] + asisMarkers = [self.markerDict['markers'][mkr] for mkr in asisMarkerNames] + asisVector = np.squeeze(np.diff(np.array(asisMarkers),axis=0)) + + # Heading vector per gait cycle. + # If overground, use pelvis center trajectory; treadmill: ankle trajectory. + if self.treadmillSpeed == 0: + x = np.diff(pelvisCenter[self.gaitEvents['ipsilateralIdx'][:,(0,2)],:],axis=1)[:,0,:] + x = x / np.linalg.norm(x,axis=1,keepdims=True) + else: + x = np.zeros((self.nGaitCycles,3)) + for i in range(self.nGaitCycles): + x[i,:] = anklePos[self.gaitEvents['ipsilateralIdx'][i,2]] - \ + anklePos[self.gaitEvents['ipsilateralIdx'][i,1]] + x = x / np.linalg.norm(x,axis=1,keepdims=True) + + # Mean ASIS vector over gait cycle. + z = np.zeros((self.nGaitCycles,3)) + for i in range(self.nGaitCycles): + z[i,:] = np.mean(asisVector[self.gaitEvents['ipsilateralIdx'][i,0]: \ + self.gaitEvents['ipsilateralIdx'][i,2]],axis=0) + z = z / np.linalg.norm(z,axis=1,keepdims=True) + + # Cross to get y. + y = np.cross(z,x) + + # 3x3xnSteps. + R_lab_to_gait = np.stack((x.T,y.T,z.T),axis=1).transpose((2, 0, 1)) + + return R_lab_to_gait + + def get_leg(self,lower=False): + + if self.gaitEvents['ipsilateralLeg'] == 'r': + leg = 'r' + contLeg = 'L' + else: + leg = 'L' + contLeg = 'r' + + if lower: + return leg.lower(), contLeg.lower() + else: + return leg, contLeg + + def get_coordinates_normalized_time(self): + + colNames = self.coordinateValues.columns + data = self.coordinateValues.to_numpy(copy=True) + coordValuesNorm = [] + for i in range(self.nGaitCycles): + coordValues = data[self.gaitEvents['ipsilateralIdx'][i,0]:self.gaitEvents['ipsilateralIdx'][i,2]+1] + coordValuesNorm.append(np.stack([np.interp(np.linspace(0,100,101), + np.linspace(0,100,len(coordValues)),coordValues[:,i]) \ + for i in range(coordValues.shape[1])],axis=1)) + + coordinateValuesTimeNormalized = {} + # Average. + coordVals_mean = np.mean(np.array(coordValuesNorm),axis=0) + coordinateValuesTimeNormalized['mean'] = pd.DataFrame(data=coordVals_mean, columns=colNames) + + # Standard deviation. 
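+        # (The standard deviation is only reported when more than two gait
+        # cycles are available; otherwise 'sd' is left as None.)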
+ if self.nGaitCycles >2: + coordVals_sd = np.std(np.array(coordValuesNorm), axis=0) + coordinateValuesTimeNormalized['sd'] = pd.DataFrame(data=coordVals_sd, columns=colNames) + else: + coordinateValuesTimeNormalized['sd'] = None + + # Return to dataframe. + coordinateValuesTimeNormalized['indiv'] = [pd.DataFrame(data=d, columns=colNames) for d in coordValuesNorm] + + return coordinateValuesTimeNormalized + + def segment_walking(self, n_gait_cycles=-1, leg='auto', visualize=False): + + # n_gait_cycles = -1 finds all accessible gait cycles. Otherwise, it + # finds that many gait cycles, working backwards from end of trial. + + # Helper functions + def detect_gait_peaks(r_calc_rel_x, + l_calc_rel_x, + r_toe_rel_x, + l_toe_rel_x, + prominence = 0.3): + # Find HS. + rHS, _ = find_peaks(r_calc_rel_x, prominence=prominence) + lHS, _ = find_peaks(l_calc_rel_x, prominence=prominence) + + # Find TO. + rTO, _ = find_peaks(-r_toe_rel_x, prominence=prominence) + lTO, _ = find_peaks(-l_toe_rel_x, prominence=prominence) + + return rHS,lHS,rTO,lTO + + def detect_correct_order(rHS, rTO, lHS, lTO): + # checks if the peaks are in the right order + + expectedOrder = {'rHS': 'lTO', + 'lTO': 'lHS', + 'lHS': 'rTO', + 'rTO': 'rHS'} + + # Identify vector that has the smallest value in it. Put this vector name + # in vName1 + vectors = {'rHS': rHS, 'rTO': rTO, 'lHS': lHS, 'lTO': lTO} + non_empty_vectors = {k: v for k, v in vectors.items() if len(v) > 0} + + # Check if there are any non-empty vectors + if not non_empty_vectors: + return True # All vectors are empty, consider it correct order + + vName1 = min(non_empty_vectors, key=lambda k: non_empty_vectors[k][0]) + + # While there are any values in any of the vectors (rHS, rTO, lHS, or lTO) + while any([len(vName) > 0 for vName in vectors.values()]): + # Delete the smallest value from the vName1 + vectors[vName1] = np.delete(vectors[vName1], 0) + + # Then find the vector with the next smallest value. Define vName2 as the + # name of this vector + non_empty_vectors = {k: v for k, v in vectors.items() if len(v) > 0} + + # Check if there are any non-empty vectors + if not non_empty_vectors: + break # All vectors are empty, consider it correct order + + vName2 = min(non_empty_vectors, key=lambda k: non_empty_vectors[k][0]) + + # If vName2 != expectedOrder[vName1], return False + if vName2 != expectedOrder[vName1]: + return False + + # Set vName1 equal to vName2 and clear vName2 + vName1, vName2 = vName2, '' + + return True + + # Subtract sacrum from foot. + # It looks like the position-based approach will be more robust. + r_calc_rel = ( + self.markerDict['markers']['r_calc_study'] - + self.markerDict['markers']['r.PSIS_study']) + + r_toe_rel = ( + self.markerDict['markers']['r_toe_study'] - + self.markerDict['markers']['r.PSIS_study']) + r_toe_rel_x = r_toe_rel[:,0] + # Repeat for left. + l_calc_rel = ( + self.markerDict['markers']['L_calc_study'] - + self.markerDict['markers']['L.PSIS_study']) + l_toe_rel = ( + self.markerDict['markers']['L_toe_study'] - + self.markerDict['markers']['L.PSIS_study']) + + # Identify which direction the subject is walking. 
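+        # (Heading is the mid-ASIS minus mid-PSIS vector, flattened to the
+        # floor plane (vertical component zeroed) and normalized per frame;
+        # foot-pelvis vectors are then projected onto it with row-wise dot
+        # products (np.einsum), so event detection no longer assumes walking
+        # along the lab x-axis.)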
+ mid_psis = (self.markerDict['markers']['r.PSIS_study'] + self.markerDict['markers']['L.PSIS_study'])/2 + mid_asis = (self.markerDict['markers']['r.ASIS_study'] + self.markerDict['markers']['L.ASIS_study'])/2 + mid_dir = mid_asis - mid_psis + mid_dir_floor = np.copy(mid_dir) + mid_dir_floor[:,1] = 0 + mid_dir_floor = mid_dir_floor / np.linalg.norm(mid_dir_floor,axis=1,keepdims=True) + + # Dot product projections + r_calc_rel_x = np.einsum('ij,ij->i', mid_dir_floor,r_calc_rel) + l_calc_rel_x = np.einsum('ij,ij->i', mid_dir_floor,l_calc_rel) + r_toe_rel_x = np.einsum('ij,ij->i', mid_dir_floor,r_toe_rel) + l_toe_rel_x = np.einsum('ij,ij->i', mid_dir_floor,l_toe_rel) + + # Old Approach that does not take the heading direction into account. + # r_psis_x = self.markerDict['markers']['r.PSIS_study'][:,0] + # r_asis_x = self.markerDict['markers']['r.ASIS_study'][:,0] + # r_dir_x = r_asis_x-r_psis_x + # position_approach_scaling = np.where(r_dir_x > 0, 1, -1) + # r_calc_rel_x = r_calc_rel[:,0] * position_approach_scaling + # r_toe_rel_x = r_toe_rel[:,0] * position_approach_scaling + # l_calc_rel_x = l_calc_rel[:,0] * position_approach_scaling + # l_toe_rel_x = l_toe_rel[:,0] * position_approach_scaling + + # Detect peaks, check if they're in the right order, if not reduce prominence. + # the peaks can be less prominent with pathological or slower gait patterns + prominences = [0.3, 0.25, 0.2] + + for i,prom in enumerate(prominences): + rHS,lHS,rTO,lTO = detect_gait_peaks(r_calc_rel_x=r_calc_rel_x, + l_calc_rel_x=l_calc_rel_x, + r_toe_rel_x=r_toe_rel_x, + l_toe_rel_x=l_toe_rel_x, + prominence=prom) + if not detect_correct_order(rHS=rHS, rTO=rTO, lHS=lHS, lTO=lTO): + if prom == prominences[-1]: + raise ValueError('The ordering of gait events is not correct. Consider trimming your trial using the trimming_start and trimming_end options.') + else: + print('The gait events were not in the correct order. Trying peak detection again ' + + 'with prominence = ' + str(prominences[i+1]) + '.') + else: + # everything was in the correct order. continue. + break + + if visualize: + import matplotlib.pyplot as plt + plt.close('all') + plt.figure(1) + plt.plot(self.markerDict['time'],r_toe_rel_x,label='toe') + plt.plot(self.markerDict['time'],r_calc_rel_x,label='calc') + plt.scatter(self.markerDict['time'][rHS], r_calc_rel_x[rHS], color='red', label='rHS') + plt.scatter(self.markerDict['time'][rTO], r_toe_rel_x[rTO], color='blue', label='rTO') + plt.legend() + + plt.figure(2) + plt.plot(self.markerDict['time'],l_toe_rel_x,label='toe') + plt.plot(self.markerDict['time'],l_calc_rel_x,label='calc') + plt.scatter(self.markerDict['time'][lHS], l_calc_rel_x[lHS], color='red', label='lHS') + plt.scatter(self.markerDict['time'][lTO], l_toe_rel_x[lTO], color='blue', label='lTO') + plt.legend() + + # Find the number of gait cycles for the foot of interest. + if leg=='auto': + # Find the last HS of either foot. + if rHS[-1] > lHS[-1]: + leg = 'r' + else: + leg = 'l' + + # Find the number of gait cycles for the foot of interest. + if leg == 'r': + hsIps = rHS + toIps = rTO + hsCont = lHS + toCont = lTO + elif leg == 'l': + hsIps = lHS + toIps = lTO + hsCont = rHS + toCont = rTO + + if len(hsIps)-1 < n_gait_cycles: + print('You requested {} gait cycles, but only {} were found. 
' + 'Proceeding with this number.'.format(n_gait_cycles,len(hsIps)-1)) + n_gait_cycles = len(hsIps)-1 + if n_gait_cycles == -1: + n_gait_cycles = len(hsIps)-1 + print('Processing {} gait cycles, leg: '.format(n_gait_cycles) + leg + '.') + + # Ipsilateral gait events: heel strike, toe-off, heel strike. + gaitEvents_ips = np.zeros((n_gait_cycles, 3),dtype=int) + # Contralateral gait events: toe-off, heel strike. + gaitEvents_cont = np.zeros((n_gait_cycles, 2),dtype=int) + if n_gait_cycles <1: + raise Exception('Not enough gait cycles found.') + + for i in range(n_gait_cycles): + # Ipsilateral HS, TO, HS. + gaitEvents_ips[i,0] = hsIps[-i-2] + gaitEvents_ips[i,2] = hsIps[-i-1] + + # Iterate in reverse through ipsilateral TO, finding the one that + # is within the range of gaitEvents_ips. + toIpsFound = False + for j in range(len(toIps)): + if toIps[-j-1] > gaitEvents_ips[i,0] and toIps[-j-1] < gaitEvents_ips[i,2] and not toIpsFound: + gaitEvents_ips[i,1] = toIps[-j-1] + toIpsFound = True + + # Contralateral TO, HS. + # Iterate in reverse through contralateral HS and TO, finding the + # one that is within the range of gaitEvents_ips + hsContFound = False + toContFound = False + for j in range(len(toCont)): + if toCont[-j-1] > gaitEvents_ips[i,0] and toCont[-j-1] < gaitEvents_ips[i,2] and not toContFound: + gaitEvents_cont[i,0] = toCont[-j-1] + toContFound = True + + for j in range(len(hsCont)): + if hsCont[-j-1] > gaitEvents_ips[i,0] and hsCont[-j-1] < gaitEvents_ips[i,2] and not hsContFound: + gaitEvents_cont[i,1] = hsCont[-j-1] + hsContFound = True + + # Skip this step if no contralateral peaks fell within ipsilateral events + # This can happen with noisy data with subject far from camera. + if not toContFound or not hsContFound: + print('Could not find contralateral gait event within ' + + 'ipsilateral gait event range ' + str(i+1) + + ' steps until the end. Skipping this step.') + gaitEvents_cont[i,:] = -1 + gaitEvents_ips[i,:] = -1 + + # Remove any nan rows + mask_ips = (gaitEvents_ips == -1).any(axis=1) + if all(mask_ips): + raise Exception('No good steps for ' + leg + ' leg.') + gaitEvents_ips = gaitEvents_ips[~mask_ips] + gaitEvents_cont = gaitEvents_cont[~mask_ips] + + # Convert gaitEvents to times using self.markerDict['time']. + gaitEventTimes_ips = self.markerDict['time'][gaitEvents_ips] + gaitEventTimes_cont = self.markerDict['time'][gaitEvents_cont] + + gaitEvents = {'ipsilateralIdx':gaitEvents_ips, + 'contralateralIdx':gaitEvents_cont, + 'ipsilateralTime':gaitEventTimes_ips, + 'contralateralTime':gaitEventTimes_cont, + 'eventNamesIpsilateral':['HS','TO','HS'], + 'eventNamesContralateral':['TO','HS'], + 'ipsilateralLeg':leg} + + return gaitEvents \ No newline at end of file diff --git a/treadmill_gait_analysis/function/handler.py b/treadmill_gait_analysis/function/handler.py new file mode 100644 index 0000000..026555c --- /dev/null +++ b/treadmill_gait_analysis/function/handler.py @@ -0,0 +1,207 @@ +''' + --------------------------------------------------------------------------- + OpenCap processing: example.py + --------------------------------------------------------------------------- + + Copyright 2022 Stanford University and the Authors + + Author(s): Antoine Falisse, Scott Uhlrich + + Licensed under the Apache License, Version 2.0 (the "License"); you may not + use this file except in compliance with the License. 
You may obtain a copy + of the License at http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +''' +import json +import os +import numpy as np + +from gait_analysis import gait_analysis +from utils import get_trial_id, download_trial, import_metadata + + +def handler(event, context): + """ AWS Lambda function handler. This function performs a gait analysis. + + To invoke the function do POST request on the following url + http://localhost:8080/2015-03-31/functions/function/invocations + """ + # temporary placeholder + kwargs = json.loads(event['body']) + + for field in ('session_id', 'specific_trial_names'): + if field not in kwargs: + return { + 'statusCode': 400, + 'headers': {'Content-Type': 'application/json'}, + 'body': {'error': f'{field} field is required.'} + } + + # %% User inputs. + # Specify session id; see end of url in app.opencap.ai/session/. + # session_id = "8e430ad2-989c-4354-a6f1-7eb21fa0a16e" + session_id = kwargs['session_id'] + + # Specify trial names in a list; use None to process all trials in a session. + # specific_trial_names = ['walk'] + specific_trial_names = kwargs['specific_trial_names'] + + # Specify where to download the data. + sessionDir = os.path.join("/tmp/Data", session_id) + + # %% Download data. + trial_id = get_trial_id(session_id,specific_trial_names[0]) + trial_name = download_trial(trial_id,sessionDir,session_id=session_id) + + # Select how many gait cycles you'd like to analyze. Select -1 for all gait + # cycles detected in the trial. + n_gait_cycles = 1 + + # Select lowpass filter frequency for kinematics data. + filter_frequency = 6 + + # Select scalar names to compute. + scalar_names = { + 'gait_speed','stride_length','step_width','cadence', + 'double_support_time','step_length_symmetry'} + # 'single_support_time', + + scalar_labels = { + 'gait_speed': "Gait speed (m/s)", + 'stride_length':'Stride length (m)', + 'step_width': 'Step width (cm)', + 'cadence': 'Cadence (steps/min)', + # 'single_support_time': 'Single support time (% gait cycle)', + 'double_support_time': 'Double support (% gait cycle)', + 'step_length_symmetry': 'Step length symmetry (%, R/L)'} + + # %% Process data. + # Init gait analysis and get gait events. + legs = ['r'] + gait, gait_events = {}, {} + for leg in legs: + gait[leg] = gait_analysis( + sessionDir, trial_name, leg=leg, + lowpass_cutoff_frequency_for_coordinate_values=filter_frequency, + n_gait_cycles=n_gait_cycles, gait_style='overground', + trimming_start=0, trimming_end=0.5) + gait_events[leg] = gait[leg].get_gait_events() + + # Select last leg. + last_leg = 'r' + + # Compute scalars. + gait_scalars = gait[last_leg].compute_scalars(scalar_names) + + gait_scalars['gait_speed']['decimal'] = 2 + gait_scalars['step_width']['decimal'] = 1 + gait_scalars['stride_length']['decimal'] = 2 + gait_scalars['cadence']['decimal'] = 1 + gait_scalars['double_support_time']['decimal'] = 1 + gait_scalars['step_length_symmetry']['decimal'] = 1 + + # Change units + # Default = 1 + for key in gait_scalars: + gait_scalars[key]['multiplier'] = 1 + + gait_scalars['step_width']['multiplier'] = 100 # cm + + # %% Thresholds. 
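+    # (Units of the thresholds: gait_speed_threshold = 67/60 expresses
+    # 67 m/min in m/s (about 1.12 m/s); the step-width band scales with
+    # subject height in m and is compared against the cm value produced by
+    # the x100 multiplier above.)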
+    metadataPath = os.path.join(sessionDir, 'sessionMetadata.yaml')
+    metadata = import_metadata(metadataPath)
+    subject_height = metadata['height_m']
+    gait_speed_threshold = 67/60
+    step_width_threshold = [4.3*subject_height, 7.4*subject_height]
+    stride_length_threshold = subject_height * .45
+    cadence_threshold = 100
+    # single_support_time_threshold = 65
+    double_support_time_threshold = 35
+    step_length_symmetry_threshold = [90,110]
+    thresholds = {
+        'gait_speed': {'value': gait_speed_threshold, 'decimal': 2},
+        'step_width': {'value': step_width_threshold, 'decimal': 1},
+        'stride_length': {'value': stride_length_threshold, 'decimal': 2},
+        'cadence': {'value': cadence_threshold, 'decimal': 1},
+        # 'single_support_time': single_support_time_threshold,
+        'double_support_time': {'value': double_support_time_threshold, 'decimal': 1},
+        'step_length_symmetry': {'value': step_length_symmetry_threshold, 'decimal': 1}}
+    # Metrics for which below-threshold values are colored green rather than
+    # the default red.
+    scalar_reverse_colors = ['double_support_time']
+    # Metrics plotted red-green-red (the healthy range is centered).
+    scalar_centered = ['step_length_symmetry','step_width']
+
+    # %% Return indices for visualizer and line curve plot.
+    # %% Create json for deployment.
+    # Indices / Times
+    indices = {}
+    indices['start'] = int(gait_events[last_leg]['ipsilateralIdx'][0,0])
+    indices['end'] = int(gait_events[last_leg]['ipsilateralIdx'][0,-1])
+    times = {}
+    times['start'] = float(gait_events[last_leg]['ipsilateralTime'][0,0])
+    times['end'] = float(gait_events[last_leg]['ipsilateralTime'][0,-1])
+
+    # Metrics
+    metrics_out = {}
+    for scalar_name in scalar_names:
+        metrics_out[scalar_name] = {}
+        vertical_values = np.round(gait_scalars[scalar_name]['value'] *
+                                   gait_scalars[scalar_name]['multiplier'],
+                                   gait_scalars[scalar_name]['decimal'])
+        metrics_out[scalar_name]['label'] = scalar_labels[scalar_name]
+        metrics_out[scalar_name]['value'] = vertical_values
+        if scalar_name in scalar_reverse_colors:
+            # Margin zone (orange) is 10% above threshold.
+            metrics_out[scalar_name]['colors'] = ["green", "yellow", "red"]
+            metrics_out[scalar_name]['min_limit'] = float(np.round(thresholds[scalar_name]['value'],thresholds[scalar_name]['decimal']))
+            metrics_out[scalar_name]['max_limit'] = float(np.round(1.10*thresholds[scalar_name]['value'],thresholds[scalar_name]['decimal']))
+        elif scalar_name in scalar_centered:
+            # Red, green, red
+            metrics_out[scalar_name]['colors'] = ["red", "green", "red"]
+            metrics_out[scalar_name]['min_limit'] = float(np.round(thresholds[scalar_name]['value'][0],thresholds[scalar_name]['decimal']))
+            metrics_out[scalar_name]['max_limit'] = float(np.round(thresholds[scalar_name]['value'][1],thresholds[scalar_name]['decimal']))
+        else:
+            # Margin zone (orange) is 10% below threshold.
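+            # (Concretely for this default case: min_limit = 0.9*threshold
+            # and max_limit = threshold, so values just under the threshold
+            # fall in the margin band, rendered with the "yellow" color entry.)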
+            metrics_out[scalar_name]['colors'] = ["red", "yellow", "green"]
+            metrics_out[scalar_name]['min_limit'] = float(np.round(0.90*thresholds[scalar_name]['value'],thresholds[scalar_name]['decimal']))
+            metrics_out[scalar_name]['max_limit'] = float(np.round(thresholds[scalar_name]['value'],thresholds[scalar_name]['decimal']))
+
+    # Datasets
+    colNames = gait[last_leg].coordinateValues.columns
+    data = gait[last_leg].coordinateValues.to_numpy()
+    coordValues = data[indices['start']:indices['end']+1]
+    datasets = []
+    for i in range(coordValues.shape[0]):
+        datasets.append({})
+        for j in range(coordValues.shape[1]):
+            # Exclude knee_angle_r/l_beta and mtp angles.
+            if 'beta' in colNames[j] or 'mtp' in colNames[j]:
+                continue
+            datasets[i][colNames[j]] = coordValues[i,j]
+
+    # Available options for line curve chart.
+    y_axes = list(colNames)
+    y_axes.remove('time')
+    y_axes.remove('knee_angle_r_beta')
+    y_axes.remove('knee_angle_l_beta')
+    y_axes.remove('mtp_angle_r')
+    y_axes.remove('mtp_angle_l')
+
+    # Create results dictionary.
+    results = {
+        'indices': times,
+        'metrics': metrics_out,
+        'datasets': datasets,
+        'x_axis': 'time',
+        'y_axis': y_axes}
+
+    return {
+        'statusCode': 200,
+        'headers': {'Content-Type': 'application/json'},
+        'body': results
+    }
\ No newline at end of file
diff --git a/treadmill_gait_analysis/function/utils.py b/treadmill_gait_analysis/function/utils.py
new file mode 100644
index 0000000..a693e89
--- /dev/null
+++ b/treadmill_gait_analysis/function/utils.py
@@ -0,0 +1,747 @@
+'''
+    ---------------------------------------------------------------------------
+    OpenCap processing: utils.py
+    ---------------------------------------------------------------------------
+
+    Copyright 2022 Stanford University and the Authors
+
+    Author(s): Antoine Falisse, Scott Uhlrich
+
+    Licensed under the Apache License, Version 2.0 (the "License"); you may not
+    use this file except in compliance with the License. You may obtain a copy
+    of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+'''
+
+import os
+import requests
+import urllib.request
+import shutil
+import numpy as np
+import pandas as pd
+import yaml
+import pickle
+import glob
+import zipfile
+import platform
+import opensim
+
+from utilsAPI import get_api_url
+from utilsAuthentication import get_token
+import matplotlib.pyplot as plt
+from scipy.signal import gaussian
+
+
+API_URL = get_api_url()
+API_TOKEN = get_token()
+
+def download_file(url, file_name):
+    with urllib.request.urlopen(url) as response, open(file_name, 'wb') as out_file:
+        shutil.copyfileobj(response, out_file)
+
+def get_session_json(session_id):
+    resp = requests.get(
+        API_URL + "sessions/{}/".format(session_id),
+        headers = {"Authorization": "Token {}".format(API_TOKEN)})
+
+    if resp.status_code == 500:
+        raise Exception('No server response. Likely not a valid session id.')
+
+    sessionJson = resp.json()
+    if 'trials' not in sessionJson.keys():
+        raise Exception('This session does not belong to your account, nor is it public. You do not have access.')
+
+    # Sort trials by time recorded.
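+    # Assumption: 'created_at' is an ISO-8601 timestamp string (e.g.,
+    # '2024-02-13T14:56:00Z'), so sorting the raw strings sorts the trials
+    # chronologically.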
+ def get_created_at(trial): + return trial['created_at'] + sessionJson['trials'].sort(key=get_created_at) + + return sessionJson + +# Returns a list of all sessions of the user. +def get_user_sessions(): + sessions = requests.get( + API_URL + "sessions/valid/", + headers = {"Authorization": "Token {}".format(API_TOKEN)}).json() + + return sessions + +# Returns a list of all sessions of the user. +# TODO: this also contains public sessions of other users. +def get_user_sessions_all(user_token=API_TOKEN): + sessions = requests.get( + API_URL + "sessions/", + headers = {"Authorization": "Token {}".format(user_token)}).json() + + return sessions + +def get_trial_json(trial_id): + trialJson = requests.get( + API_URL + "trials/{}/".format(trial_id), + headers = {"Authorization": "Token {}".format(API_TOKEN)}).json() + + return trialJson + +def get_neutral_trial_id(session_id): + session = get_session_json(session_id) + neutral_ids = [t['id'] for t in session['trials'] if t['name']=='neutral'] + + if len(neutral_ids)>0: + neutralID = neutral_ids[-1] + elif session['meta']['neutral_trial']: + neutralID = session['meta']['neutral_trial']['id'] + else: + raise Exception('No neutral trial in session.') + + return neutralID + + +def get_calibration_trial_id(session_id): + session = get_session_json(session_id) + + calib_ids = [t['id'] for t in session['trials'] if t['name'] == 'calibration'] + + if len(calib_ids)>0: + calibID = calib_ids[-1] + elif session['meta']['sessionWithCalibration']: + calibID = get_calibration_trial_id(session['meta']['sessionWithCalibration']['id']) + else: + raise Exception('No calibration trial in session.') + + return calibID + +def get_camera_mapping(session_id, session_path): + calibration_id = get_calibration_trial_id(session_id) + trial = get_trial_json(calibration_id) + resultTags = [res['tag'] for res in trial['results']] + + mappingPath = os.path.join(session_path,'Videos','mappingCamDevice.pickle') + os.makedirs(os.path.join(session_path,'Videos'), exist_ok=True) + if not os.path.exists(mappingPath): + mappingURL = trial['results'][resultTags.index('camera_mapping')]['media'] + download_file(mappingURL, mappingPath) + + +def get_model_and_metadata(session_id, session_path): + neutral_id = get_neutral_trial_id(session_id) + trial = get_trial_json(neutral_id) + resultTags = [res['tag'] for res in trial['results']] + + # Metadata. + metadataPath = os.path.join(session_path,'sessionMetadata.yaml') + if not os.path.exists(metadataPath) : + metadataURL = trial['results'][resultTags.index('session_metadata')]['media'] + download_file(metadataURL, metadataPath) + + # Model. 
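+    # The model file name is recovered from the presigned URL, e.g. for a
+    # hypothetical '.../xyz-LaiUhlrich2022_scaled.osim?AWSAccessKeyId=...'
+    # the slice between the last '-' and the '?' gives
+    # 'LaiUhlrich2022_scaled.osim'.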
+    modelURL = trial['results'][resultTags.index('opensim_model')]['media']
+    modelName = modelURL[modelURL.rfind('-')+1:modelURL.rfind('?')]
+    modelFolder = os.path.join(session_path, 'OpenSimData', 'Model')
+    modelPath = os.path.join(modelFolder, modelName)
+    if not os.path.exists(modelPath):
+        os.makedirs(modelFolder, exist_ok=True)
+        download_file(modelURL, modelPath)
+
+    return modelName
+
+def get_main_settings(session_folder,trial_name):
+    settings_path = os.path.join(session_folder,'MarkerData',
+                                 'Settings','settings_' + trial_name + '.yaml')
+    main_settings = import_metadata(settings_path)
+
+    return main_settings
+
+
+def get_model_name_from_metadata(sessionFolder,appendText='_scaled'):
+    metadataPath = os.path.join(sessionFolder,'sessionMetadata.yaml')
+
+    if os.path.exists(metadataPath):
+        metadata = import_metadata(os.path.join(sessionFolder,'sessionMetadata.yaml'))
+        modelName = metadata['openSimModel'] + appendText + '.osim'
+    else:
+        raise Exception('Session metadata not found, could not identify OpenSim model.')
+
+    return modelName
+
+
+def get_motion_data(trial_id, session_path):
+    trial = get_trial_json(trial_id)
+    trial_name = trial['name']
+    resultTags = [res['tag'] for res in trial['results']]
+
+    # Marker data.
+    if 'marker_data' in resultTags:
+        markerFolder = os.path.join(session_path, 'MarkerData')
+        markerPath = os.path.join(markerFolder, trial_name + '.trc')
+        os.makedirs(markerFolder, exist_ok=True)
+        markerURL = trial['results'][resultTags.index('marker_data')]['media']
+        download_file(markerURL, markerPath)
+
+    # IK data.
+    if 'ik_results' in resultTags:
+        ikFolder = os.path.join(session_path, 'OpenSimData', 'Kinematics')
+        ikPath = os.path.join(ikFolder, trial_name + '.mot')
+        os.makedirs(ikFolder, exist_ok=True)
+        ikURL = trial['results'][resultTags.index('ik_results')]['media']
+        download_file(ikURL, ikPath)
+
+    # Main settings.
+    if 'main_settings' in resultTags:
+        settingsFolder = os.path.join(session_path, 'MarkerData', 'Settings')
+        settingsPath = os.path.join(settingsFolder, 'settings_' + trial_name + '.yaml')
+        os.makedirs(settingsFolder, exist_ok=True)
+        settingsURL = trial['results'][resultTags.index('main_settings')]['media']
+        download_file(settingsURL, settingsPath)
+
+
+def get_geometries(session_path, modelName='LaiUhlrich2022_scaled'):
+
+    geometryFolder = os.path.join(session_path, 'OpenSimData', 'Model', 'Geometry')
+    try:
+        # Download.
+ os.makedirs(geometryFolder, exist_ok=True) + if 'Lai' in modelName: + modelType = 'LaiArnold' + vtpNames = [ + 'capitate_lvs','capitate_rvs','hamate_lvs','hamate_rvs', + 'hat_jaw','hat_ribs_scap','hat_skull','hat_spine','humerus_lv', + 'humerus_rv','index_distal_lvs','index_distal_rvs', + 'index_medial_lvs', 'index_medial_rvs','index_proximal_lvs', + 'index_proximal_rvs','little_distal_lvs','little_distal_rvs', + 'little_medial_lvs','little_medial_rvs','little_proximal_lvs', + 'little_proximal_rvs','lunate_lvs','lunate_rvs','l_bofoot', + 'l_femur','l_fibula','l_foot','l_patella','l_pelvis','l_talus', + 'l_tibia','metacarpal1_lvs','metacarpal1_rvs', + 'metacarpal2_lvs','metacarpal2_rvs','metacarpal3_lvs', + 'metacarpal3_rvs','metacarpal4_lvs','metacarpal4_rvs', + 'metacarpal5_lvs','metacarpal5_rvs','middle_distal_lvs', + 'middle_distal_rvs','middle_medial_lvs','middle_medial_rvs', + 'middle_proximal_lvs','middle_proximal_rvs','pisiform_lvs', + 'pisiform_rvs','radius_lv','radius_rv','ring_distal_lvs', + 'ring_distal_rvs','ring_medial_lvs','ring_medial_rvs', + 'ring_proximal_lvs','ring_proximal_rvs','r_bofoot','r_femur', + 'r_fibula','r_foot','r_patella','r_pelvis','r_talus','r_tibia', + 'sacrum','scaphoid_lvs','scaphoid_rvs','thumb_distal_lvs', + 'thumb_distal_rvs','thumb_proximal_lvs','thumb_proximal_rvs', + 'trapezium_lvs','trapezium_rvs','trapezoid_lvs','trapezoid_rvs', + 'triquetrum_lvs','triquetrum_rvs','ulna_lv','ulna_rv'] + else: + raise ValueError("Geometries not available for this model") + for vtpName in vtpNames: + url = 'https://mc-opencap-public.s3.us-west-2.amazonaws.com/geometries_vtp/{}/{}.vtp'.format(modelType, vtpName) + filename = os.path.join(geometryFolder, '{}.vtp'.format(vtpName)) + download_file(url, filename) + except: + pass + +def import_metadata(filePath): + myYamlFile = open(filePath) + parsedYamlFile = yaml.load(myYamlFile, Loader=yaml.FullLoader) + + return parsedYamlFile + +def download_kinematics(session_id, folder=None, trialNames=None): + + # Login to access opencap data from server. + + # Create folder. + if folder is None: + folder = os.getcwd() + os.makedirs(folder, exist_ok=True) + + # Model and metadata. + neutral_id = get_neutral_trial_id(session_id) + get_motion_data(neutral_id, folder) + modelName = get_model_and_metadata(session_id, folder) + # Remove extension from modelName + modelName = modelName.replace('.osim','') + + # Session trial names. + sessionJson = get_session_json(session_id) + sessionTrialNames = [t['name'] for t in sessionJson['trials']] + if trialNames != None: + [print(t + ' not in session trial names.') + for t in trialNames if t not in sessionTrialNames] + + # Motion data. + loadedTrialNames = [] + for trialDict in sessionJson['trials']: + if trialNames is not None and trialDict['name'] not in trialNames: + continue + trial_id = trialDict['id'] + get_motion_data(trial_id,folder) + loadedTrialNames.append(trialDict['name']) + + # Remove 'calibration' and 'neutral' from loadedTrialNames. + loadedTrialNames = [i for i in loadedTrialNames if i!='neutral' and i!='calibration'] + + # Geometries. + get_geometries(folder, modelName=modelName) + + return loadedTrialNames, modelName + +# Download pertinent trial data. 
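+# Typical use, mirroring the Lambda handler (identifiers are placeholders):
+#   trial_id = get_trial_id(session_id, 'walk')
+#   trial_name = download_trial(trial_id, sessionDir, session_id=session_id)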
+def download_trial(trial_id, folder, session_id=None): + + trial = get_trial_json(trial_id) + if session_id is None: + session_id = trial['session_id'] + + os.makedirs(folder,exist_ok=True) + + # download model + get_model_and_metadata(session_id, folder) + + # download trc and mot + get_motion_data(trial_id,folder) + + return trial['name'] + + +# Get trial ID from name. +def get_trial_id(session_id,trial_name): + session = get_session_json(session_id) + + trial_id = [t['id'] for t in session['trials'] if t['name'] == trial_name] + + return trial_id[0] + +# %% Storage file to numpy array. +def storage_to_numpy(storage_file, excess_header_entries=0): + """Returns the data from a storage file in a numpy format. Skips all lines + up to and including the line that says 'endheader'. + Parameters + ---------- + storage_file : str + Path to an OpenSim Storage (.sto) file. + Returns + ------- + data : np.ndarray (or numpy structure array or something?) + Contains all columns from the storage file, indexable by column name. + excess_header_entries : int, optional + If the header row has more names in it than there are data columns. + We'll ignore this many header row entries from the end of the header + row. This argument allows for a hacky fix to an issue that arises from + Static Optimization '.sto' outputs. + Examples + -------- + Columns from the storage file can be obtained as follows: + >>> data = storage2numpy('') + >>> data['ground_force_vy'] + """ + # What's the line number of the line containing 'endheader'? + f = open(storage_file, 'r') + + header_line = False + for i, line in enumerate(f): + if header_line: + column_names = line.split() + break + if line.count('endheader') != 0: + line_number_of_line_containing_endheader = i + 1 + header_line = True + f.close() + + # With this information, go get the data. + if excess_header_entries == 0: + names = True + skip_header = line_number_of_line_containing_endheader + else: + names = column_names[:-excess_header_entries] + skip_header = line_number_of_line_containing_endheader + 1 + data = np.genfromtxt(storage_file, names=names, + skip_header=skip_header) + + return data + +# %% Storage file to dataframe. +def storage_to_dataframe(storage_file, headers): + # Extract data + data = storage_to_numpy(storage_file) + out = pd.DataFrame(data=data['time'], columns=['time']) + for count, header in enumerate(headers): + out.insert(count + 1, header, data[header]) + + return out + +# %% Load storage and output as dataframe or numpy +def load_storage(file_path,outputFormat='numpy'): + table = opensim.TimeSeriesTable(file_path) + data = table.getMatrix().to_numpy() + time = np.asarray(table.getIndependentColumn()).reshape(-1, 1) + data = np.hstack((time,data)) + headers = ['time'] + list(table.getColumnLabels()) + + if outputFormat == 'numpy': + return data,headers + elif outputFormat == 'dataframe': + return pd.DataFrame(data, columns=headers) + else: + return None + +# %% Numpy array to storage file. 
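+# Expects time in column 0 and labels[0] == 'time'. A minimal sketch
+# (names are illustrative):
+#   labels = ['time', 'knee_angle_r']
+#   data = np.column_stack((time_vector, knee_angle_values))
+#   numpy_to_storage(labels, data, '/tmp/knee.mot', datatype='IK')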
+def numpy_to_storage(labels, data, storage_file, datatype=None):
+
+    assert data.shape[1] == len(labels), "number of labels doesn't match number of columns"
+    assert labels[0] == "time"
+
+    f = open(storage_file, 'w')
+    # Old style.
+    if datatype is None:
+        f.write('name %s\n' %storage_file)
+        f.write('datacolumns %d\n' %data.shape[1])
+        f.write('datarows %d\n' %data.shape[0])
+        f.write('range %f %f\n' %(np.min(data[:, 0]), np.max(data[:, 0])))
+        f.write('endheader \n')
+    # New style.
+    else:
+        if datatype == 'IK':
+            f.write('Coordinates\n')
+        elif datatype == 'ID':
+            f.write('Inverse Dynamics Generalized Forces\n')
+        elif datatype == 'GRF':
+            f.write('%s\n' %storage_file)
+        elif datatype == 'muscle_forces':
+            f.write('ModelForces\n')
+        f.write('version=1\n')
+        f.write('nRows=%d\n' %data.shape[0])
+        f.write('nColumns=%d\n' %data.shape[1])
+        if datatype == 'IK':
+            f.write('inDegrees=yes\n\n')
+            f.write('Units are S.I. units (second, meters, Newtons, ...)\n')
+            f.write("If the header above contains a line with 'inDegrees', this indicates whether rotational values are in degrees (yes) or radians (no).\n\n")
+        elif datatype == 'ID':
+            f.write('inDegrees=no\n')
+        elif datatype == 'GRF':
+            f.write('inDegrees=yes\n')
+        elif datatype == 'muscle_forces':
+            f.write('inDegrees=yes\n\n')
+            f.write('This file contains the forces exerted on a model during a simulation.\n\n')
+            f.write("A force is a generalized force, meaning that it can be either a force (N) or a torque (Nm).\n\n")
+            f.write('Units are S.I. units (second, meters, Newtons, ...)\n')
+            f.write('Angles are in degrees.\n\n')
+
+        f.write('endheader \n')
+
+    for i in range(len(labels)):
+        f.write('%s\t' %labels[i])
+    f.write('\n')
+
+    for i in range(data.shape[0]):
+        for j in range(data.shape[1]):
+            f.write('%20.8f\t' %data[i, j])
+        f.write('\n')
+
+    f.close()
+
+def download_videos_from_server(session_id, trial_id,
+                                isCalibration=False, isStaticPose=False,
+                                trial_name=None, session_path=None):
+
+    if session_path is None:
+        data_dir = os.getcwd()
+        session_path = os.path.join(data_dir,'Data', session_id)
+    if not os.path.exists(session_path):
+        os.makedirs(session_path, exist_ok=True)
+
+    resp = requests.get("{}trials/{}/".format(API_URL,trial_id),
+                        headers = {"Authorization": "Token {}".format(API_TOKEN)})
+    trial = resp.json()
+    if trial_name is None:
+        trial_name = trial['name']
+    trial_name = trial_name.replace(' ', '')
+
+    print("\nDownloading {}".format(trial_name))
+
+    # The videos are not always organized in the same order. Here, we save
+    # the order during the first trial processed in the session such that we
+    # can use the same order for the other trials.
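+    # The persisted mapping is a dict from (normalized) device id to camera
+    # index, e.g. (hypothetical): {'ABC123DEF456': 0, 'GHI789JKL012': 1}.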
+    if not os.path.exists(os.path.join(session_path, "Videos", 'mappingCamDevice.pickle')):
+        mappingCamDevice = {}
+        for k, video in enumerate(trial["videos"]):
+            os.makedirs(os.path.join(session_path, "Videos", "Cam{}".format(k), "InputMedia", trial_name), exist_ok=True)
+            video_path = os.path.join(session_path, "Videos", "Cam{}".format(k), "InputMedia", trial_name, trial_name + ".mov")
+            download_file(video["video"], video_path)
+            mappingCamDevice[video["device_id"].replace('-', '').upper()] = k
+        with open(os.path.join(session_path, "Videos", 'mappingCamDevice.pickle'), 'wb') as handle:
+            pickle.dump(mappingCamDevice, handle)
+    else:
+        with open(os.path.join(session_path, "Videos", 'mappingCamDevice.pickle'), 'rb') as handle:
+            mappingCamDevice = pickle.load(handle)
+            # Ensure upper case on device ids (iterate over a copy of the
+            # keys since the dict is mutated in the loop).
+            for dID in list(mappingCamDevice.keys()):
+                mappingCamDevice[dID.upper()] = mappingCamDevice.pop(dID)
+        for video in trial["videos"]:
+            k = mappingCamDevice[video["device_id"].replace('-', '').upper()]
+            videoDir = os.path.join(session_path, "Videos", "Cam{}".format(k), "InputMedia", trial_name)
+            os.makedirs(videoDir, exist_ok=True)
+            video_path = os.path.join(videoDir, trial_name + ".mov")
+            if not os.path.exists(video_path):
+                if video['video']:
+                    download_file(video["video"], video_path)
+
+    return trial_name
+
+
+def get_calibration(session_id,session_path):
+    calibration_id = get_calibration_trial_id(session_id)
+
+    resp = requests.get("{}trials/{}/".format(API_URL,calibration_id),
+                        headers = {"Authorization": "Token {}".format(API_TOKEN)})
+    trial = resp.json()
+    calibResultTags = [res['tag'] for res in trial['results']]
+
+    videoFolder = os.path.join(session_path,'Videos')
+    os.makedirs(videoFolder, exist_ok=True)
+
+    if trial['status'] != 'done':
+        return
+
+    mapURL = trial['results'][calibResultTags.index('camera_mapping')]['media']
+    mapLocalPath = os.path.join(videoFolder,'mappingCamDevice.pickle')
+
+    download_and_switch_calibration(session_id,session_path,calibTrialID=calibration_id)
+
+    # Download mapping.
+    if len(glob.glob(mapLocalPath)) == 0:
+        download_file(mapURL,mapLocalPath)
+
+
+def download_and_switch_calibration(session_id,session_path,calibTrialID=None):
+    if calibTrialID is None:
+        calibTrialID = get_calibration_trial_id(session_id)
+    resp = requests.get("https://api.opencap.ai/trials/{}/".format(calibTrialID),
+                        headers = {"Authorization": "Token {}".format(API_TOKEN)})
+    trial = resp.json()
+
+    calibURLs = {t['device_id']:t['media'] for t in trial['results'] if t['tag'] == 'calibration_parameters_options'}
+    calibImgURLs = {t['device_id']:t['media'] for t in trial['results'] if t['tag'] == 'calibration-img'}
+    _,imgExtension = os.path.splitext(calibImgURLs[list(calibImgURLs.keys())[0]])
+    lastIdx = imgExtension.find('?')
+    if lastIdx > 0:
+        imgExtension = imgExtension[:lastIdx]
+
+    if 'meta' in trial.keys() and trial['meta'] is not None and 'calibration' in trial['meta'].keys():
+        calibDict = trial['meta']['calibration']
+        calibImgFolder = os.path.join(session_path,'CalibrationImages')
+        os.makedirs(calibImgFolder,exist_ok=True)
+        for cam,calibNum in calibDict.items():
+            camDir = os.path.join(session_path,'Videos',cam)
+            os.makedirs(camDir,exist_ok=True)
+            file_name = os.path.join(camDir,'cameraIntrinsicsExtrinsics.pickle')
+            img_fileName = os.path.join(calibImgFolder,'calib_img' + cam + imgExtension)
+            if calibNum == 0:
+                download_file(calibURLs[cam+'_soln0'], file_name)
+                download_file(calibImgURLs[cam],img_fileName)
+            elif calibNum == 1:
+
download_file(calibURLs[cam+'_soln1'], file_name) + download_file(calibImgURLs[cam + '_altSoln'],img_fileName) + + +def post_file_to_trial(filePath,trial_id,tag,device_id): + files = {'media': open(filePath, 'rb')} + data = { + "trial": trial_id, + "tag": tag, + "device_id" : device_id + } + + requests.post("{}results/".format(API_URL), files=files, data=data, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + files["media"].close() + + +def get_syncd_videos(trial_id,session_path): + trial = requests.get("{}trials/{}/".format(API_URL,trial_id), + headers = {"Authorization": "Token {}".format(API_TOKEN)}).json() + trial_name = trial['name'] + + if trial['results']: + for result in trial['results']: + if result['tag'] == 'video-sync': + url = result['media'] + cam,suff = os.path.splitext(url[url.rfind('_')+1:]) + lastIdx = suff.find('?') + if lastIdx >0: + suff = suff[:lastIdx] + + syncVideoPath = os.path.join(session_path,'Videos',cam,'InputMedia',trial_name,trial_name + '_sync' + suff) + download_file(url,syncVideoPath) + + +def download_session(session_id, sessionBasePath= None, + zipFolder=False,writeToDB=False, downloadVideos=True): + print('\nDownloading {}'.format(session_id)) + + if sessionBasePath is None: + sessionBasePath = os.path.join(os.getcwd(),'Data') + + session = get_session_json(session_id) + session_path = os.path.join(sessionBasePath,'OpenCapData_' + session_id) + + calib_id = get_calibration_trial_id(session_id) + neutral_id = get_neutral_trial_id(session_id) + dynamic_ids = [t['id'] for t in session['trials'] if (t['name'] != 'calibration' and t['name'] !='neutral')] + + # Calibration + try: + get_camera_mapping(session_id, session_path) + if downloadVideos: + download_videos_from_server(session_id,calib_id, + isCalibration=True,isStaticPose=False, + session_path = session_path) + + get_calibration(session_id,session_path) + except: + pass + + # Neutral + try: + modelName = get_model_and_metadata(session_id,session_path) + get_motion_data(neutral_id,session_path) + if downloadVideos: + download_videos_from_server(session_id,neutral_id, + isCalibration=False,isStaticPose=True, + session_path = session_path) + + get_syncd_videos(neutral_id,session_path) + except: + pass + + # Dynamic + for dynamic_id in dynamic_ids: + try: + get_motion_data(dynamic_id,session_path) + if downloadVideos: + download_videos_from_server(session_id,dynamic_id, + isCalibration=False,isStaticPose=False, + session_path = session_path) + + get_syncd_videos(dynamic_id,session_path) + except: + pass + + repoDir = os.path.dirname(os.path.abspath(__file__)) + + # Readme + try: + pathReadme = os.path.join(repoDir, 'Resources', 'README.txt') + pathReadmeEnd = os.path.join(session_path, 'README.txt') + shutil.copy2(pathReadme, pathReadmeEnd) + except: + pass + + # Geometry + try: + if 'Lai' in modelName: + modelType = 'LaiArnold' + else: + raise ValueError("Geometries not available for this model, please contact us") + if platform.system() == 'Windows': + geometryDir = os.path.join(repoDir, 'tmp', modelType, 'Geometry') + else: + geometryDir = "/tmp/{}/Geometry".format(modelType) + # If not in cache, download from s3. 
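+        # The cache lives under /tmp (or <repo>/tmp on Windows), so repeated
+        # session downloads can reuse the same geometry files.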
+ if not os.path.exists(geometryDir): + os.makedirs(geometryDir, exist_ok=True) + get_geometries(session_path, modelName=modelName) + geometryDirEnd = os.path.join(session_path, 'OpenSimData', 'Model', 'Geometry') + shutil.copytree(geometryDir, geometryDirEnd) + except: + pass + + # Zip + def zipdir(path, ziph): + # ziph is zipfile handle + for root, dirs, files in os.walk(path): + for file in files: + ziph.write(os.path.join(root, file), + os.path.relpath(os.path.join(root, file), + os.path.join(path, '..'))) + session_zip = '{}.zip'.format(session_path) + if os.path.isfile(session_zip): + os.remove(session_zip) + if zipFolder: + zipf = zipfile.ZipFile(session_zip, 'w', zipfile.ZIP_DEFLATED) + zipdir(session_path, zipf) + zipf.close() + + # Write zip as a result to last trial for now + if writeToDB: + post_file_to_trial(session_zip,dynamic_ids[-1],tag='session_zip', + device_id='all') + +def cross_corr(y1, y2,multCorrGaussianStd=None,visualize=False): + """Calculates the cross correlation and lags without normalization. + + The definition of the discrete cross-correlation is in: + https://www.mathworks.com/help/matlab/ref/xcorr.html + + Args: + y1, y2: Should have the same length. + + Returns: + max_corr: Maximum correlation without normalization. + lag: The lag in terms of the index. + """ + # Pad shorter signal with 0s + if len(y1) > len(y2): + temp = np.zeros(len(y1)) + temp[0:len(y2)] = y2 + y2 = np.copy(temp) + elif len(y2)>len(y1): + temp = np.zeros(len(y2)) + temp[0:len(y1)] = y1 + y1 = np.copy(temp) + + y1_auto_corr = np.dot(y1, y1) / len(y1) + y2_auto_corr = np.dot(y2, y2) / len(y1) + corr = np.correlate(y1, y2, mode='same') + # The unbiased sample size is N - lag. + unbiased_sample_size = np.correlate(np.ones(len(y1)), np.ones(len(y1)), mode='same') + corr = corr / unbiased_sample_size / np.sqrt(y1_auto_corr * y2_auto_corr) + shift = len(y1) // 2 + max_corr = np.max(corr) + argmax_corr = np.argmax(corr) + + if visualize: + plt.figure() + plt.plot(corr) + plt.title('vertical velocity correlation') + + # Multiply correlation curve by gaussian (prioritizing lag solution closest to 0) + if multCorrGaussianStd is not None: + corr = np.multiply(corr,gaussian(len(corr),multCorrGaussianStd)) + if visualize: + plt.plot(corr,color=[.4,.4,.4]) + plt.legend(['corr','corr*gaussian']) + + argmax_corr = np.argmax(corr) + max_corr = np.nanmax(corr) + + lag = argmax_corr-shift + + return max_corr, lag + +def downsample(data,time,framerate_in,framerate_out): + # Calculate the downsampling factor + downsampling_factor = framerate_in / framerate_out + + # Create new indices for downsampling + original_indices = np.arange(len(data)) + new_indices = np.arange(0, len(data), downsampling_factor) + + # Perform downsampling with interpolation + downsampled_data = np.ndarray((len(new_indices), data.shape[1])) + for i in range(data.shape[1]): + downsampled_data[:,i] = np.interp(new_indices, original_indices, data[:,i]) + + downsampled_time = np.interp(new_indices, original_indices, time) + + return downsampled_time, downsampled_data \ No newline at end of file diff --git a/treadmill_gait_analysis/function/utilsAPI.py b/treadmill_gait_analysis/function/utilsAPI.py new file mode 100644 index 0000000..645f0e6 --- /dev/null +++ b/treadmill_gait_analysis/function/utilsAPI.py @@ -0,0 +1,33 @@ +''' + --------------------------------------------------------------------------- + OpenCap processing: utilsAPI.py + --------------------------------------------------------------------------- + + Copyright 2022 Stanford 
University and the Authors + + Author(s): Antoine Falisse, Scott Uhlrich + + Licensed under the Apache License, Version 2.0 (the "License"); you may not + use this file except in compliance with the License. You may obtain a copy + of the License at http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +''' + +from decouple import config + +def get_api_url(): + if 'API_URL' not in globals(): + global API_URL + try: # look in environment file + API_URL = config("API_URL") + except: # default + API_URL = "https://api.opencap.ai/" + if API_URL[-1] != '/': + API_URL= API_URL + '/' + + return API_URL diff --git a/treadmill_gait_analysis/function/utilsAuthentication.py b/treadmill_gait_analysis/function/utilsAuthentication.py new file mode 100644 index 0000000..049bdc6 --- /dev/null +++ b/treadmill_gait_analysis/function/utilsAuthentication.py @@ -0,0 +1,77 @@ +''' + --------------------------------------------------------------------------- + OpenCap processing: utilsAuthentication.py + --------------------------------------------------------------------------- + + Copyright 2022 Stanford University and the Authors + + Author(s): Antoine Falisse, Scott Uhlrich + + Licensed under the Apache License, Version 2.0 (the "License"); you may not + use this file except in compliance with the License. You may obtain a copy + of the License at http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +''' + +import requests +from decouple import config +import getpass +import os +import maskpass +from utilsAPI import get_api_url + +API_URL = get_api_url() + +def get_token(saveEnvPath=None): + + if 'API_TOKEN' not in globals(): + + try: # look in environment file + token = config("API_TOKEN") + + except: + try: + # If spyder, use maskpass + isSpyder = 'SPY_PYTHONPATH' in os.environ + isPycharm = 'PYCHARM_HOSTED' in os.environ + print('Login with credentials used at app.opencap.ai.\nVisit the website to make an account if you do not have one.\n') + + if isSpyder: + un = maskpass.advpass(prompt="Enter Username:\n",ide=True) + pw = maskpass.advpass(prompt="Enter Password:\n",ide=True) + elif isPycharm: + print('Warning, you are in Pycharm, so the password will show up in the console.\n To avoid this, run createAuthenticationEnvFile.py from the terminal,\nthen re-open PyCharm.') + un = input("Enter Username:") + pw = input("Enter Password (will be shown in console):") + else: + un = getpass.getpass(prompt='Enter Username: ', stream=None) + pw = getpass.getpass(prompt='Enter Password: ', stream=None) + + data = {"username":un,"password":pw} + resp = requests.post(API_URL + 'login/',data=data).json() + token = resp['token'] + + print('Login successful.') + + if saveEnvPath is not None: + envPath = os.path.join(saveEnvPath,'.env') + + f = open(envPath, "w") + f.write('API_TOKEN="' + token + '"') + f.close() + print('Authentication token saved to '+ envPath + '. DO NOT CHANGE THIS FILE NAME. 
If you do, your authentication token will get pushed to github. Restart your terminal for env file to load.') + + except: + raise Exception('Login failed.') + + global API_TOKEN + API_TOKEN = token + else: + token = API_TOKEN + + return token diff --git a/treadmill_gait_analysis/function/utilsKinematics.py b/treadmill_gait_analysis/function/utilsKinematics.py new file mode 100644 index 0000000..796e0d8 --- /dev/null +++ b/treadmill_gait_analysis/function/utilsKinematics.py @@ -0,0 +1,409 @@ +''' + --------------------------------------------------------------------------- + OpenCap processing: utilsKinematics.py + --------------------------------------------------------------------------- + + Copyright 2022 Stanford University and the Authors + + Author(s): Antoine Falisse, Scott Uhlrich + + Licensed under the Apache License, Version 2.0 (the "License"); you may not + use this file except in compliance with the License. You may obtain a copy + of the License at http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +''' + +import os +import opensim +import utils +import numpy as np +import pandas as pd +import scipy.interpolate as interpolate + + +from utilsProcessing import lowPassFilter +from utilsTRC import trc_2_dict + + +class kinematics: + + def __init__(self, sessionDir, trialName, + modelName=None, + lowpass_cutoff_frequency_for_coordinate_values=-1): + + self.lowpass_cutoff_frequency_for_coordinate_values = ( + lowpass_cutoff_frequency_for_coordinate_values) + + # Model. + opensim.Logger.setLevelString('error') + + modelBasePath = os.path.join(sessionDir, 'OpenSimData', 'Model') + # Load model if specified, otherwise load the one that was on server + if modelName is None: + modelName = utils.get_model_name_from_metadata(sessionDir) + modelPath = os.path.join(modelBasePath,modelName) + else: + modelPath = os.path.join(modelBasePath, + '{}.osim'.format(modelName)) + + # make sure model exists + if not os.path.exists(modelPath): + raise Exception('Model path: ' + modelPath + ' does not exist.') + + self.model = opensim.Model(modelPath) + self.model.initSystem() + + # Motion file with coordinate values. + motionPath = os.path.join(sessionDir, 'OpenSimData', 'Kinematics', + '{}.mot'.format(trialName)) + + # Create time-series table with coordinate values. + self.table = opensim.TimeSeriesTable(motionPath) + tableProcessor = opensim.TableProcessor(self.table) + self.columnLabels = list(self.table.getColumnLabels()) + tableProcessor.append(opensim.TabOpUseAbsoluteStateNames()) + self.time = np.asarray(self.table.getIndependentColumn()) + + # Initialize the state trajectory. We will set it in other functions + # if it is needed. + self._stateTrajectory = None + + # Filter coordinate values. + if lowpass_cutoff_frequency_for_coordinate_values > 0: + tableProcessor.append( + opensim.TabOpLowPassFilter( + lowpass_cutoff_frequency_for_coordinate_values)) + + # Convert in radians. + self.table = tableProcessor.processAndConvertToRadians(self.model) + + # Trim if filtered. 
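+        # Assumption: low-pass filtering can pad the table at both ends, so
+        # we trim back to the original first/last time stamps to keep the
+        # table aligned with self.time.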
+ if lowpass_cutoff_frequency_for_coordinate_values > 0: + time_temp = self.table.getIndependentColumn() + self.table.trim( + time_temp[self.table.getNearestRowIndexForTime(self.time[0])], + time_temp[self.table.getNearestRowIndexForTime(self.time[-1])]) + + # Compute coordinate speeds and accelerations and add speeds to table. + self.Qs = self.table.getMatrix().to_numpy() + self.Qds = np.zeros(self.Qs.shape) + self.Qdds = np.zeros(self.Qs.shape) + columnAbsoluteLabels = list(self.table.getColumnLabels()) + for i, columnLabel in enumerate(columnAbsoluteLabels): + spline = interpolate.InterpolatedUnivariateSpline( + self.time, self.Qs[:,i], k=3) + # Coordinate speeds + splineD1 = spline.derivative(n=1) + self.Qds[:,i] = splineD1(self.time) + # Coordinate accelerations. + splineD2 = spline.derivative(n=2) + self.Qdds[:,i] = splineD2(self.time) + # Add coordinate speeds to table. + columnLabel_speed = columnLabel[:-5] + 'speed' + self.table.appendColumn( + columnLabel_speed, + opensim.Vector(self.Qds[:,i].flatten().tolist())) + + # Append missing muscle states to table. + # Needed for StatesTrajectory. + stateVariableNames = self.model.getStateVariableNames() + stateVariableNamesStr = [ + stateVariableNames.get(i) for i in range( + stateVariableNames.getSize())] + existingLabels = self.table.getColumnLabels() + for stateVariableNameStr in stateVariableNamesStr: + if not stateVariableNameStr in existingLabels: + vec_0 = opensim.Vector([0] * self.table.getNumRows()) + self.table.appendColumn(stateVariableNameStr, vec_0) + + # Number of muscles. + self.nMuscles = 0 + self.forceSet = self.model.getForceSet() + for i in range(self.forceSet.getSize()): + c_force_elt = self.forceSet.get(i) + if 'Muscle' in c_force_elt.getConcreteClassName(): + self.nMuscles += 1 + + # Coordinates. + self.coordinateSet = self.model.getCoordinateSet() + self.nCoordinates = self.coordinateSet.getSize() + self.coordinates = [self.coordinateSet.get(i).getName() + for i in range(self.nCoordinates)] + + # Find rotational and translational coordinates. + self.idxColumnTrLabels = [ + self.columnLabels.index(i) for i in self.coordinates if \ + self.coordinateSet.get(i).getMotionType() == 2] + self.idxColumnRotLabels = [ + self.columnLabels.index(i) for i in self.coordinates if \ + self.coordinateSet.get(i).getMotionType() == 1] + + # TODO: hard coded + self.rootCoordinates = [ + 'pelvis_tilt', 'pelvis_list', 'pelvis_rotation', + 'pelvis_tx', 'pelvis_ty', 'pelvis_tz'] + + self.lumbarCoordinates = ['lumbar_extension', 'lumbar_bending', + 'lumbar_rotation'] + + self.armCoordinates = ['arm_flex_r', 'arm_add_r', 'arm_rot_r', + 'elbow_flex_r', 'pro_sup_r', + 'arm_flex_l', 'arm_add_l', 'arm_rot_l', + 'elbow_flex_l', 'pro_sup_l'] + + # Only set the state trajectory when needed because it is slow. + def stateTrajectory(self): + if self._stateTrajectory is None: + self._stateTrajectory = ( + opensim.StatesTrajectory.createFromStatesTable( + self.model, self.table)) + return self._stateTrajectory + + def get_marker_dict(self, session_dir, trial_name, + lowpass_cutoff_frequency=-1): + + trcFilePath = os.path.join(session_dir, + 'MarkerData', + '{}.trc'.format(trial_name)) + + markerDict = trc_2_dict(trcFilePath) + if lowpass_cutoff_frequency > 0: + markerDict['markers'] = { + marker_name: lowPassFilter(self.time, data, lowpass_cutoff_frequency) + for marker_name, data in markerDict['markers'].items()} + + return markerDict + + def get_coordinate_values(self, in_degrees=True, + lowpass_cutoff_frequency=-1): + + # Convert to degrees. 
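+        # Only rotational coordinates are scaled by 180/pi; translational
+        # coordinates (pelvis_tx/ty/tz) stay in meters. E.g., a knee flexion
+        # of pi/2 rad becomes 90 deg.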
+ if in_degrees: + Qs = np.zeros((self.Qs.shape)) + Qs[:, self.idxColumnTrLabels] = self.Qs[:, self.idxColumnTrLabels] + Qs[:, self.idxColumnRotLabels] = ( + self.Qs[:, self.idxColumnRotLabels] * 180 / np.pi) + else: + Qs = self.Qs + + # Filter. + if lowpass_cutoff_frequency > 0: + Qs = lowPassFilter(self.time, Qs, lowpass_cutoff_frequency) + if self.lowpass_cutoff_frequency_for_coordinate_values > 0: + print("Warning: You are filtering the coordinate values a second time; coordinate values were filtered when creating your class object.") + + # Return as DataFrame. + data = np.concatenate( + (np.expand_dims(self.time, axis=1), Qs), axis=1) + columns = ['time'] + self.columnLabels + coordinate_values = pd.DataFrame(data=data, columns=columns) + + return coordinate_values + + def get_coordinate_speeds(self, in_degrees=True, + lowpass_cutoff_frequency=-1): + + # Convert to degrees. + if in_degrees: + Qds = np.zeros((self.Qds.shape)) + Qds[:, self.idxColumnTrLabels] = ( + self.Qds[:, self.idxColumnTrLabels]) + Qds[:, self.idxColumnRotLabels] = ( + self.Qds[:, self.idxColumnRotLabels] * 180 / np.pi) + else: + Qds = self.Qds + + # Filter. + if lowpass_cutoff_frequency > 0: + Qds = lowPassFilter(self.time, Qds, lowpass_cutoff_frequency) + + # Return as DataFrame. + data = np.concatenate( + (np.expand_dims(self.time, axis=1), Qds), axis=1) + columns = ['time'] + self.columnLabels + coordinate_speeds = pd.DataFrame(data=data, columns=columns) + + return coordinate_speeds + + def get_coordinate_accelerations(self, in_degrees=True, + lowpass_cutoff_frequency=-1): + + # Convert to degrees. + if in_degrees: + Qdds = np.zeros((self.Qdds.shape)) + Qdds[:, self.idxColumnTrLabels] = ( + self.Qdds[:, self.idxColumnTrLabels]) + Qdds[:, self.idxColumnRotLabels] = ( + self.Qdds[:, self.idxColumnRotLabels] * 180 / np.pi) + else: + Qdds = self.Qdds + + # Filter. + if lowpass_cutoff_frequency > 0: + Qdds = lowPassFilter(self.time, Qdds, lowpass_cutoff_frequency) + + # Return as DataFrame. + data = np.concatenate( + (np.expand_dims(self.time, axis=1), Qdds), axis=1) + columns = ['time'] + self.columnLabels + coordinate_accelerations = pd.DataFrame(data=data, columns=columns) + + return coordinate_accelerations + + def get_muscle_tendon_lengths(self, lowpass_cutoff_frequency=-1): + + # Compute muscle-tendon lengths. + lMT = np.zeros((self.table.getNumRows(), self.nMuscles)) + for i in range(self.table.getNumRows()): + self.model.realizePosition(self.stateTrajectory()[i]) + if i == 0: + muscleNames = [] + for m in range(self.forceSet.getSize()): + c_force_elt = self.forceSet.get(m) + if 'Muscle' in c_force_elt.getConcreteClassName(): + cObj = opensim.Muscle.safeDownCast(c_force_elt) + lMT[i,m] = cObj.getLength(self.stateTrajectory()[i]) + if i == 0: + muscleNames.append(c_force_elt.getName()) + + # Filter. + if lowpass_cutoff_frequency > 0: + lMT = lowPassFilter(self.time, lMT, lowpass_cutoff_frequency) + + # Return as DataFrame. + data = np.concatenate( + (np.expand_dims(self.time, axis=1), lMT), axis=1) + columns = ['time'] + muscleNames + muscle_tendon_lengths = pd.DataFrame(data=data, columns=columns) + + return muscle_tendon_lengths + + def get_moment_arms(self, lowpass_cutoff_frequency=-1): + + # Compute moment arms. 
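+        # The moment arm of a muscle about a coordinate q is the standard
+        # -d(muscle-tendon length)/dq, as computed by OpenSim's
+        # computeMomentArm; dM is (num_frames, num_muscles, num_coordinates).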
+ dM = np.zeros((self.table.getNumRows(), self.nMuscles, + self.nCoordinates)) + for i in range(self.table.getNumRows()): + self.model.realizePosition(self.stateTrajectory()[i]) + if i == 0: + muscleNames = [] + for m in range(self.forceSet.getSize()): + c_force_elt = self.forceSet.get(m) + if 'Muscle' in c_force_elt.getConcreteClassName(): + muscleName = c_force_elt.getName() + cObj = opensim.Muscle.safeDownCast(c_force_elt) + if i == 0: + muscleNames.append(c_force_elt.getName()) + for c, coord in enumerate(self.coordinates): + # We use prior knowledge to improve computation speed; + # We do not want to compute moment arms that are not + # relevant, eg for a muscle of the left side with + # respect to a coordinate of the right side. + if muscleName[-2:] == '_l' and coord[-2:] == '_r': + dM[i, m, c] = 0 + elif muscleName[-2:] == '_r' and coord[-2:] == '_l': + dM[i, m, c] = 0 + elif (coord in self.rootCoordinates or + coord in self.lumbarCoordinates or + coord in self.armCoordinates): + dM[i, m, c] = 0 + else: + coordinate = self.coordinateSet.get( + self.coordinates.index(coord)) + dM[i, m, c] = cObj.computeMomentArm( + self.stateTrajectory()[i], coordinate) + + # Clean numerical artefacts (ie, moment arms smaller than 1e-5 m). + dM[np.abs(dM) < 1e-5] = 0 + + # Filter. + if lowpass_cutoff_frequency > 0: + for c, coord in enumerate(self.coordinates): + dM[:, :, c] = lowPassFilter(self.time, dM[:, :, c], + lowpass_cutoff_frequency) + + # Return as DataFrame. + moment_arms = {} + for c, coord in enumerate(self.coordinates): + data = np.concatenate( + (np.expand_dims(self.time, axis=1), dM[:,:,c]), axis=1) + columns = ['time'] + muscleNames + moment_arms[coord] = pd.DataFrame(data=data, columns=columns) + + return moment_arms + + def compute_center_of_mass(self): + + # Compute center of mass position and velocity. + self.com_values = np.zeros((self.table.getNumRows(),3)) + self.com_speeds = np.zeros((self.table.getNumRows(),3)) + for i in range(self.table.getNumRows()): + self.model.realizeVelocity(self.stateTrajectory()[i]) + self.com_values[i,:] = self.model.calcMassCenterPosition( + self.stateTrajectory()[i]).to_numpy() + self.com_speeds[i,:] = self.model.calcMassCenterVelocity( + self.stateTrajectory()[i]).to_numpy() + + def get_center_of_mass_values(self, lowpass_cutoff_frequency=-1): + + self.compute_center_of_mass() + com_v = self.com_values + + # Filter. + if lowpass_cutoff_frequency > 0: + com_v = lowPassFilter(self.time, com_v, lowpass_cutoff_frequency) + + # Return as DataFrame. + data = np.concatenate( + (np.expand_dims(self.time, axis=1), com_v), axis=1) + columns = ['time'] + ['x','y','z'] + com_values = pd.DataFrame(data=data, columns=columns) + + return com_values + + def get_center_of_mass_speeds(self, lowpass_cutoff_frequency=-1): + + self.compute_center_of_mass() + com_s = self.com_speeds + + # Filter. + if lowpass_cutoff_frequency > 0: + com_s = lowPassFilter(self.time, com_s, lowpass_cutoff_frequency) + + # Return as DataFrame. + data = np.concatenate( + (np.expand_dims(self.time, axis=1), com_s), axis=1) + columns = ['time'] + ['x','y','z'] + com_speeds = pd.DataFrame(data=data, columns=columns) + + return com_speeds + + def get_center_of_mass_accelerations(self, lowpass_cutoff_frequency=-1): + + self.compute_center_of_mass() + com_s = self.com_speeds + + # Accelerations are first time derivative of speeds. 
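+        # Differentiating a cubic spline fit of the speeds (rather than
+        # finite differencing) yields smoother acceleration estimates.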
+ com_a = np.zeros((com_s.shape)) + for i in range(com_s.shape[1]): + spline = interpolate.InterpolatedUnivariateSpline( + self.time, com_s[:,i], k=3) + splineD1 = spline.derivative(n=1) + com_a[:,i] = splineD1(self.time) + + # Filter. + if lowpass_cutoff_frequency > 0: + com_a = lowPassFilter(self.time, com_a, lowpass_cutoff_frequency) + + # Return as DataFrame. + data = np.concatenate( + (np.expand_dims(self.time, axis=1), com_a), axis=1) + columns = ['time'] + ['x','y','z'] + com_accelerations = pd.DataFrame(data=data, columns=columns) + + return com_accelerations \ No newline at end of file diff --git a/treadmill_gait_analysis/function/utilsProcessing.py b/treadmill_gait_analysis/function/utilsProcessing.py new file mode 100644 index 0000000..b3afe3a --- /dev/null +++ b/treadmill_gait_analysis/function/utilsProcessing.py @@ -0,0 +1,620 @@ +''' + --------------------------------------------------------------------------- + OpenCap processing: utilsProcessing.py + --------------------------------------------------------------------------- + + Copyright 2022 Stanford University and the Authors + + Author(s): Antoine Falisse, Scott Uhlrich + + Licensed under the Apache License, Version 2.0 (the "License"); you may not + use this file except in compliance with the License. You may obtain a copy + of the License at http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +''' + +import os +pathFile = os.path.dirname(os.path.realpath(__file__)) + +import logging +import opensim +import numpy as np +from scipy import signal +import matplotlib.pyplot as plt +from utils import storage_to_dataframe, download_trial, get_trial_id + +def lowPassFilter(time, data, lowpass_cutoff_frequency, order=4): + + fs = 1/np.round(np.mean(np.diff(time)),16) + wn = lowpass_cutoff_frequency/(fs/2) + sos = signal.butter(order/2, wn, btype='low', output='sos') + dataFilt = signal.sosfiltfilt(sos, data, axis=0) + + return dataFilt + +# %% Segment gait +def segment_gait(session_id, trial_name, data_folder, gait_cycles_from_end=0): + + # Segmentation is done in the gait_analysis class + from gait_analysis import gait_analysis + + gait = gait_analysis(os.path.join(data_folder,session_id), trial_name, + n_gait_cycles=-1) + heelstrikeTimes = gait.gaitEvents['ipsilateralTime'][gait_cycles_from_end,(0,2)].tolist() + + return heelstrikeTimes, gait + +# %% Segment squats. +def segment_squats(ikFilePath, pelvis_ty=None, timeVec=None, visualize=False, + filter_pelvis_ty=True, cutoff_frequency=4, height=.2): + + # TODO: eventually, this belongs in a squat_analysis class and should take + # the form of segment_gait + + # Extract pelvis_ty if not given. + if pelvis_ty is None and timeVec is None: + ikResults = storage_to_dataframe(ikFilePath,headers={'pelvis_ty'}) + timeVec = ikResults['time'] + if filter_pelvis_ty: + from utilsOpenSimAD import filterNumpyArray + pelvis_ty = filterNumpyArray( + ikResults['pelvis_ty'].to_numpy(), timeVec.to_numpy(), + cutoff_frequency=cutoff_frequency) + else: + pelvis_ty = ikResults['pelvis_ty'] + dt = timeVec[1] - timeVec[0] + + # Identify minimums. 
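+    # find_peaks on the negated pelvis height locates the bottom of each
+    # squat; distance=.7/dt enforces at least 0.7 s between detected squats
+    # and height rejects shallow dips (assuming time in seconds and
+    # positions in meters).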
+ pelvSignal = np.array(-pelvis_ty - np.min(-pelvis_ty)) + pelvSignalPos = np.array(pelvis_ty - np.min(pelvis_ty)) + idxMinPelvTy,_ = signal.find_peaks(pelvSignal,distance=.7/dt,height=height) + + # Find the max adjacent to all of the minimums. + minIdxOld = 0 + startFinishInds = [] + for i, minIdx in enumerate(idxMinPelvTy): + if i + np.abs(pelvSignal[val_down-1] - pelvVal_up)): + val_down -= 1 + startFinishIndsDelayPeriodic.append([val[0], val_down]) + risingSittingTimesDelayedStartPeriodicEnd = [ + timeVec[i].tolist() for i in startFinishIndsDelayPeriodic] + + if visualize: + plt.figure() + plt.plot(pelvSignal) + for c_v, val in enumerate(startFinishInds): + plt.plot(val, pelvSignal[val], marker='o', markerfacecolor='k', + markeredgecolor='none', linestyle='none', + label='Rising phase') + val2 = startFinishIndsDelay[c_v][0] + plt.plot(val2, pelvSignal[val2], marker='o', + markerfacecolor='r', markeredgecolor='none', + linestyle='none', label='Delayed start') + val3 = startFinishIndsDelayPeriodic[c_v][1] + plt.plot(val3, pelvSignal[val3], marker='o', + markerfacecolor='g', markeredgecolor='none', + linestyle='none', + label='Periodic end corresponding to delayed start') + if c_v == 0: + plt.legend() + plt.xlabel('Frames') + plt.ylabel('Position [m]') + plt.title('Vertical pelvis position') + plt.tight_layout() + plt.draw() + + return (risingTimes, risingTimesDelayedStart, + risingSittingTimesDelayedStartPeriodicEnd) + +# %% Generate model with adjusted muscle wrapping to prevent unrealistic +# wrapping giving rise to bad muscle-tendon lengths and moment arms. Changes +# are made for the gmax1, iliacus, and psoas. Changes are documented in +# modelAdjustment.log. +def adjust_muscle_wrapping( + baseDir, dataDir, subject, poseDetector='DefaultPD', + cameraSetup='DefaultModel', OpenSimModel="LaiUhlrich2022", + overwrite=False): + + # Paths + osDir = os.path.join(dataDir, subject, 'OpenSimData') + pathModelFolder = os.path.join(osDir, 'Model') + + # We changed the OpenSim model name after some time: + # from LaiArnoldModified2017_poly_withArms_weldHand to LaiUhlrich2022. + # This is a hack for backward compatibility. + if OpenSimModel == 'LaiArnoldModified2017_poly_withArms_weldHand': + unscaledModelName = 'LaiUhlrich2022' + else: + unscaledModelName = OpenSimModel + + pathUnscaledModel = os.path.join(baseDir, 'OpenSimPipeline', 'Models', + unscaledModelName + '.osim') + pathScaledModel = os.path.join(pathModelFolder, + OpenSimModel + '_scaled.osim') + pathOutputModel = os.path.join(pathModelFolder, + OpenSimModel + '_scaled_adjusted.osim') + + if overwrite is False and os.path.exists(pathOutputModel): + return + else: + print('Adjust muscle wrapping surfaces.') + + # Set up logging. + logPath = os.path.join(pathModelFolder,'modelAdjustment.log') + if os.path.exists(logPath): + os.remove(logPath) + # Remove all handlers associated with the root logger object. + for handler in logging.root.handlers[:]: + logging.root.removeHandler(handler) + logging.shutdown() + logging.basicConfig(filename=logPath,format='%(message)s', + level=logging.INFO) + + # Load models. + opensim.Logger.setLevelString('error') + unscaledModel = opensim.Model(pathUnscaledModel) + scaledModel = opensim.Model(pathScaledModel) + scaledBodySet = scaledModel.getBodySet() + + # Poses that often cause problems. + pose_gmax = [ + [['hip_flexion_r',90],['hip_adduction_r',-26], ['hip_rotation_r',40]]] + coord_gmax = 'hip_flexion_r' + + # generic model doesn't wrap beyond 32deg abd. 
+ pose_hipFlexors = [ + [['hip_flexion_r',-30],['hip_adduction_r',-32],['hip_rotation_r',-36]], + [['hip_flexion_r',-30],['hip_adduction_r',-50],['hip_rotation_r',0]], + [['hip_flexion_r',-30],['hip_adduction_r',30],['hip_rotation_r',0]]] + coord_hipFlexors = 'hip_flexion_r' + + # Gmax1 - shrink wrap cyl radius. + momentArmsGmax_unscaled = getMomentArms( + unscaledModel,pose_gmax,'glmax1_r',coord_gmax) + momentArmsGmax_scaled = getMomentArms( + scaledModel,pose_gmax,'glmax1_r',coord_gmax) + # Get wrapping surface. + pelvis = scaledBodySet.get('pelvis') + gmaxWrap = opensim.WrapCylinder.safeDownCast( + pelvis.getWrapObjectSet().get('Gmax1_at_pelvis_r')) + radius = gmaxWrap.get_radius() + originalRadius = np.copy(radius) + + for iPose,(momentArmGmax_scaled,momentArmGmax_unscaled) in enumerate(zip(momentArmsGmax_scaled,momentArmsGmax_unscaled)): + if np.abs(momentArmGmax_scaled) < np.max([0.5* np.abs(momentArmGmax_unscaled), 0.008]): # This constant came from 100 scaled models + originalBadMomentArm = np.copy(momentArmGmax_scaled) + while np.abs(momentArmGmax_scaled) <= np.abs(originalBadMomentArm) and radius > 0.002: + gmaxWrap.set_radius(radius-0.002) + momentArmGmax_scaled = getMomentArms(scaledModel,pose_gmax,'glmax1_r',coord_gmax)[iPose] + radius = gmaxWrap.get_radius() + if radius > 0.5*originalRadius: + outputStr = '-For pose #{}, scaled gmax1 moment arm was {:.3f}. Unscaled was {:.3f}. Reduced R&L wrap radius from {:.3f} to {:.3f}, which increased the moment arm back to {:.3f}.'.format( + iPose, originalBadMomentArm,momentArmGmax_unscaled, + originalRadius,radius,momentArmGmax_scaled) + print(outputStr) + logging.info(outputStr) + # Set the left side as well. + opensim.WrapCylinder.safeDownCast(pelvis.getWrapObjectSet().get('Gmax1_at_pelvis_l')).set_radius(radius) + else: + outputStr = '-For pose #{}, couldn''t restore glmax1 moment arm by shrinking radius by 1/2. Model unchanged.'.format(iPose) + print(outputStr) + logging.info(outputStr) + gmaxWrap.set_radius(float(originalRadius)) + scaledModel.initSystem() + else: + outputStr = '-For pose #{}, scaled gmax1 moment arm was {:.3f}. Unscaled was {:.3f}. No adjustements made.'.format( + iPose,np.abs(momentArmGmax_scaled),np.abs(momentArmGmax_unscaled)) + print(outputStr) + logging.info(outputStr) + + # Iliacus - change path points to engage wrap cylinder. + momentArms_unscaled = getMomentArms( + unscaledModel,pose_hipFlexors,'iliacus_r',coord_hipFlexors) + momentArms_scaled = getMomentArms( + scaledModel,pose_hipFlexors,'iliacus_r',coord_hipFlexors) + # Get path point locations. + muscle = scaledModel.getMuscles().get('iliacus_r') + pathPoints = muscle.get_GeometryPath().getPathPointSet() + point1 = opensim.PathPoint.safeDownCast(pathPoints.get(1)) + loc1Vec = point1.get_location() + point2 = opensim.PathPoint.safeDownCast(pathPoints.get(2)) + loc2Vec = point2.get_location() + original_loc1 = [loc1Vec[i] for i in range(3)] + original_loc2 = [loc2Vec[i] for i in range(3)] + # Get wrap cyl. + wrapCyl = opensim.WrapCylinder.safeDownCast( + pelvis.getWrapObjectSet().get('IL_at_brim_r')) + radius = wrapCyl.get_radius() + originalRadius = np.copy(radius) + previousRadius = np.copy(radius) + + for iPose,(momentArm_scaled,momentArm_unscaled) in enumerate(zip(momentArms_scaled,momentArms_unscaled)): + if np.abs(momentArm_scaled) < np.max([0.7* np.abs(momentArm_unscaled) , 0.015]): + # Get path point locations. 
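+            # The loop below nudges the pelvis-side path point forward (+x)
+            # and the femur-side point down (-y) in 2 mm steps, capped at
+            # 15 mm of travel; if that fails, the wrap cylinder radius is
+            # shrunk instead.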
+ muscle = scaledModel.getMuscles().get('iliacus_r') + pathPoints = muscle.get_GeometryPath().getPathPointSet() + point1 = opensim.PathPoint.safeDownCast(pathPoints.get(1)) + loc1Vec = point1.get_location() + point2 = opensim.PathPoint.safeDownCast(pathPoints.get(2)) + loc2Vec = point2.get_location() + originalBadMomentArm = np.copy(momentArm_scaled) + while np.abs(momentArm_scaled) <= np.max([0.7* np.abs(momentArm_unscaled) , 0.015]) and (np.abs(loc1Vec[0]-original_loc1[0]) < 0.015 and np.abs(loc2Vec[1]-original_loc2[1]) <0.015): + loc1Vec[0] += 0.002 # Move the 1st (pelvis) path point forward + loc2Vec[1] -= 0.002 # move the 2nd (femur) path point down + point1.set_location(loc1Vec) + point2.set_location(loc2Vec) + momentArm_scaled = getMomentArms(scaledModel,pose_hipFlexors,'iliacus_r',coord_hipFlexors)[iPose] + while np.abs(momentArm_scaled) <= np.max([0.7* np.abs(momentArm_unscaled) , 0.015]) and radius>0.7*originalRadius: # above approach did not succeed, drop the cyl radius some + wrapCyl.set_radius(radius-0.002) + momentArm_scaled = getMomentArms(scaledModel,pose_hipFlexors,'iliacus_r',coord_hipFlexors)[iPose] + pelvis = scaledBodySet.get('pelvis') + radius = wrapCyl.get_radius() + if np.abs(momentArm_scaled) > np.max([0.7* np.abs(momentArm_unscaled) , 0.015]): # succeeded + # Set the left side as well. + muscle = scaledModel.getMuscles().get('iliacus_l') + pathPoints = muscle.get_GeometryPath().getPathPointSet() + point1 = opensim.PathPoint.safeDownCast(pathPoints.get(1)) + loc1Vec_l = point1.get_location() + loc1Vec_l[0] = loc1Vec[0] + point1.set_location(loc1Vec_l) + point2 = opensim.PathPoint.safeDownCast(pathPoints.get(2)) + loc2Vec_l = point2.get_location() + loc2Vec_l[1] = loc2Vec[1] + point2.set_location(loc2Vec_l) + if radius0.7*originalRadius: #above approach did not succeed, drop the cyl radius some + wrapCyl.set_radius(radius-0.002) + momentArm_scaled = getMomentArms(scaledModel,pose_hipFlexors,'psoas_r',coord_hipFlexors)[iPose] + pelvis = scaledBodySet.get('pelvis') + radius = wrapCyl.get_radius() + if np.abs(momentArm_scaled) > np.max([0.7* np.abs(momentArm_unscaled) , 0.015]): # succeeded + # set the left side as well. + muscle = scaledModel.getMuscles().get('psoas_l') + pathPoints = muscle.get_GeometryPath().getPathPointSet() + point1 = opensim.PathPoint.safeDownCast(pathPoints.get(1)) + loc1Vec_l = point1.get_location() + loc1Vec_l[0] = loc1Vec[0] + point1.set_location(loc1Vec_l) + point2 = opensim.PathPoint.safeDownCast(pathPoints.get(2)) + loc2Vec_l = point2.get_location() + loc2Vec_l[1] = loc2Vec[1] + point2.set_location(loc2Vec_l) + if radius 3: + self.path = first_line[3] + else: + self.path = '' + + # Third line. + self.data_rate = float(third_line[0]) + self.camera_rate = float(third_line[1]) + self.num_frames = int(third_line[2]) + self.num_markers = int(third_line[3]) + self.units = third_line[4] + self.orig_data_rate = float(third_line[5]) + self.orig_data_start_frame = int(third_line[6]) + self.orig_num_frames = int(third_line[7]) + + # Marker names. + # The first and second column names are 'Frame#' and 'Time'. + self.marker_names = fourth_line[2:] + + len_marker_names = len(self.marker_names) + if len_marker_names != self.num_markers: + warnings.warn('Header entry NumMarkers, %i, does not ' + 'match actual number of markers, %i. Changing ' + 'NumMarkers to match actual number.' % ( + self.num_markers, len_marker_names)) + self.num_markers = len_marker_names + + # Load the actual data. 
+ # ---------------------
+ col_names = ['frame_num', 'time']
+ # This naming convention comes from OpenSim's Inverse Kinematics tool,
+ # when it writes model marker locations.
+ for mark in self.marker_names:
+ col_names += [mark + '_tx', mark + '_ty', mark + '_tz']
+ dtype = {'names': col_names,
+ 'formats': ['int'] + ['float64'] * (3 * self.num_markers + 1)}
+ usecols = [i for i in range(3 * self.num_markers + 1 + 1)]
+ self.data = np.loadtxt(fpath, delimiter='\t', skiprows=5, dtype=dtype,
+ usecols=usecols)
+ self.time = self.data['time']
+
+ # Check the number of rows.
+ n_rows = self.time.shape[0]
+ if n_rows != self.num_frames:
+ warnings.warn('%s: Header entry NumFrames, %i, does not '
+ 'match actual number of frames, %i. Changing '
+ 'NumFrames to match actual number.' % (fpath,
+ self.num_frames, n_rows))
+ self.num_frames = n_rows
+
+ def __getitem__(self, key):
+ """See `marker()`.
+
+ """
+ return self.marker(key)
+
+ def units(self):
+ return self.units
+
+ def time(self):
+ this_dat = np.empty((self.num_frames, 1))
+ this_dat[:, 0] = self.time
+ return this_dat
+
+ def marker(self, name):
+ """The trajectory of marker `name`, given as a `self.num_frames` x 3
+ array. The order of the columns is x, y, z.
+
+ """
+ this_dat = np.empty((self.num_frames, 3))
+ this_dat[:, 0] = self.data[name + '_tx']
+ this_dat[:, 1] = self.data[name + '_ty']
+ this_dat[:, 2] = self.data[name + '_tz']
+ return this_dat
+
+ def add_marker(self, name, x, y, z):
+ """Add a marker, with name `name` to the TRCFile.
+
+ Parameters
+ ----------
+ name : str
+ Name of the marker; e.g., 'R.Hip'.
+ x, y, z: array_like
+ Coordinates of the marker trajectory. All 3 must have the same
+ length.
+
+ """
+ if (len(x) != self.num_frames or len(y) != self.num_frames or len(z) !=
+ self.num_frames):
+ raise Exception('Length of data (%i, %i, %i) is not '
+ 'NumFrames (%i).' % (len(x), len(y), len(z), self.num_frames))
+ self.marker_names += [name]
+ self.num_markers += 1
+ if not hasattr(self, 'data'):
+ self.data = np.array(x, dtype=[('%s_tx' % name, 'float64')])
+ self.data = append_fields(self.data,
+ ['%s_t%s' % (name, s) for s in 'yz'],
+ [y, z], usemask=False)
+ else:
+ self.data = append_fields(self.data,
+ ['%s_t%s' % (name, s) for s in 'xyz'],
+ [x, y, z], usemask=False)
+
+ def marker_at(self, name, time):
+ x = np.interp(time, self.time, self.data[name + '_tx'])
+ y = np.interp(time, self.time, self.data[name + '_ty'])
+ z = np.interp(time, self.time, self.data[name + '_tz'])
+ return [x, y, z]
+
+ def marker_exists(self, name):
+ """
+ Returns
+ -------
+ exists : bool
+ Is the marker in the TRCFile?
+
+ """
+ return name in self.marker_names
+
+ def write(self, fpath):
+ """Write this TRCFile object to a TRC file.
+
+ Parameters
+ ----------
+ fpath : str
+ Valid file path to which this TRCFile is saved.
+
+ """
+ f = open(fpath, 'w')
+
+ # Line 1.
+ f.write('PathFileType 4\t(X/Y/Z) %s\n' % os.path.split(fpath)[0])
+
+ # Line 2.
+ f.write('DataRate\tCameraRate\tNumFrames\tNumMarkers\t'
+ 'Units\tOrigDataRate\tOrigDataStartFrame\tOrigNumFrames\n')
+
+ # Line 3.
+ f.write('%.1f\t%.1f\t%i\t%i\t%s\t%.1f\t%i\t%i\n' % (
+ self.data_rate, self.camera_rate, self.num_frames,
+ self.num_markers, self.units, self.orig_data_rate,
+ self.orig_data_start_frame, self.orig_num_frames))
+
+ # Line 4.
+ f.write('Frame#\tTime\t')
+ for imark in range(self.num_markers):
+ f.write('%s\t\t\t' % self.marker_names[imark])
+ f.write('\n')
+
+ # Line 5.
+ f.write('\t\t')
+ for imark in np.arange(self.num_markers) + 1:
+ f.write('X%i\tY%i\tZ%i\t' % (imark, imark, imark))
+ f.write('\n')
+
+ # Line 6.
+ f.write('\n')
+
+ # Data.
+ for iframe in range(self.num_frames):
+ f.write('%i' % (iframe + 1))
+ f.write('\t%.7f' % self.time[iframe])
+ for mark in self.marker_names:
+ idxs = [mark + '_tx', mark + '_ty', mark + '_tz']
+ f.write('\t%.7f\t%.7f\t%.7f' % tuple(
+ self.data[coln][iframe] for coln in idxs))
+ f.write('\n')
+
+ f.close()
+
+ def add_noise(self, noise_width):
+ """ Add random noise to each component of the marker trajectory.
+ The noise mean will be zero, with noise_width being the
+ standard deviation.
+
+ noise_width : float
+ """
+ for imarker in range(self.num_markers):
+ components = ['_tx', '_ty', '_tz']
+ for iComponent in range(3):
+ # Generate noise.
+ noise = np.random.normal(0, noise_width, self.num_frames)
+ # Add noise to each component of marker data.
+ self.data[self.marker_names[imarker] + components[iComponent]] += noise
+
+ def rotate(self, axis, value):
+ """ Rotate the data.
+
+ axis : rotation axis
+ value : angle in degrees
+ """
+ for imarker in range(self.num_markers):
+
+ temp = np.zeros((self.num_frames, 3))
+ temp[:,0] = self.data[self.marker_names[imarker] + '_tx']
+ temp[:,1] = self.data[self.marker_names[imarker] + '_ty']
+ temp[:,2] = self.data[self.marker_names[imarker] + '_tz']
+
+ r = R.from_euler(axis, value, degrees=True)
+ temp_rot = r.apply(temp)
+
+ self.data[self.marker_names[imarker] + '_tx'] = temp_rot[:,0]
+ self.data[self.marker_names[imarker] + '_ty'] = temp_rot[:,1]
+ self.data[self.marker_names[imarker] + '_tz'] = temp_rot[:,2]
+
+ def offset(self, axis, value):
+ """ Offset the data.
+
+ axis : offset axis
+ value : offset in m
+ """
+ for imarker in range(self.num_markers):
+ if axis.lower() == 'x':
+ self.data[self.marker_names[imarker] + '_tx'] += value
+ elif axis.lower() == 'y':
+ self.data[self.marker_names[imarker] + '_ty'] += value
+ elif axis.lower() == 'z':
+ self.data[self.marker_names[imarker] + '_tz'] += value
+ else:
+ raise ValueError("Axis not recognized")
+
+def trc_2_dict(pathFile, rotation=None):
+ # rotation is a dict, e.g. {'y':90}, with axis, angle pairs for rotation.
+ trc_dict = {}
+ trc_file = TRCFile(pathFile)
+ trc_dict['time'] = trc_file.time
+ trc_dict['marker_names'] = trc_file.marker_names
+ trc_dict['markers'] = {}
+
+ if rotation is not None:
+ for axis,angle in rotation.items():
+ trc_file.rotate(axis,angle)
+ for marker in trc_dict['marker_names']:
+ trc_dict['markers'][marker] = trc_file.marker(marker)
+
+ return trc_dict
diff --git a/treadmill_gait_analysis/requirements.txt b/treadmill_gait_analysis/requirements.txt
new file mode 100644
index 0000000..94941c0
--- /dev/null
+++ b/treadmill_gait_analysis/requirements.txt
@@ -0,0 +1,8 @@
+scipy
+pandas
+matplotlib
+ipython
+python-decouple
+maskpass==0.3.6
+requests
+pyyaml
\ No newline at end of file

From cc91ff8c3edc4ff2333a331db96f227050c4fa4c Mon Sep 17 00:00:00 2001
From: Antoine Falisse
Date: Thu, 15 Feb 2024 11:31:43 -0800
Subject: [PATCH 4/6] copying github workflows

---
 .../workflows/treadmill_gait_analysis-dev.yml | 74 +++++++++++++++++++
 .github/workflows/treadmill_gait_analysis.yml | 74 +++++++++++++++++++
 2 files changed, 148 insertions(+)
 create mode 100644 .github/workflows/treadmill_gait_analysis-dev.yml
 create mode 100644 .github/workflows/treadmill_gait_analysis.yml

diff --git a/.github/workflows/treadmill_gait_analysis-dev.yml b/.github/workflows/treadmill_gait_analysis-dev.yml
new file mode 100644
index 0000000..0624421
--- /dev/null
+++ b/.github/workflows/treadmill_gait_analysis-dev.yml
@@ -0,0 +1,74 @@
+# This workflow will build and push a new container image to Amazon ECR,
+# and then will deploy a new task definition to Amazon ECS, on every push
+# to the master branch.
+#
+# To use this workflow, you will need to complete the following set-up steps:
+#
+# 1. Create an ECR repository to store your images.
+# For example: `aws ecr create-repository --repository-name my-ecr-repo --region us-east-2`.
+# Replace the value of `ECR_REPOSITORY` in the workflow below with your repository's name.
+# Replace the value of `aws-region` in the workflow below with your repository's region.
+#
+# 2. Create an ECS task definition, an ECS cluster, and an ECS service.
+# For example, follow the Getting Started guide on the ECS console:
+# https://us-east-2.console.aws.amazon.com/ecs/home?region=us-east-2#/firstRun
+# Replace the values for `service` and `cluster` in the workflow below with your service and cluster names.
+#
+# 3. Store your ECS task definition as a JSON file in your repository.
+# The format should follow the output of `aws ecs register-task-definition --generate-cli-skeleton`.
+# Replace the value of `task-definition` in the workflow below with your JSON file's name.
+# Replace the value of `container-name` in the workflow below with the name of the container
+# in the `containerDefinitions` section of the task definition.
+#
+# 4. Store an IAM user access key in GitHub Actions secrets named `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`.
+# See the documentation for each action used below for the recommended IAM policies for this IAM user,
+# and best practices on handling the access key credentials.
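+#
+# As a rough manual equivalent of the job below (a sketch only, assuming the
+# same ECR registry and Lambda function name that appear later in this file),
+# the same build-and-deploy steps could be run by hand:
+#
+# cd gait_analysis
+# docker build -f Dockerfile -t 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis-dev:latest .
+# docker push 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis-dev:latest
+# aws lambda update-function-code --function-name gait-analysis-dev --image-uri 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis-dev:latest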
+ +on: + push: + branches: + - dev + paths: + - 'gait_analysis/**' + workflow_dispatch: + +name: DEV Analysis "gait analysis" build & deployment + +jobs: + deploy: + name: Deploy + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v1 + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: us-west-2 + + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + + - name: Build, tag, and push image to Amazon ECR + id: build-image + env: + IMAGE_TAG: latest # ${{ github.sha }} + run: | + # Build a docker container and + # push it to ECR so that it can + # be deployed to ECS. + cd gait_analysis + docker build -f Dockerfile -t 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis-dev:$IMAGE_TAG . + docker push 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis-dev:$IMAGE_TAG + echo "::set-output name=image::660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis-dev:$IMAGE_TAG" + + - name: Force deployment + env: + IMAGE_TAG: latest # ${{ github.sha }} + run: | + aws lambda update-function-code --function-name gait-analysis-dev --image-uri 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis-dev:$IMAGE_TAG | jq 'if .Environment.Variables.API_TOKEN? then .Environment.Variables.API_TOKEN = "REDACTED" else . end' diff --git a/.github/workflows/treadmill_gait_analysis.yml b/.github/workflows/treadmill_gait_analysis.yml new file mode 100644 index 0000000..5f19586 --- /dev/null +++ b/.github/workflows/treadmill_gait_analysis.yml @@ -0,0 +1,74 @@ +# This workflow will build and push a new container image to Amazon ECR, +# and then will deploy a new task definition to Amazon ECS, on every push +# to the master branch. +# +# To use this workflow, you will need to complete the following set-up steps: +# +# 1. Create an ECR repository to store your images. +# For example: `aws ecr create-repository --repository-name my-ecr-repo --region us-east-2`. +# Replace the value of `ECR_REPOSITORY` in the workflow below with your repository's name. +# Replace the value of `aws-region` in the workflow below with your repository's region. +# +# 2. Create an ECS task definition, an ECS cluster, and an ECS service. +# For example, follow the Getting Started guide on the ECS console: +# https://us-east-2.console.aws.amazon.com/ecs/home?region=us-east-2#/firstRun +# Replace the values for `service` and `cluster` in the workflow below with your service and cluster names. +# +# 3. Store your ECS task definition as a JSON file in your repository. +# The format should follow the output of `aws ecs register-task-definition --generate-cli-skeleton`. +# Replace the value of `task-definition` in the workflow below with your JSON file's name. +# Replace the value of `container-name` in the workflow below with the name of the container +# in the `containerDefinitions` section of the task definition. +# +# 4. Store an IAM user access key in GitHub Actions secrets named `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. +# See the documentation for each action used below for the recommended IAM policies for this IAM user, +# and best practices on handling the access key credentials. 
+ +on: + push: + branches: + - main + paths: + - 'gait_analysis/**' + workflow_dispatch: + +name: PROD Analysis "gait analysis" build & deployment + +jobs: + deploy: + name: Deploy + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v1 + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: us-west-2 + + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + + - name: Build, tag, and push image to Amazon ECR + id: build-image + env: + IMAGE_TAG: latest # ${{ github.sha }} + run: | + # Build a docker container and + # push it to ECR so that it can + # be deployed to ECS. + cd gait_analysis + docker build -f Dockerfile -t 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis:$IMAGE_TAG . + docker push 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis:$IMAGE_TAG + echo "::set-output name=image::660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis:$IMAGE_TAG" + + - name: Force deployment + env: + IMAGE_TAG: latest # ${{ github.sha }} + run: | + aws lambda update-function-code --function-name gait-analysis --image-uri 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis:$IMAGE_TAG | jq 'if .Environment.Variables.API_TOKEN? then .Environment.Variables.API_TOKEN = "REDACTED" else . end' From 311b7facc2497d1e57d5c033795d83a55f9c795f Mon Sep 17 00:00:00 2001 From: Antoine Falisse Date: Thu, 15 Feb 2024 11:43:45 -0800 Subject: [PATCH 5/6] adjusting github workflows --- .github/workflows/treadmill_gait_analysis-dev.yml | 14 +++++++------- .github/workflows/treadmill_gait_analysis.yml | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/.github/workflows/treadmill_gait_analysis-dev.yml b/.github/workflows/treadmill_gait_analysis-dev.yml index 0624421..ff02769 100644 --- a/.github/workflows/treadmill_gait_analysis-dev.yml +++ b/.github/workflows/treadmill_gait_analysis-dev.yml @@ -29,10 +29,10 @@ on: branches: - dev paths: - - 'gait_analysis/**' + - 'treadmill_gait_analysis/**' workflow_dispatch: -name: DEV Analysis "gait analysis" build & deployment +name: DEV Analysis "treadmill gait analysis" build & deployment jobs: deploy: @@ -62,13 +62,13 @@ jobs: # Build a docker container and # push it to ECR so that it can # be deployed to ECS. - cd gait_analysis - docker build -f Dockerfile -t 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis-dev:$IMAGE_TAG . - docker push 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis-dev:$IMAGE_TAG - echo "::set-output name=image::660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis-dev:$IMAGE_TAG" + cd treadmill_gait_analysis + docker build -f Dockerfile -t 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/treadmill_gait_analysis-dev:$IMAGE_TAG . 
+ docker push 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/treadmill_gait_analysis-dev:$IMAGE_TAG + echo "::set-output name=image::660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/treadmill_gait_analysis-dev:$IMAGE_TAG" - name: Force deployment env: IMAGE_TAG: latest # ${{ github.sha }} run: | - aws lambda update-function-code --function-name gait-analysis-dev --image-uri 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis-dev:$IMAGE_TAG | jq 'if .Environment.Variables.API_TOKEN? then .Environment.Variables.API_TOKEN = "REDACTED" else . end' + aws lambda update-function-code --function-name treadmill-gait-analysis-dev --image-uri 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/treadmill_gait_analysis-dev:$IMAGE_TAG | jq 'if .Environment.Variables.API_TOKEN? then .Environment.Variables.API_TOKEN = "REDACTED" else . end' diff --git a/.github/workflows/treadmill_gait_analysis.yml b/.github/workflows/treadmill_gait_analysis.yml index 5f19586..3d503e2 100644 --- a/.github/workflows/treadmill_gait_analysis.yml +++ b/.github/workflows/treadmill_gait_analysis.yml @@ -29,10 +29,10 @@ on: branches: - main paths: - - 'gait_analysis/**' + - 'treadmill_gait_analysis/**' workflow_dispatch: -name: PROD Analysis "gait analysis" build & deployment +name: PROD Analysis "treadmill gait analysis" build & deployment jobs: deploy: @@ -62,13 +62,13 @@ jobs: # Build a docker container and # push it to ECR so that it can # be deployed to ECS. - cd gait_analysis - docker build -f Dockerfile -t 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis:$IMAGE_TAG . - docker push 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis:$IMAGE_TAG - echo "::set-output name=image::660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis:$IMAGE_TAG" + cd treadmill_gait_analysis + docker build -f Dockerfile -t 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/treadmill_gait_analysis:$IMAGE_TAG . + docker push 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/treadmill_gait_analysis:$IMAGE_TAG + echo "::set-output name=image::660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/treadmill_gait_analysis:$IMAGE_TAG" - name: Force deployment env: IMAGE_TAG: latest # ${{ github.sha }} run: | - aws lambda update-function-code --function-name gait-analysis --image-uri 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/gait_analysis:$IMAGE_TAG | jq 'if .Environment.Variables.API_TOKEN? then .Environment.Variables.API_TOKEN = "REDACTED" else . end' + aws lambda update-function-code --function-name treadmill-gait-analysis --image-uri 660440363484.dkr.ecr.us-west-2.amazonaws.com/opencap-analysis/treadmill_gait_analysis:$IMAGE_TAG | jq 'if .Environment.Variables.API_TOKEN? then .Environment.Variables.API_TOKEN = "REDACTED" else . end' From 1e821e958e6f9ec0073f358287ac830efea07c25 Mon Sep 17 00:00:00 2001 From: Antoine Falisse Date: Thu, 15 Feb 2024 12:08:26 -0800 Subject: [PATCH 6/6] adjust handler for treadmill analysis --- treadmill_gait_analysis/function/handler.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/treadmill_gait_analysis/function/handler.py b/treadmill_gait_analysis/function/handler.py index 026555c..83521eb 100644 --- a/treadmill_gait_analysis/function/handler.py +++ b/treadmill_gait_analysis/function/handler.py @@ -60,7 +60,7 @@ def handler(event, context): # Select how many gait cycles you'd like to analyze. 
Select -1 for all gait # cycles detected in the trial. - n_gait_cycles = 1 + n_gait_cycles = -1 # Select lowpass filter frequency for kinematics data. filter_frequency = 6 @@ -88,8 +88,7 @@ def handler(event, context): gait[leg] = gait_analysis( sessionDir, trial_name, leg=leg, lowpass_cutoff_frequency_for_coordinate_values=filter_frequency, - n_gait_cycles=n_gait_cycles, gait_style='overground', - trimming_start=0, trimming_end=0.5) + n_gait_cycles=n_gait_cycles, gait_style='treadmill') gait_events[leg] = gait[leg].get_gait_events() # Select last leg. @@ -140,10 +139,10 @@ def handler(event, context): # %% Create json for deployement. # Indices / Times indices = {} - indices['start'] = int(gait_events[last_leg]['ipsilateralIdx'][0,0]) + indices['start'] = int(gait_events[last_leg]['ipsilateralIdx'][-1,0]) indices['end'] = int(gait_events[last_leg]['ipsilateralIdx'][0,-1]) times = {} - times['start'] = float(gait_events[last_leg]['ipsilateralTime'][0,0]) + times['start'] = float(gait_events[last_leg]['ipsilateralTime'][-1,0]) times['end'] = float(gait_events[last_leg]['ipsilateralTime'][0,-1]) # Metrics
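
For reference, a minimal usage sketch of the treadmill configuration that this
last patch wires into the handler (the session directory, trial name, leg list,
and import path are hypothetical placeholders; the `gait_analysis` constructor
arguments and the `get_gait_events()` call are taken from the diff above):

    from gait_analysis import gait_analysis  # assumed import path

    sessionDir = '/data/example_session'    # hypothetical session folder
    trial_name = 'example_treadmill_trial'  # hypothetical trial name

    gait, gait_events = {}, {}
    for leg in ['r', 'l']:  # assumed leg list
        gait[leg] = gait_analysis(
            sessionDir, trial_name, leg=leg,
            lowpass_cutoff_frequency_for_coordinate_values=6,
            n_gait_cycles=-1, gait_style='treadmill')
        gait_events[leg] = gait[leg].get_gait_events()

    # Per the patched handler, indices['start'] is then read from the last
    # row of 'ipsilateralIdx' and indices['end'] from the first row.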