Center of Mass estimator: explanation of the code
If the center_of_mass package is not installed, run the following lines:
import sys
sys.path.insert(0, '../src')
[1]:
import center_of_mass
[2]:
import numpy, pickle, sys
Measurement errors used for calculating the optimal estimator gains
[3]:
Position_std = 0.0035 # standard deviation of the error in CoM position obtained from the kinematics alone (in m)
Force_std = 2 # standard deviation of the error in Ground reaction force (in N)
The example data file is loaded
[4]:
input_file = '..\examples\example_data.pkl'
pickle_file = open(input_file,'rb')
data = pickle.load(pickle_file)
pickle_file.close()
sex = data['sex'] # 'female' or 'male'
Labels = data['Labels'] # list of marker labels
Kinematic_frequency = data['Position_frequency'] # in Hertz
Position = data['Position'] # dictionary with, for each marker, the position in meters (numpy.ndarray of shape (3,duration*Position_frequency)
Force_frequency = data['Force_frequency'] # in Hertz
GroundReactionForce = data['GroundReactionForce']# in Newtons (numpy.ndarray of shape (3,duration*Force_frequency)
The center of mass is calculated (in this example we use the method of Dumas et al. 2007).
[5]:
kinematics = center_of_mass.Kinematics(Position, Labels, sex)
Kinematic_com = kinematics.calculate_CoM()
The person’s mass is determined as the median vertical ground reaction force during the initial 1.5 seconds of quiet standing
[6]:
mass = numpy.median(GroundReactionForce[2,:int(1.5*Force_frequency)])/9.81
The Center of Mass acceleration is calculated by subtracting the weight from the ground reaction force and dividing by mass. By default, the vertical axis is assumed to correspond to the third row of the Ground Reaction Force, and to be oriented upwards. However, this can be changed through the arguments ‘vertical’ and ‘direction’.
[7]:
def com_acceleration(GroundReactionForce, mass, gravity_direction = numpy.array([0,0,-1])):
'''Calculates the CoM acceleration from the Ground reaction force and mass
Parameters
----------
GroundReactionForce: (NbOfDimensions,NbOfSamples) numpy.ndarray
Ground reaction force (in Newton)
mass: float
subject's mass (in kg)
gravity_direction: (NbOfDimensions) numpy.ndarray, optional
direction of the gravity vector used to subtract the subject's weight, default is numpy.array([0,0,-1])
Returns
-------
Acceleration: (NbOfDimensions,NbOfSamples) numpy.ndarray
Acceleration of the Center of Mass (in m/s^2)
'''
## The net force is the sum of the ground reaction force and the person's weight
NetForce = GroundReactionForce + mass*9.81*numpy.array([gravity_direction]).T
## Acceleration is force divided by mass
Acceleration = NetForce/mass
return Acceleration
[8]:
Acceleration = com_acceleration(GroundReactionForce, mass)
The position and acceleration signals must be synchronised before applying the filtering. If they have a different sampling frequency, they are sub-sampled at a common frequency. The user can specify a desired sub-sampling frequency (which must be a common divisor of both Position_frequency and Acceleration_frequency). Otherwise, the greatest common divisor is used as the sub-sampling frequency.
[9]:
def subsample_one_signal(signal, signal_frequency, sub_frequency):
'''Subsample a signal at a given Frequency
Parameters
----------
signal: (NbOfDimensions, NbOfSamples) numpy.ndarray
Signal to subsample
signal_frequency: int
Sampling frequency (in Hertz) of the signal
sub_frequency: int
Desired sub-sampling frequency (in Hertz)
Returns
-------
signal_subsampled: (NbOfDimensions, NbOfSamples_sub) numpy.ndarray
Subsampled signal
'''
# The sub-sampling frequency must be a divisor of the signal frequency:
if signal_frequency % sub_frequency != 0:
raise ValueError('The sub-sampling frequency is not a divisor of the signal frequency. This would lead to synchronisation issues when sub-sampling.')
else:
if signal_frequency == sub_frequency:
return signal
else:
bin_size = int(signal_frequency/sub_frequency) # the signal will be averaged over bins of this size
NbOfDimensions = numpy.shape(signal)[0]
NbOfSamples_sub = int(numpy.shape(signal)[1]/bin_size)
signal_truncated = signal[:,:bin_size*NbOfSamples_sub]
signal_subsampled = numpy.zeros((NbOfDimensions,NbOfSamples_sub))
for dim in range(NbOfDimensions):
signal_reshape = signal_truncated[dim].reshape(NbOfSamples_sub,bin_size)
signal_subsampled[dim] = numpy.mean(signal_reshape, axis = 1)
return signal_subsampled
def subsample_two_signals(signal1, frequency1, signal2, frequency2, sub_frequency = None):
'''Subsample two signals at a common frequency
Parameters
----------
signal1: (NbOfDimensions,NbOfSamples1) numpy.ndarray
First signal
frequency1: int
Sampling frequency (in Hertz) of the first signal
signal2: (NbOfDimensions,NbOfSamples2) numpy.ndarray
Second signal
frequency2: int
Sampling frequency (in Hertz) of the second signal
sub_frequency: int, optional
Desired sub-sampling frequency (in Hertz), default is None
Returns
-------
signal1_subsampled: (NbOfDimensions,NbOfSamples_sub) numpy.ndarray
Subsampled first signal
signal2_subsampled: (NbOfDimensions,NbOfSamples_sub) numpy.ndarray
Subsampled second signal
sub_frequency: int
Subsampling frequency (in Hertz)
If the sub-sampling frequency was not specified by the user, this is the greatest common divisor of frequency1 and frequency2
'''
if not sub_frequency:
## The frequency at which both signals will be subsampled is the greatest common divisor of the two frequencies
import math
sub_frequency = math.gcd(int(frequency1), int(frequency2))
signal1_subsampled = subsample_one_signal(signal1, frequency1, sub_frequency)
signal2_subsampled = subsample_one_signal(signal2, frequency2, sub_frequency)
# Truncate the signals so that they have the same length
NbOfSamples_sub = min(numpy.shape(signal1_subsampled)[1],numpy.shape(signal2_subsampled)[1])
signal1_subsampled = signal1_subsampled[:,:NbOfSamples_sub]
signal2_subsampled = signal2_subsampled[:,:NbOfSamples_sub]
return signal1_subsampled, signal2_subsampled, sub_frequency
[10]:
Acc_subsampled, Pos_subsampled, Frequency = subsample_two_signals(Acceleration, Force_frequency, Kinematic_com, Kinematic_frequency)
The optimal estimator gains depend on the dimensionless ratio of position to acceleration noise:



[11]:
def estimator_gains(Force_std, Position_std, Frequency, mass):
'''Calculates the optimal estimator gains according to the measurement errors and sampling frequency
Parameters
----------
Force_std: float or (NbOfDimensions,) numpy.ndarray
Standard deviation of the error in Ground reaction force (in N) (can be provided for each dimension independently)
Position_std: float or (NbOfDimensions,) numpy.ndarray
Standard deviation of the error in CoM position obtained from the kinematics (in m) (can be provided for each dimension independently)
Frequency: int
Sampling frequency of the (sub-sampled) CoM position and acceleration
mass: float
Mass of the subject (in kg)
Returns
-------
l1: float or (NbOfDimensions,) numpy.ndarray
Optimal estimator gain for position (dimensionless)
l2: float or (NbOfDimensions,) numpy.ndarray
Optimal estimator gain for velocity (dimensionless)
'''
Acceleration_std = Force_std/mass
ratio = Position_std/Acceleration_std*Frequency**2
l2 = numpy.array((4*ratio+1 - numpy.sqrt(1+8*ratio))/(4*ratio**2))
l1 = numpy.array(1 - ratio**2*l2**2)
if not numpy.array(((Position_std > 0)*(Force_std > 0))).all():
if numpy.array((Force_std == 0)*(Position_std == 0)).any():
raise ValueError('Either Force_std or Position_std must be strictly positive')
elif numpy.array(Force_std == 0).any():
l1[Force_std == 0] = 0
l2[Force_std == 0] = 0
elif numpy.array(Position_std == 0).any():
l1[Position_std == 0] = 1
l2[Position_std == 0] = 2
else:
raise ValueError('Force_std and Position_std must be positive')
return l1, l2
[12]:
l1, l2 = estimator_gains(Force_std, Position_std, Frequency, mass)
Unless they are specified by the user, the initial estimates of position and velocity are obtained as a least-squares fit on the first few samples of the position measurement.

[13]:
def initial_conditions(Pos_measurement, T, initial_samples = 10):
'''The initial estimates of position and velocity are obtained as a least-squares fit on the first few samples of the position measurement.
Parameters
----------
Pos_measurement: (NbOfSamples,) numpy.ndarray
Position measurement (in m)
T: float
duration (in seconds) between successive samples (i.e. 1/Sampling_frequency)
initial_samples: int, optional
Number of samples used to estimate initial position and velocity (default is 10)
Returns
-------
pos_initial: float
Initial position estimate (in m)
vel_initial: float
Initial velocity estimate (in m/s)
'''
Coefficients = numpy.array([numpy.ones(initial_samples),numpy.arange(initial_samples)*T]).T
[pos_initial, vel_initial] = numpy.linalg.lstsq(Coefficients, Pos_measurement[:initial_samples], rcond=None)[0]
return pos_initial, vel_initial
Given the estimate
of the state position and velocity at timepoint
and the acceleration mesurement
at timepoint
, we obtain a prediction
of the system state at timepoint
by double integration of the measured acceleration:

The position prediction
is then compared to the position measurement
and the difference (i.e. prediction error) is used to adjust the estimates of position and speed at timepoint
using the estimator gains
:


[14]:
def estimator(Acc_measurement, Pos_measurement, l1, l2, T, Initial_conditions):
'''Estimation of the position and velocity, given (noisy) measurements of position and acceleration,
estimator gains, and initial conditions
Parameters
----------
Acc_measurement: (NbOfSamples,) numpy.ndarray
Acceleration measurement (in m/s^2)
Pos_measurement: (NbOfSamples,) numpy.ndarray
Position measurement (in m)
l1: float or (NbOfDimensions,) numpy.ndarray
Position estimator gain (dimensionless)
l2: float or (NbOfDimensions,) numpy.ndarray
Velocity estimator gain (dimensionless)
T: float
duration (in seconds) between successive samples (i.e. 1/Sampling_frequency)
Initial_conditions: (2,) numpy.ndarray
initial estimates of position (in m) and velocity (in m/s)
Returns
-------
Pos_estimate: (NbOfSamples,) numpy.ndarray
Position estimate (in m)
Vel_estimate: (NbOfSamples,) numpy.ndarray
Velocity estimate (in m/s)
'''
Pos_estimate = numpy.zeros(len(Pos_measurement))
Vel_estimate = numpy.zeros(len(Pos_measurement))
Pos_estimate[0], Vel_estimate[0] = Initial_conditions
for k in range(len(Pos_measurement)-1):
pos_prediction = Pos_estimate[k]+T*Vel_estimate[k]+T**2/2*Acc_measurement[k]
vel_prediction = Vel_estimate[k]+T*Acc_measurement[k]
Pos_estimate[k+1] = (1-l1)*pos_prediction +l1*Pos_measurement[k+1]
Vel_estimate[k+1] = vel_prediction +l2/T*(Pos_measurement[k+1]-pos_prediction)
return Pos_estimate, Vel_estimate
The estimator is applied, for each dimension separately, both forwards and backwards in time, and the forwards and backwards averaged are then merged. Note that when reversing time, velocity changes sign, whereas position and acceleration are unchanged.
[15]:
def estimator_backandforth(Acc_measurement, Pos_measurement, l1, l2, Frequency, Initial_conditions = None, Final_conditions = None, initial_samples = 10):
'''The estimator is applied, for each dimension separately, both forwards and backwards in time, and the forwards and backwards estimates are then merged.
Parameters
----------
Acc_measurement: (NbOfDimensions, NbOfSamples) numpy.ndarray
Acceleration measurement (in m/s^2)
Pos_measurement: (NbOfDimensions, NbOfSamples) numpy.ndarray
Position measurement (in m)
l1: float or (NbOfDimensions,) numpy.ndarray
Position estimator gain (dimensionless)
l2: float or (NbOfDimensions,) numpy.ndarray
Velocity estimator gain (dimensionless)
Frequency: float
Sampling frequency (in Hertz)
Initial_conditions: (NbOfDimensions,2) numpy.ndarray, optional
Initial estimates of position (in m) and velocity (in m/s), used when the estimator is applied forwards in time (default is None).
If None, the initial conditions are determined by a least-squares fit on the first few samples.
Final_conditions: (NbOfDimensions,2) numpy.ndarray, optional
Final estimates of position (in m) and velocity (in m/s), used when the estimator is applied backwards in time (default is None).
If None, the final conditions are determined by a least-squares fit on the first few samples.
initial_samples: int, optional
Number of samples used to estimate initial and final position and velocity (default is 10)
Returns
-------
Pos_estimate: (NbOfDimensions, NbOfSamples) numpy.ndarray
Position estimate (in m)
Vel_estimate: (NbOfDimensions, NbOfSamples) numpy.ndarray
Velocity estimate (in m/s)
'''
NbOfDimensions, NbOfSamples = numpy.shape(Pos_measurement)
Pos_estimate_forw = numpy.zeros((NbOfDimensions, NbOfSamples))
Vel_estimate_forw = numpy.zeros((NbOfDimensions, NbOfSamples))
Pos_estimate_back = numpy.zeros((NbOfDimensions, NbOfSamples))
Vel_estimate_back = numpy.zeros((NbOfDimensions, NbOfSamples))
# If a single value is given for l1 and l2, these are used for all dimensions
if len(numpy.shape(l1)) == 0:
l1 = l1*numpy.ones((NbOfDimensions))
if len(numpy.shape(l2)) == 0:
l2 = l2*numpy.ones((NbOfDimensions))
for dim in range(NbOfDimensions):
# If the user does not specify the initial estimate of position and velocity, these are estimated from the first few samples of position
if numpy.sum(Initial_conditions==None) > 0:
Initial_conditions_dim = initial_conditions(Pos_measurement[dim], 1/Frequency, initial_samples = initial_samples)
else:
Initial_conditions_dim = Initial_conditions[dim]
if numpy.sum(Final_conditions==None) > 0:
Final_conditions_dim = initial_conditions(Pos_measurement[dim,::-1], 1/Frequency, initial_samples = initial_samples)
else:
Final_conditions_dim = Final_conditions[dim]
Pos_estimate_forw[dim], Vel_estimate_forw[dim] = estimator(Acc_measurement[dim], Pos_measurement[dim], l1[dim], l2[dim], 1/Frequency, Initial_conditions_dim)
Pos_estimate_back[dim], Vel_estimate_back[dim] = estimator(Acc_measurement[dim,::-1], Pos_measurement[dim,::-1], l1[dim], l2[dim], 1/Frequency, Final_conditions_dim)
Pos_estimate = 0.5*(Pos_estimate_forw+Pos_estimate_back[:,::-1])
Vel_estimate = 0.5*(Vel_estimate_forw-Vel_estimate_back[:,::-1])
return Pos_estimate, Vel_estimate
[16]:
Pos_estimate, Vel_estimate = estimator_backandforth(Acc_subsampled, Pos_subsampled, l1, l2, Frequency)
Visualisation of the measurements and estimates
[17]:
def visualise(Acc_subsampled, Pos_subsampled, Pos_estimate, Vel_estimate, Frequency):
import matplotlib.pyplot as plt
plt.rcParams['axes.spines.right'] = False
plt.rcParams['axes.spines.top'] = False
NbOfDimensions, NbOfSamples = numpy.shape(Acc_subsampled)
time = numpy.arange(NbOfSamples)/Frequency
fig, axes = plt.subplots(3,NbOfDimensions)
for dim in range(NbOfDimensions):
axes[0,dim].plot(time, Pos_subsampled[dim], color = 'k', label = 'measurement')
axes[0,dim].plot(time, Pos_estimate[dim], color = 'b', label = 'estimate')
axes[1,dim].plot(time, Vel_estimate[dim], color = 'b')
axes[2,dim].plot(time, Acc_subsampled[dim], color = 'k')
axes[0,0].legend()
labels = ['Position (m)','Velocity (m/s)',r'Acceleration $(m/s^2)$']
for l in range(3):
axes[l,0].set_ylabel(labels[l])
for t in range(NbOfDimensions):
axes[2,t].set_xlabel('Time (s)')
plt.show()
[18]:
visualise(Acc_subsampled, Pos_subsampled, Pos_estimate, Vel_estimate, Frequency)
The function optimal_combination combines all these successive steps
[19]:
def optimal_combination(GroundReactionForce, Force_frequency, Kinematic_com, Kinematic_frequency, mass,
Force_std = 2, Position_std = 0.002, gravity_direction = numpy.array([0,0,-1]), sub_frequency = None, Initial_conditions = None, Final_conditions = None, initial_samples = 10):
'''Combines the Center of Mass position obtained from kinematic measurements with Ground reaction force to estimate the Center of Mass position and velocity
Parameters
----------
GroundReactionForce: (NbOfDimensions,NbOfSamples1) numpy.ndarray
Ground reaction force (in Newton)
Force_frequency: int
Sampling frequency (in Hertz) of the Ground reaction force
Kinematic_com: (NbOfDimensions,NbOfSamples2) numpy.ndarray
Center of Mass position obtained from kinematic measurements (in m)
Kinematic_frequency: int
Sampling frequency (in Hertz) of the kinematics
mass: float
subject's mass (in kg)
Force_std: float or (NbOfDimensions,) numpy.ndarray, optional
Standard deviation of the error in Ground reaction force (in N) (can be provided for each dimension independently), default is 2 N
Position_std: float or (NbOfDimensions,) numpy.ndarray, optional
Standard deviation of the error in CoM position obtained from the kinematics (in m) (can be provided for each dimension independently), default is 0.002 m
gravity_direction: (NbOfDimensions) numpy.ndarray, optional
direction of the gravity vector used to subtract the subject's weight, default is numpy.array([0,0,-1])
sub_frequency: int, optional
Desired sub-sampling frequency (in Hertz), default is None
Initial_conditions: (NbOfDimensions,2) numpy.ndarray, optional
Initial estimates of position (in m) and velocity (in m/s), used when the estimator is applied forwards in time (default is None).
If None, the initial conditions are determined by a least-squares fit on the first few samples.
Final_conditions: (NbOfDimensions,2) numpy.ndarray, optional
Final estimates of position (in m) and velocity (in m/s), used when the estimator is applied backwards in time (default is None).
If None, the final conditions are determined by a least-squares fit on the first few samples.
initial_samples: int, optional
Number of samples used to estimate initial and final position and velocity (default is 10)
Returns
-------
Pos_estimate: (NbOfDimensions, NbOfSamples) numpy.ndarray
Position estimate (in m)
Vel_estimate: (NbOfDimensions, NbOfSamples) numpy.ndarray
Velocity estimate (in m/s)
Frequency: int
Sampling frequency of the position and velocity estimates
'''
# The CoM acceleration is calculated from the Ground reaction force and mass
Acceleration = com_acceleration(GroundReactionForce, mass, gravity_direction = gravity_direction)
# Position and Acceleration are sub-sampled at a common frequency
Acc_subsampled, Pos_subsampled, Frequency = subsample_two_signals(Acceleration, Force_frequency, Kinematic_com, Kinematic_frequency, sub_frequency = sub_frequency)
# The optimal estimator gains at that Frequency are calculated
l1, l2 = estimator_gains(Force_std, Position_std, Frequency, mass)
## The estimator is applied both forwards in time and backwards, and the forwards and backwards estimates are merged.
Pos_estimate, Vel_estimate = estimator_backandforth(Acc_subsampled, Pos_subsampled, l1, l2, Frequency, Initial_conditions = Initial_conditions, Final_conditions = Final_conditions, initial_samples = initial_samples)
return Pos_estimate, Vel_estimate, Frequency
[20]:
Pos_estimate, Vel_estimate, Frequency = optimal_combination(GroundReactionForce, Force_frequency, Kinematic_com, Kinematic_frequency, mass)
[21]:
visualise(Acc_subsampled, Pos_subsampled, Pos_estimate, Vel_estimate, Frequency)