-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimulator.rs
309 lines (256 loc) · 10.4 KB
/
simulator.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
use crossbeam_channel as cbc;
use std::collections::HashMap;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::thread;
use std::time::Duration;
use tracing::{debug, error, info, span, trace, Level};
use malachite_core_consensus::{Error, Input, Params, State, ValuePayload};
use malachite_metrics::Metrics;
use crate::application::Application;
use crate::common;
use crate::context::address::BasePeerAddress;
use crate::context::height::BaseHeight;
use crate::context::peer_set::BasePeerSet;
use crate::context::value::BaseValue;
use crate::context::BaseContext;
use crate::decision::Decision;
/// The delay between each consecutive step the simulator takes.
///
/// [`Simulator::run`] sleeps for this long after every step it executes.
pub const STEP_DELAY: Duration = Duration::from_millis(200);
/// The sending side of a stream of [`BaseValue`]s.
/// Each value is treated as a Proposal to consensus.
pub type ProposalsSender = cbc::Sender<BaseValue>;
/// The receiving side of a stream of [`BaseValue`]s, as the application receives them.
/// The application treats these values as Proposals.
///
/// This is a crossbeam channel; [`Simulator::new`] hands a clone of the receiver
/// to the application of every peer.
pub type ProposalsReceiver = cbc::Receiver<BaseValue>;
/// The sending side of a stream of [`Decision`]s, from the point of view of the
/// application, which sends them.
pub type DecisionsSender = Sender<Decision>;
/// The receiving side of a stream of [`Decision`]s.
/// Each decision is a value that consensus has finalized.
pub type DecisionsReceiver = Receiver<Decision>;
/// The sending side of the networking layer.
/// Each message in the network is an [`Envelope`].
/// The [`Application`] logic at the various peers sends these messages.
pub type NetSender = Sender<Envelope>;
/// The receiving side of the networking layer.
/// The [`Simulator`] takes each message from this queue and applies that
/// message to the appropriate peer.
pub type NetReceiver = Receiver<Envelope>;
/// Represents a message with an [`Input`] to the application logic
/// at a certain peer.
///
/// Peers send envelopes to one another, potentially to themselves, in the
/// process of reaching consensus on a decision.
pub struct Envelope {
/// Address of the peer this envelope originates from.
/// Within this file it is only read for tracing.
pub source: BasePeerAddress,
/// Address of the peer whose state and application logic the payload is applied to.
pub destination: BasePeerAddress,
/// The consensus [`Input`] handed to the application at the destination peer.
pub payload: Input<BaseContext>,
}
/// A system simulator represents:
///
/// - Some state of peers, namely: params, metrics, and application logic.
/// - The environment for executing the application and producing decisions: the network
///   layer, which is simulated in this case.
///
/// The consensus [`State`] of each peer is not stored in this struct:
/// [`Simulator::new`] returns the states separately and the caller passes them
/// back in to [`Simulator::run`].
pub struct Simulator {
// Params of each peer, keyed by the address of the peer.
params: HashMap<BasePeerAddress, Params<BaseContext>>,
// The metrics of each peer, keyed by the address of the peer.
// Empty after `new`; populated by `initialize_system`.
metrics: HashMap<BasePeerAddress, Metrics>,
// The application logic associated with each peer.
apps: HashMap<BasePeerAddress, Application>,
// Simulates the receiver-side of the networking layer.
// The sender-side of networking is registered in each application.
network_rx: NetReceiver,
}
impl Simulator {
    /// Creates a new system simulator consisting of `size` number of peers.
    /// Each peer is a validator in the system.
    ///
    /// Returns, in order:
    ///
    /// - the simulator itself,
    /// - the consensus state of every peer, in the order the peers were created
    ///   (the state of the peer created with index `i` is at position `i`),
    /// - the sender on which to submit proposals (inputs to the system),
    /// - the receiver on which decisions (outputs of the system) arrive.
    ///
    /// # Panics
    ///
    /// Panics unless `4 <= size < 25`.
    pub fn new(
        size: u32,
    ) -> (
        Simulator,
        Vec<State<BaseContext>>, // The consensus state of peers
        ProposalsSender,         // Send proposals (inputs to the system)
        DecisionsReceiver,       // Receive decisions (outputs of the system)
    ) {
        assert!(size >= 4, "the simulator needs at least 4 peers, got {}", size);
        assert!(size < 25, "the simulator supports fewer than 25 peers, got {}", size);

        // `size` is below 25 (asserted above), so this conversion cannot truncate.
        let peer_count = size as usize;

        // Construct the simulated network
        let (network_tx, network_rx) = mpsc::channel();

        // Crossbeam channel on which `BaseValue` proposals pass from the environment into
        // application logic.
        // This would be the mempool in a real application.
        let (proposals_tx, proposals_rx) = cbc::bounded(5);

        // Channel on which to send/receive the decisions.
        let (decisions_tx, decisions_rx) = mpsc::channel();

        let mut states = Vec::with_capacity(peer_count);
        let mut params = HashMap::with_capacity(peer_count);
        let mut apps = HashMap::with_capacity(peer_count);

        // Construct the set of peers that comprise the network
        let ctx = BaseContext::new();
        let val_set = BasePeerSet::new(size, ctx.shared_public_key());

        // Construct the consensus states and params for each peer
        for i in 0..size {
            let peer_addr = BasePeerAddress::new(i);

            let peer_params = Params {
                initial_height: BaseHeight::default(),
                initial_validator_set: val_set.clone(),
                address: peer_addr,
                // Note: The library provides a type and implementation
                // for threshold params which we're re-using.
                threshold_params: Default::default(),
                // Todo: This can be tricky, must be documented properly
                value_payload: ValuePayload::ProposalOnly,
            };

            // The params for this specific peer
            params.insert(peer_addr, peer_params.clone());

            // The state at this specific peer
            states.push(State::new(ctx.clone(), peer_params));

            // Register the application corresponding to this peer
            apps.insert(
                peer_addr,
                Application {
                    peer_id: peer_addr,
                    network_tx: network_tx.clone(),
                    decision_tx: decisions_tx.clone(),
                    proposal_rx: proposals_rx.clone(),
                },
            );
        }

        (
            Simulator {
                params,
                // Populated later, by `initialize_system`
                metrics: HashMap::with_capacity(peer_count),
                apps,
                network_rx,
            },
            states,
            proposals_tx,
            decisions_rx,
        )
    }

    /// Orchestrate the execution of this system across the network of all peers.
    /// Running this will start producing [`Decision`]s.
    ///
    /// Initializes every peer and then loops forever: each iteration handles one
    /// envelope from the simulated network and then sleeps for [`STEP_DELAY`].
    /// This method never returns.
    pub fn run(&mut self, states: &mut [State<BaseContext>]) {
        self.initialize_system(states);

        // Orchestrate among peers. Not a spin loop: `step` blocks until an
        // envelope is available.
        loop {
            // Pick the next envelope from the network and demultiplex it
            self.step(states);

            // Simulate network and execution delays
            thread::sleep(STEP_DELAY);
        }
    }

    /// Prepares every peer found in `states` for execution: creates and stores the
    /// metrics of the peer, then asks the application of that peer to initialize
    /// itself with the peer's initial validator set.
    ///
    /// # Panics
    ///
    /// Panics if a peer in `states` has no params or no application registered
    /// in this simulator.
    fn initialize_system(&mut self, states: &mut [State<BaseContext>]) {
        let span = span!(Level::INFO, "initialize_system");
        let _enter = span.enter();

        for peer_state in states.iter() {
            let peer_addr = peer_state.params.address;

            // Potentially a future refactor: Remove `self.params` and
            // use the ones from `states` instead.
            // Only the validator set is needed, so clone just that field
            // instead of the whole params.
            let validator_set = self
                .params
                .get(&peer_addr)
                .expect("could not identify peer at next position")
                .initial_validator_set
                .clone();

            // Initialize & save the metrics for later use
            self.metrics.insert(peer_addr, common::new_metrics());

            // Tell the application at this peer to initialize itself
            let app = self.apps.get(&peer_addr).expect("app not found");
            app.init(validator_set);

            debug!(peer = %peer_addr, "peer init done");
        }

        info!("done");
    }

    // Demultiplex among the networking envelopes incoming from `network_rx`, then
    // calls the corresponding application logic to handle the `Input`.
    // Blocks in case there is no envelope to handle.
    // A receive error is logged and otherwise ignored.
    fn step(&mut self, states: &mut [State<BaseContext>]) {
        match self.network_rx.recv() {
            Ok(envelope) => self.step_with_envelope(states, envelope),
            Err(err) => {
                error!(error = ?err, "error receiving the next envelope from the network");
            }
        }
    }

    // Applies the payload of `envelope` to the peer the envelope is addressed to.
    //
    // Panics if the destination peer has no state, params, metrics or application,
    // or if applying the input fails.
    fn step_with_envelope(&self, states: &mut [State<BaseContext>], envelope: Envelope) {
        let peer_addr = envelope.destination;

        // The numeric part of the address doubles as the position of the peer in `states`.
        let peer_state = states
            .get_mut(peer_addr.0 as usize)
            .expect("no consensus state for the destination peer");

        // Borrow the per-peer data; cloning it on every step is unnecessary.
        let params = self
            .params
            .get(&peer_addr)
            .expect("params not found for the destination peer");
        let metrics = self
            .metrics
            .get(&peer_addr)
            .expect("metrics not found for the destination peer; was the system initialized?");
        let application = self.apps.get(&peer_addr).expect("app not found");

        // The context lives inside `peer_state`, which is handed out mutably below,
        // so it has to be cloned to be passed alongside it.
        let context = peer_state.ctx.clone();

        trace!(source = %envelope.source, destination = %envelope.destination, "applying an input from an envelope");

        self.apply_step_with_envelope(
            application,
            envelope.payload,
            params,
            metrics,
            peer_state,
            &context,
        )
        .expect("unknown error during process_peer");
    }

    // Hands `input` to `application` together with everything else that belongs to
    // the peer (params, metrics, state, context), returning whatever
    // `Application::apply_input` returns.
    fn apply_step_with_envelope(
        &self,
        application: &Application,
        input: Input<BaseContext>,
        peer_params: &Params<BaseContext>,
        metrics: &Metrics,
        peer_state: &mut State<BaseContext>,
        context: &BaseContext,
    ) -> Result<(), Error<BaseContext>> {
        application.apply_input(input, peer_params, metrics, peer_state, context)
    }
}
#[cfg(test)]
mod tests {
    use std::sync::mpsc::TryRecvError;

    use crate::context::value::BaseValue;
    use crate::simulator::Simulator;

    /// Proposes ten consecutive values, one at a time, and for each of them keeps
    /// stepping the simulator until every peer has reported a decision that
    /// matches the proposed value.
    #[test]
    fn basic_proposal_decisions() {
        const PEER_SET_SIZE: u32 = 4;

        let (mut simulator, mut states, proposals, decisions) = Simulator::new(PEER_SET_SIZE);
        simulator.initialize_system(&mut states);

        // Ten proposals, starting from the value 45
        for proposal in 45..55 {
            // Hand the value to be proposed over to the system
            proposals
                .send(BaseValue(proposal))
                .expect("could not send value to be proposed");
            println!("sent proposal {}", proposal);

            // Number of peers that have decided on the current proposal so far
            let mut decided_peers = 0;
            while decided_peers < PEER_SET_SIZE {
                // Let the system simulator take another step
                simulator.step(&mut states);

                // Check if the system reached a decision
                match decisions.try_recv() {
                    Ok(decision) => {
                        println!(
                            "found decision {} from peer {}",
                            decision.value_id, decision.peer
                        );
                        assert_eq!(decision.value_id.0, proposal);
                        decided_peers += 1;
                    }
                    // No decision yet, keep stepping
                    Err(TryRecvError::Empty) => {}
                    Err(TryRecvError::Disconnected) => {
                        panic!("disconnected channel with decisions")
                    }
                }
            }
        }
    }
}