Skip to content

Commit

Permalink
Algorithm support for periodic sources.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidv1992 committed Dec 13, 2024
1 parent a611ad2 commit 2df1ed3
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 90 deletions.
2 changes: 2 additions & 0 deletions ntp-proto/src/algorithm/kalman/combiner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ mod tests {
},
wander: 0.0,
delay: 0.0,
period: None,
source_uncertainty: NtpDuration::from_seconds(source_uncertainty),
source_delay: NtpDuration::from_seconds(0.01),
leap_indicator: NtpLeapIndicator::NoWarning,
Expand Down Expand Up @@ -227,6 +228,7 @@ mod tests {
},
wander: 0.0,
delay: 0.0,
period: None,
source_uncertainty: NtpDuration::from_seconds(0.0),
source_delay: NtpDuration::from_seconds(0.0),
leap_indicator: leap,
Expand Down
23 changes: 17 additions & 6 deletions ntp-proto/src/algorithm/kalman/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct SourceSnapshot<Index: Copy> {
state: KalmanState,
wander: f64,
delay: f64,
period: Option<f64>,

source_uncertainty: NtpDuration,
source_delay: NtpDuration,
Expand Down Expand Up @@ -113,7 +114,10 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> KalmanClockController<C, S
}
for (_, (state, _)) in self.sources.iter_mut() {
if let Some(ref mut snapshot) = state {
snapshot.state = snapshot.state.progress_time(time, snapshot.wander)
snapshot.state =
snapshot
.state
.progress_time(time, snapshot.wander, snapshot.period)
}
}

Expand Down Expand Up @@ -262,7 +266,7 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> KalmanClockController<C, S
.expect("Cannot adjust clock");
for (state, _) in self.sources.values_mut() {
if let Some(ref mut state) = state {
state.state = state.state.process_offset_steering(change);
state.state = state.state.process_offset_steering(change, state.period);
}
}
info!("Jumped offset by {}ms", change * 1e3);
Expand Down Expand Up @@ -315,10 +319,12 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> KalmanClockController<C, S
.expect("Cannot adjust clock");
for (state, _) in self.sources.values_mut() {
if let Some(ref mut state) = state {
state.state =
state
.state
.process_frequency_steering(freq_update, actual_change, state.wander)
state.state = state.state.process_frequency_steering(
freq_update,
actual_change,
state.wander,
state.period,
)
}
}
debug!(
Expand Down Expand Up @@ -382,6 +388,7 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug + Send + 'static> TimeSyncC
KalmanSourceController::new(
id,
self.algo_config,
None,
self.source_defaults_config,
AveragingBuffer::default(),
)
Expand All @@ -391,11 +398,13 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug + Send + 'static> TimeSyncC
&mut self,
id: SourceId,
measurement_noise_estimate: f64,
period: Option<f64>,
) -> Self::OneWaySourceController {
self.sources.insert(id, (None, false));
KalmanSourceController::new(
id,
self.algo_config,
period,
self.source_defaults_config,
measurement_noise_estimate,
)
Expand Down Expand Up @@ -638,6 +647,7 @@ mod tests {
},
wander: 0.0,
delay: 0.0,
period: None,
source_uncertainty: NtpDuration::ZERO,
source_delay: NtpDuration::ZERO,
leap_indicator: NtpLeapIndicator::NoWarning,
Expand Down Expand Up @@ -686,6 +696,7 @@ mod tests {
},
wander: 0.0,
delay: 0.0,
period: None,
source_uncertainty: NtpDuration::ZERO,
source_delay: NtpDuration::ZERO,
leap_indicator: NtpLeapIndicator::NoWarning,
Expand Down
40 changes: 30 additions & 10 deletions ntp-proto/src/algorithm/kalman/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ pub(super) fn select<Index: Copy>(
let mut bounds: Vec<(f64, BoundType)> = Vec::with_capacity(2 * candidates.len());

for snapshot in candidates.iter() {
if snapshot.period.is_some() {
// Do not let periodic sources be part of the vote for correct time
continue;
}

let radius = snapshot.offset_uncertainty() * algo_config.range_statistical_weight
+ snapshot.delay * algo_config.range_delay_weight;
if radius > algo_config.maximum_source_uncertainty
Expand All @@ -38,30 +43,44 @@ pub(super) fn select<Index: Copy>(

bounds.sort_by(|a, b| a.0.total_cmp(&b.0));

let mut max: usize = 0;
let mut maxt: f64 = 0.0;
let mut maxlow: usize = 0;
let mut maxhigh: usize = 0;
let mut maxtlow: f64 = 0.0;
let mut maxthigh: f64 = 0.0;
let mut cur: usize = 0;

for (time, boundtype) in bounds.iter() {
match boundtype {
BoundType::Start => cur += 1,
BoundType::End => cur -= 1,
}
if cur > max {
max = cur;
maxt = *time;
BoundType::Start => {
cur += 1;
if cur > maxlow {
maxlow = cur;
maxtlow = *time;
}
}
BoundType::End => {
if cur > maxhigh {
maxhigh = cur;
maxthigh = *time;
}
cur -= 1;
}
}
}

// Catch programming errors. If this ever fails there is high risk of missteering, better fail hard in that case
assert_eq!(maxlow, maxhigh);
let max = maxlow;

if max >= synchronization_config.minimum_agreeing_sources && max * 4 > bounds.len() {
candidates
.iter()
.filter(|snapshot| {
let radius = snapshot.offset_uncertainty() * algo_config.range_statistical_weight
+ snapshot.delay * algo_config.range_delay_weight;
radius <= algo_config.maximum_source_uncertainty
&& snapshot.offset() - radius <= maxt
&& snapshot.offset() + radius >= maxt
&& snapshot.offset() - radius <= maxthigh
&& snapshot.offset() + radius >= maxtlow
&& snapshot.leap_indicator.is_synchronized()
})
.cloned()
Expand Down Expand Up @@ -96,6 +115,7 @@ mod tests {
},
wander: 0.0,
delay,
period: None,
source_uncertainty: NtpDuration::from_seconds(0.01),
source_delay: NtpDuration::from_seconds(0.01),
leap_indicator: NtpLeapIndicator::NoWarning,
Expand Down
Loading

0 comments on commit 2df1ed3

Please sign in to comment.