Skip to content

Commit

Permalink
A0-4564: Modified example/ordering so that it sends unique elements f…
Browse files Browse the repository at this point in the history
…rom data provider (#495)

* [WIP]

* Fixed bug when cache is not cleared

* fmt

* Review
  • Loading branch information
Marcin-Radecki authored Nov 26, 2024
1 parent dd6e6ce commit 7dda99f
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 124 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/ordering/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft-examples-ordering"
version = "0.0.3"
version = "0.1.0"
edition = "2021"
authors = ["Cardinal Cryptography"]
license = "Apache-2.0"
Expand Down
234 changes: 162 additions & 72 deletions examples/ordering/run.sh
Original file line number Diff line number Diff line change
@@ -1,97 +1,187 @@
#!/bin/bash
#!/usr/bin/env bash

usage() {
echo "Usage: ./run.sh [-n N_NODES] [-m N_MALFUNCTIONING_NODES] [-s N_STALLING_DATA_PROVIDERS] [-c N_CRASHES] [-o N_ORDERED_PER_CRASH] [-d RESTART_DELAY]"
set -eou pipefail

function usage() {
cat << EOF
Usage:
This script is a demonstration usage of AlephBFT protocol, in which there are N nodes and they want to achieve
a consensus with regards to provided data. The data sent to AlephBFT from each node is a stream of integers from range
[0, N * DATA_ITEMS), where DATA_ITEMS is configurable. Each node of index 'i' sends to the consensus
integers from range [i * DATA_ITEMS; (i + 1) * DATA_ITEMS). where 0 <= i < N. At the end, each node makes
sure that it receives all integers from range [0, N * DATA_ITEMS), each integer exactly once.
N nodes are started on your machine, and they communicate via UDP. Not all nodes behave correctly - some of them crash
or are stuck while providing data.
This script is using aleph-bft-examples-ordering and assumes to be available in a relative folder from this script path
../../target/release/aleph-bft-examples-ordering
$0
[-n|--nodes NODES]
number of all non-crashing nodes; some of them can have stalled data provider
[-c|--crashing-nodes CRASHING_NODES]
number of nodes that crash while providing data
[-s|--stalling-data-providers STALLING_DATA_PROVIDERS]
number of nodes that eventually stall while providing data; must be less than --nodes
[--crashes-count CRASHES_COUNT]
how many times a crashing node should crash
[--data-items DATA_ITEMS]
how many data items each node should order
[--crash-restart-delay-seconds CRASH_RESTART_DELAY_SECONDS]
delay (seconds) between subsequent node crashes
[--unit-creation-delay UNIT_CREATION_DELAY]
unit creation delay (milliseconds), default 200
EOF
exit 0
}

NORMAL=$(tput sgr0)
GREEN=$(tput setaf 2; tput bold)
YELLOW=$(tput setaf 3)
RED=$(tput setaf 1)

function get_timestamp() {
echo "$(date +'%Y-%m-%d %T:%3N')"
}

function error() {
echo -e "$(get_timestamp) $RED$*$NORMAL"
exit 1
}

N_NODES=2
N_MALFUNCTIONING_NODES=2
N_STALLING_DATA_PROVIDERS=1
N_CRASHES=3
N_ORDERED_PER_CRASH=25
RESTART_DELAY=1

while getopts :n:m:s:c:o:d: flag; do
case "${flag}" in
n) N_NODES=${OPTARG};;
m) N_MALFUNCTIONING_NODES=${OPTARG};;
s) N_STALLING_DATA_PROVIDERS=${OPTARG};;
c) N_CRASHES=${OPTARG};;
o) N_ORDERED_PER_CRASH=${OPTARG};;
d) RESTART_DELAY=${OPTARG};;
*) usage;;
esac
done
function info() {
echo -e "$(get_timestamp) $GREEN$*$NORMAL"
}

n_ordered=$(( (N_CRASHES+1)*N_ORDERED_PER_CRASH ))
stalled=$(seq -s, 0 $((N_STALLING_DATA_PROVIDERS-1)))
port=10000
ports="$port"
for i in $(seq 0 $(expr $N_NODES + $N_MALFUNCTIONING_NODES - 2)); do
port=$((port+1))
ports+=",$port"
done
function warning() {
echo -e "$(get_timestamp) $YELLOW$*$NORMAL"
}

set -e
function run_ordering_binary() {
local id="$1"
local starting_data_item="$2"
local data_items=$3
local should_stall="${4:-no}"

cargo build --release
binary="../../target/release/aleph-bft-examples-ordering"
local binary_args=(
--id "$id"
--ports "${PORTS}"
--starting-data-item "${starting_data_item}"
--data-items "${data_items}"
--required-finalization-value "${EXPECTED_FINALIZED_DATA_ITEMS}"
--unit-creation-delay "${UNIT_CREATION_DELAY}"
)
if [[ "${should_stall}" == "yes-stall" ]]; then
binary_args+=(--should-stall)
fi

clear
info "Starting node ${id} to provide items from ${starting_data_item} to $(( starting_data_item + data_items - 1 )), inclusive"
"${ordering_binary}" "${binary_args[@]}" 2>> "node${id}.log" > /dev/null &
}

run_crash_node () {
function run_crash_node () {
id="$1"
n_starting=0
n_data=$N_ORDERED_PER_CRASH
for (( i = 1; i <= N_CRASHES; i++ )); do
echo "Starting node $id at $n_starting items ($i/$((N_CRASHES+1)))..."
! "$binary" --id "$id" --ports "$ports" --n-data "$n_data" --n-starting "$n_starting" --stalled "$stalled" --crash 2>> "node${id}.log"
echo "Node $id crashed. Respawning in $RESTART_DELAY seconds..."
sleep "$RESTART_DELAY"
n_starting=$n_data
n_data=$(( n_data + N_ORDERED_PER_CRASH ))
for run_attempt_index in $(seq 0 $(( CRASHES_COUNT - 1 ))); do
run_ordering_binary "${id}" "${DATA_ITEMS_COUNTER}" "${DATA_ITEMS}"
pid=$!
info "Waiting ${CRASH_RESTART_DELAY_SECONDS} seconds..."
sleep "${CRASH_RESTART_DELAY_SECONDS}"
info "Killing node with pid ${pid}"
kill -9 "${pid}" 2> /dev/null
done
echo "Starting node $id at $n_starting items ($((N_CRASHES+1))/$((N_CRASHES+1)))..."
"$binary" --id "$id" --ports "$ports" --n-data "$n_data" --n-starting "$n_starting" --stalled "$stalled" 2>> "node${id}.log"
run_ordering_binary "${id}" "${DATA_ITEMS_COUNTER}" "${DATA_ITEMS}"
}

for id in $(seq 0 $(expr $N_NODES + $N_MALFUNCTIONING_NODES - 1)); do
rm -f "aleph-bft-examples-ordering-backup/${id}.units"
rm -f "node${id}.log"
NODES=2
CRASHING_NODES=2
STALLING_DATA_PROVIDERS=1
CRASHES_COUNT=3
DATA_ITEMS=25
CRASH_RESTART_DELAY_SECONDS=5
DATA_ITEMS_COUNTER=0
UNIT_CREATION_DELAY=200

while [[ $# -gt 0 ]]; do
case "$1" in
-n|--nodes)
NODES="$2"
shift;shift
;;
-c|--crashing-nodes)
CRASHING_NODES="$2"
shift;shift
;;
-s|--stalling-data-providers)
STALLING_DATA_PROVIDERS="$2"
shift;shift
;;
--crashes-count)
CRASHES_COUNT="$2"
shift;shift
;;
--data-items)
DATA_ITEMS="$2"
shift;shift
;;
--crash-restart-delay-seconds)
CRASH_RESTART_DELAY_SECONDS="$2"
shift;shift
;;
--unit-creation-delay)
UNIT_CREATION_DELAY="$2"
shift;shift
;;
--help)
usage
shift
;;
*)
error "Unrecognized argument $1!"
;;
esac
done

script_path="${BASH_SOURCE[0]}"
script_dir=$(dirname "${script_path}")
ordering_binary_dir=$(realpath "${script_dir}/../../")
ordering_binary="${ordering_binary_dir}/target/release/aleph-bft-examples-ordering"

echo "WARNING
The current implementation of AlephBFT does not strictly guarantee
all input data to be included in the output - a property that will
be added in a future version. This issue occurs when the provider lags
behind other nodes.
Therefore, always check logs to see if there are any unexpected
errors - e.g. connection timeout - or if some crashed nodes are lagging
behind - messages \"Providing None\" are logged, but the total
amount of finalized data does not increase for this particular node.
In such case, try reducing the number of nodes or shortening
the restart delay. Another option is to make more than 1/3 of the nodes
malfunctioning - with that the protocol will stall while the nodes are
restarting so that there won't be a set of nodes that run out ahead.
if [[ ! -x "${ordering_binary}" ]]; then
error "${ordering_binary} does not exist or it's not an executable file!"
fi

ALL_NODES=$(( NODES + CRASHING_NODES ))
PORTS=($(seq -s , 10000 $(( 10000 + ALL_NODES - 1 ))))
EXPECTED_FINALIZED_DATA_ITEMS=$(( ALL_NODES * DATA_ITEMS ))

for id in $(seq 0 $(( ALL_NODES - 1 ))); do
rm -f "aleph-bft-examples-ordering-backup/${id}.units"
rm -f "node${id}.log"
done

info "Starting $0
PARAMETERS
number of nodes: $N_NODES
number of malfunctioning nodes: $N_MALFUNCTIONING_NODES
number of nodes with stalling DataProviders: $N_STALLING_DATA_PROVIDERS
number of forced crashes: $N_CRASHES
number of ordered data per crash: $N_ORDERED_PER_CRASH
restart delay: $RESTART_DELAY second(s)
number of nodes: ${NODES}
number of crashing nodes: ${CRASHING_NODES}
number of nodes with stalling DataProviders: ${STALLING_DATA_PROVIDERS}
number of forced crashes: ${CRASHES_COUNT}
number of ordered data per batch: ${DATA_ITEMS}
restart delay: ${CRASH_RESTART_DELAY_SECONDS} second(s)
"

for id in $(seq 0 $(expr $N_NODES - 1)); do
echo "Starting node ${id}..."
"$binary" --id "$id" --ports "$ports" --n-data "$n_ordered" --stalled "$stalled" 2>> "node${id}.log" &
for id in $(seq 0 $(( NODES - 1 ))); do
if [[ "${id}" -lt "${STALLING_DATA_PROVIDERS}" ]]; then
run_ordering_binary "${id}" "${DATA_ITEMS_COUNTER}" "${DATA_ITEMS}" "yes-stall"
else
run_ordering_binary "${id}" "${DATA_ITEMS_COUNTER}" "${DATA_ITEMS}"
fi
DATA_ITEMS_COUNTER=$(( DATA_ITEMS_COUNTER + DATA_ITEMS ))
done

for i in $(seq $(expr $N_NODES) $(expr $N_NODES + $N_MALFUNCTIONING_NODES - 1)); do
run_crash_node "$i" &
for id in $(seq $(( NODES )) $(( ALL_NODES - 1 ))); do
run_crash_node "${id}" &
DATA_ITEMS_COUNTER=$(( DATA_ITEMS_COUNTER + DATA_ITEMS ))
done

trap 'kill $(jobs -p); wait' SIGINT SIGTERM
Expand Down
21 changes: 11 additions & 10 deletions examples/ordering/src/dataio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@ pub type Data = (NodeIndex, u32);
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug, Default, Decode, Encode)]
pub struct DataProvider {
id: NodeIndex,
counter: u32,
n_data: u32,
starting_data_item: u32,
data_items: u32,
current_data: u32,
stalled: bool,
}

impl DataProvider {
pub fn new(id: NodeIndex, counter: u32, n_data: u32, stalled: bool) -> Self {
pub fn new(id: NodeIndex, starting_data_item: u32, data_items: u32, stalled: bool) -> Self {
Self {
id,
counter,
n_data,
starting_data_item,
current_data: starting_data_item,
data_items,
stalled,
}
}
Expand All @@ -35,18 +37,17 @@ impl DataProviderT for DataProvider {
type Output = Data;

async fn get_data(&mut self) -> Option<Data> {
if self.n_data == 0 {
if self.starting_data_item + self.data_items == self.current_data {
if self.stalled {
info!("Awaiting DataProvider::get_data forever");
pending::<()>().await;
}
info!("Providing None");
None
} else {
let data = (self.id, self.counter);
info!("Providing data: {}", self.counter);
self.counter += 1;
self.n_data -= 1;
let data = (self.id, self.current_data);
info!("Providing data: {}", self.current_data);
self.current_data += 1;
Some(data)
}
}
Expand Down
Loading

0 comments on commit 7dda99f

Please sign in to comment.