Skip to content

Commit

Permalink
select right leg by default and remove mtp angles
Browse files Browse the repository at this point in the history
  • Loading branch information
antoinefalisse committed Feb 13, 2024
1 parent cef8a20 commit 778640e
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 36 deletions.
181 changes: 150 additions & 31 deletions gait_analysis/function/gait_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,52 @@ class gait_analysis(kinematics):

def __init__(self, session_dir, trial_name, leg='auto',
lowpass_cutoff_frequency_for_coordinate_values=-1,
n_gait_cycles=-1):
n_gait_cycles=-1, gait_style='auto', trimming_start=0,
trimming_end=0):

# Inherit init from kinematics class.
super().__init__(
session_dir,
trial_name,
lowpass_cutoff_frequency_for_coordinate_values=lowpass_cutoff_frequency_for_coordinate_values)

# We might want to trim the start/end of the trial to remove bad data.
# For example, this might be needed with HRNet during overground
# walking, where, at the end, the subject is leaving the field of view
# but HRNet returns relatively high confidence values. As a result,
# the trial is not well trimmed. Here, we provide the option to
# manually trim the start and end of the trial.
self.trimming_start = trimming_start
self.trimming_end = trimming_end

# Marker data load and filter.
self.markerDict= self.get_marker_dict(session_dir, trial_name,
lowpass_cutoff_frequency = lowpass_cutoff_frequency_for_coordinate_values)
self.markerDict = self.get_marker_dict(session_dir, trial_name,
lowpass_cutoff_frequency = lowpass_cutoff_frequency_for_coordinate_values)

# Coordinate values.
self.coordinateValues = self.get_coordinate_values()

# Trim marker data and coordinate values.
if self.trimming_start > 0:
self.idx_trim_start = np.where(np.round(self.markerDict['time'] - self.trimming_start,6) <= 0)[0][-1]
self.markerDict['time'] = self.markerDict['time'][self.idx_trim_start:,]
for marker in self.markerDict['markers']:
self.markerDict['markers'][marker] = self.markerDict['markers'][marker][self.idx_trim_start:,:]
self.coordinateValues = self.coordinateValues.iloc[self.idx_trim_start:]

if self.trimming_end > 0:
self.idx_trim_end = np.where(np.round(self.markerDict['time'],6) <= np.round(self.markerDict['time'][-1] - self.trimming_end,6))[0][-1] + 1
self.markerDict['time'] = self.markerDict['time'][:self.idx_trim_end,]
for marker in self.markerDict['markers']:
self.markerDict['markers'][marker] = self.markerDict['markers'][marker][:self.idx_trim_end,:]
self.coordinateValues = self.coordinateValues.iloc[:self.idx_trim_end]

# Segment gait cycles.
self.gaitEvents = self.segment_walking(n_gait_cycles=n_gait_cycles,leg=leg)
self.nGaitCycles = np.shape(self.gaitEvents['ipsilateralIdx'])[0]

# Determine treadmill speed (0 if overground).
self.treadmillSpeed,_ = self.compute_treadmill_speed()
self.treadmillSpeed,_ = self.compute_treadmill_speed(gait_style=gait_style)

# Initialize variables to be lazy loaded.
self._comValues = None
Expand All @@ -63,6 +88,10 @@ def __init__(self, session_dir, trial_name, leg='auto',
def comValues(self):
if self._comValues is None:
self._comValues = self.get_center_of_mass_values()
if self.trimming_start > 0:
self._comValues = self._comValues.iloc[self.idx_trim_start:]
if self.trimming_end > 0:
self._comValues = self._comValues.iloc[:self.idx_trim_end]
return self._comValues

# Compute gait frame.
Expand Down Expand Up @@ -205,29 +234,37 @@ def compute_cadence(self,return_all=False):
else:
return cadence, units

def compute_treadmill_speed(self, overground_speed_threshold=0.3,return_all=False):

leg,_ = self.get_leg()
def compute_treadmill_speed(self, overground_speed_threshold=0.3,
gait_style='auto', return_all=False):

foot_position = self.markerDict['markers'][leg + '_ankle_study']

stanceTimeLength = np.round(np.diff(self.gaitEvents['ipsilateralIdx'][:,:2]))
startIdx = np.round(self.gaitEvents['ipsilateralIdx'][:,:1]+.1*stanceTimeLength).astype(int)
endIdx = np.round(self.gaitEvents['ipsilateralIdx'][:,1:2]-.3*stanceTimeLength).astype(int)
# Heuristic to determine if overground or treadmill.
if gait_style == 'auto' or gait_style == 'treadmill':
leg,_ = self.get_leg()

# Average instantaneous velocities.
dt = np.diff(self.markerDict['time'][:2])[0]
for i in range(self.nGaitCycles):
footVel = np.linalg.norm(np.mean(np.diff(
foot_position[startIdx[i,0]:endIdx[i,0],:],axis=0),axis=0)/dt)

treadmillSpeeds = footVel
treadmillSpeed = np.mean(treadmillSpeeds)
foot_position = self.markerDict['markers'][leg + '_ankle_study']

stanceTimeLength = np.round(np.diff(self.gaitEvents['ipsilateralIdx'][:,:2]))
startIdx = np.round(self.gaitEvents['ipsilateralIdx'][:,:1]+.1*stanceTimeLength).astype(int)
endIdx = np.round(self.gaitEvents['ipsilateralIdx'][:,1:2]-.3*stanceTimeLength).astype(int)

# Average instantaneous velocities.
dt = np.diff(self.markerDict['time'][:2])[0]
treadmillSpeeds = np.zeros((self.nGaitCycles,))
for i in range(self.nGaitCycles):
treadmillSpeeds[i,] = np.linalg.norm(np.mean(np.diff(
foot_position[startIdx[i,0]:endIdx[i,0],:],axis=0),axis=0)/dt)

treadmillSpeed = np.mean(treadmillSpeeds)

# Overground if treadmill speed is below threshold and gait style not set to treadmill.
if treadmillSpeed < overground_speed_threshold and not gait_style == 'treadmill':
treadmillSpeed = 0
treadmillSpeeds = np.zeros(self.nGaitCycles)

# Overground.
if treadmillSpeed < overground_speed_threshold:
# Overground if gait style set to overground.
elif gait_style == 'overground':
treadmillSpeed = 0
treadmillSpeeds = np.zeros(treadmillSpeeds.shape)
treadmillSpeeds = np.zeros(self.nGaitCycles)

# Define units.
units = 'm/s'
Expand Down Expand Up @@ -622,7 +659,7 @@ def get_coordinates_normalized_time(self):
data = self.coordinateValues.to_numpy(copy=True)
coordValuesNorm = []
for i in range(self.nGaitCycles):
coordValues = data[self.gaitEvents['ipsilateralIdx'][i,0]:self.gaitEvents['ipsilateralIdx'][i,2]]
coordValues = data[self.gaitEvents['ipsilateralIdx'][i,0]:self.gaitEvents['ipsilateralIdx'][i,2]+1]
coordValuesNorm.append(np.stack([np.interp(np.linspace(0,100,101),
np.linspace(0,100,len(coordValues)),coordValues[:,i]) \
for i in range(coordValues.shape[1])],axis=1))
Expand All @@ -648,6 +685,65 @@ def segment_walking(self, n_gait_cycles=-1, leg='auto', visualize=False):

# n_gait_cycles = -1 finds all accessible gait cycles. Otherwise, it
# finds that many gait cycles, working backwards from end of trial.

# Helper functions
def detect_gait_peaks(r_calc_rel_x,
l_calc_rel_x,
r_toe_rel_x,
l_toe_rel_x,
prominence = 0.3):
# Find HS.
rHS, _ = find_peaks(r_calc_rel_x, prominence=prominence)
lHS, _ = find_peaks(l_calc_rel_x, prominence=prominence)

# Find TO.
rTO, _ = find_peaks(-r_toe_rel_x, prominence=prominence)
lTO, _ = find_peaks(-l_toe_rel_x, prominence=prominence)

return rHS,lHS,rTO,lTO

def detect_correct_order(rHS, rTO, lHS, lTO):
# checks if the peaks are in the right order

expectedOrder = {'rHS': 'lTO',
'lTO': 'lHS',
'lHS': 'rTO',
'rTO': 'rHS'}

# Identify vector that has the smallest value in it. Put this vector name
# in vName1
vectors = {'rHS': rHS, 'rTO': rTO, 'lHS': lHS, 'lTO': lTO}
non_empty_vectors = {k: v for k, v in vectors.items() if len(v) > 0}

# Check if there are any non-empty vectors
if not non_empty_vectors:
return True # All vectors are empty, consider it correct order

vName1 = min(non_empty_vectors, key=lambda k: non_empty_vectors[k][0])

# While there are any values in any of the vectors (rHS, rTO, lHS, or lTO)
while any([len(vName) > 0 for vName in vectors.values()]):
# Delete the smallest value from the vName1
vectors[vName1] = np.delete(vectors[vName1], 0)

# Then find the vector with the next smallest value. Define vName2 as the
# name of this vector
non_empty_vectors = {k: v for k, v in vectors.items() if len(v) > 0}

# Check if there are any non-empty vectors
if not non_empty_vectors:
break # All vectors are empty, consider it correct order

vName2 = min(non_empty_vectors, key=lambda k: non_empty_vectors[k][0])

# If vName2 != expectedOrder[vName1], return False
if vName2 != expectedOrder[vName1]:
return False

# Set vName1 equal to vName2 and clear vName2
vName1, vName2 = vName2, ''

return True

# Subtract sacrum from foot.
# It looks like the position-based approach will be more robust.
Expand All @@ -666,13 +762,36 @@ def segment_walking(self, n_gait_cycles=-1, leg='auto', visualize=False):
self.markerDict['markers']['L_toe_study'] -
self.markerDict['markers']['L.PSIS_study'])[:,0]

# Find HS.
rHS, _ = find_peaks(r_calc_rel_x, prominence=0.3)
lHS, _ = find_peaks(l_calc_rel_x, prominence=0.3)

# Find TO.
rTO, _ = find_peaks(-r_toe_rel_x, prominence=0.3)
lTO, _ = find_peaks(-l_toe_rel_x, prominence=0.3)
# Identify which direction the subject is walking.
r_psis_x = self.markerDict['markers']['r.PSIS_study'][:,0]
r_asis_x = self.markerDict['markers']['r.ASIS_study'][:,0]
r_dir_x = r_asis_x-r_psis_x
position_approach_scaling = np.where(r_dir_x > 0, 1, -1)
# Adjust relative positions accordingly.
r_calc_rel_x *= position_approach_scaling
r_toe_rel_x *= position_approach_scaling
l_calc_rel_x *= position_approach_scaling
l_toe_rel_x *= position_approach_scaling

# Detect peaks, check if they're in the right order, if not reduce prominence.
# the peaks can be less prominent with pathological or slower gait patterns
prominences = [0.3, 0.25, 0.2]

for i,prom in enumerate(prominences):
rHS,lHS,rTO,lTO = detect_gait_peaks(r_calc_rel_x=r_calc_rel_x,
l_calc_rel_x=l_calc_rel_x,
r_toe_rel_x=r_toe_rel_x,
l_toe_rel_x=l_toe_rel_x,
prominence=prom)
if not detect_correct_order(rHS=rHS, rTO=rTO, lHS=lHS, lTO=lTO):
if prom == prominences[-1]:
raise ValueError('The ordering of gait events is not correct. Consider trimming your trial using the trimming_start and trimming_end options.')
else:
print('The gait events were not in the correct order. Trying peak detection again ' +
'with prominence = ' + str(prominences[i+1]) + '.')
else:
# everything was in the correct order. continue.
break

if visualize:
import matplotlib.pyplot as plt
Expand Down
11 changes: 6 additions & 5 deletions gait_analysis/function/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,17 @@ def handler(event, context):

# %% Process data.
# Init gait analysis and get gait events.
legs = ['r','l']
gait, gait_events, ipsilateral = {}, {}, {}
legs = ['r']
gait, gait_events = {}, {}
for leg in legs:
gait[leg] = gait_analysis(
sessionDir, trial_name, leg=leg,
lowpass_cutoff_frequency_for_coordinate_values=filter_frequency,
n_gait_cycles=n_gait_cycles)
gait_events[leg] = gait[leg].get_gait_events()
ipsilateral[leg] = gait_events[leg]['ipsilateralTime'][0,-1]

# Select last leg.
last_leg = 'r' if ipsilateral['r'] > ipsilateral['l'] else 'l'
last_leg = 'r'

# Compute scalars.
gait_scalars = gait[last_leg].compute_scalars(scalar_names)
Expand Down Expand Up @@ -180,7 +179,7 @@ def handler(event, context):
datasets.append({})
for j in range(coordValues.shape[1]):
# Exclude knee_angle_r_beta and knee_angle_l_beta
if 'beta' in colNames[j]:
if 'beta' in colNames[j] or 'mtp' in colNames[j]:
continue
datasets[i][colNames[j]] = coordValues[i,j]

Expand All @@ -189,6 +188,8 @@ def handler(event, context):
y_axes.remove('time')
y_axes.remove('knee_angle_r_beta')
y_axes.remove('knee_angle_l_beta')
y_axes.remove('mtp_angle_r')
y_axes.remove('mtp_angle_l')

# Create results dictionnary.
results = {
Expand Down
9 changes: 9 additions & 0 deletions gait_analysis/function/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ def get_user_sessions():

return sessions

# Returns a list of all sessions of the user.
# TODO: this also contains public sessions of other users.
def get_user_sessions_all(user_token=API_TOKEN):
sessions = requests.get(
API_URL + "sessions/",
headers = {"Authorization": "Token {}".format(user_token)}).json()

return sessions

def get_trial_json(trial_id):
trialJson = requests.get(
API_URL + "trials/{}/".format(trial_id),
Expand Down

0 comments on commit 778640e

Please sign in to comment.