Merge pull request #1194 from subspace/improve-farmer-ux-cleanup
Improve farmer UX by printing useful details, improving the logging format, and other minor cleanups
nazar-pc authored Feb 26, 2023
2 parents 4ad33e2 + b7f4a82 commit eff34a0
Showing 9 changed files with 138 additions and 81 deletions.
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands.rs
@@ -1,5 +1,6 @@
mod farm;
mod info;
mod shared;

pub(crate) use farm::farm_multi_disk;
pub(crate) use info::info;
44 changes: 27 additions & 17 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -1,6 +1,7 @@
mod dsn;

use crate::commands::farm::dsn::{configure_dsn, start_announcements_processor};
use crate::commands::shared::print_disk_farm_info;
use crate::utils::{get_required_plot_space_with_overhead, shutdown_signal};
use crate::{DiskFarm, FarmingArgs};
use anyhow::{anyhow, Result};
@@ -55,11 +56,12 @@ pub(crate) async fn farm_multi_disk(
disable_farming,
mut dsn,
max_concurrent_plots,
no_info: _,
} = farming_args;

let readers_and_pieces = Arc::new(Mutex::new(None));

info!("Connecting to node RPC at {}", node_rpc_url);
info!(url = %node_rpc_url, "Connecting to node RPC");
let node_client = NodeRpcClient::new(&node_rpc_url).await?;
let concurrent_plotting_semaphore = Arc::new(tokio::sync::Semaphore::new(
farming_args.max_concurrent_plots.get(),
@@ -127,7 +129,7 @@ pub(crate) async fn farm_multi_disk(

// TODO: Check plot and metadata sizes to ensure there is enough space for farmer to not
// fail later
for disk_farm in disk_farms {
for (disk_farm_index, disk_farm) in disk_farms.into_iter().enumerate() {
let minimum_plot_size = get_required_plot_space_with_overhead(PLOT_SECTOR_SIZE);

if disk_farm.allocated_plotting_space < minimum_plot_size {
@@ -138,22 +140,29 @@
));
}

info!("Connecting to node RPC at {}", node_rpc_url);
debug!(url = %node_rpc_url, %disk_farm_index, "Connecting to node RPC");
let node_client = NodeRpcClient::new(&node_rpc_url).await?;

let single_disk_plot_fut = SingleDiskPlot::new(SingleDiskPlotOptions {
directory: disk_farm.directory,
allocated_space: disk_farm.allocated_plotting_space,
node_client,
reward_address,
kzg: kzg.clone(),
piece_getter: piece_getter.clone(),
concurrent_plotting_semaphore: Arc::clone(&concurrent_plotting_semaphore),
piece_memory_cache: piece_memory_cache.clone(),
});
let single_disk_plot_fut = SingleDiskPlot::new(
SingleDiskPlotOptions {
directory: disk_farm.directory.clone(),
allocated_space: disk_farm.allocated_plotting_space,
node_client,
reward_address,
kzg: kzg.clone(),
piece_getter: piece_getter.clone(),
concurrent_plotting_semaphore: Arc::clone(&concurrent_plotting_semaphore),
piece_memory_cache: piece_memory_cache.clone(),
},
disk_farm_index,
);

let single_disk_plot = single_disk_plot_fut.await?;

if !farming_args.no_info {
print_disk_farm_info(disk_farm.directory, disk_farm_index);
}

single_disk_plots.push(single_disk_plot);
}

@@ -163,7 +172,7 @@ pub(crate) async fn farm_multi_disk(
.map(|single_disk_plot| single_disk_plot.piece_reader())
.collect::<Vec<_>>();

debug!("Collecting already plotted pieces");
info!("Collecting already plotted pieces (this will take some time)...");

// Collect already plotted pieces
let plotted_pieces: HashMap<PieceIndexHash, PieceDetails> = single_disk_plots
@@ -205,7 +214,7 @@ pub(crate) async fn farm_multi_disk(
// We implicitly ignore duplicates here, reading just from one of the plots
.collect();

debug!("Finished collecting already plotted pieces");
info!("Finished collecting already plotted pieces successfully");

readers_and_pieces
.lock()
@@ -225,9 +234,10 @@
// Collect newly plotted pieces
// TODO: Once we have replotting, this will have to be updated
single_disk_plot
.on_sector_plotted(Arc::new(move |(plotted_sector, plotting_permit)| {
.on_sector_plotted(Arc::new(move |(sector_offset, plotted_sector, plotting_permit)| {
let plotting_permit = Arc::clone(plotting_permit);
let node = node.clone();
let sector_offset = *sector_offset;
let sector_index = plotted_sector.sector_index;

let mut dropped_receiver = dropped_sender.subscribe();
@@ -289,7 +299,7 @@ pub(crate) async fn farm_multi_disk(
// Nothing is needed here, just driving all futures to completion
}

info!(?sector_index, "Sector publishing was successful.");
info!(%sector_offset, ?sector_index, "Sector publishing was successful.");

// Release only after publishing is finished
drop(plotting_permit);
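A recurring change in this file is the switch from format-string interpolation to tracing's structured key-value fields. A minimal standalone sketch of the difference (not from this repository; assumes the `tracing` and `tracing-subscriber` crates as dependencies):

    use tracing::info;

    fn main() {
        // Simple stdout subscriber so the events are visible.
        tracing_subscriber::fmt().init();

        let node_rpc_url = "ws://127.0.0.1:9944"; // placeholder URL
        let sector_index = 42u64;

        // Before: the value is baked into the message string.
        info!("Connecting to node RPC at {}", node_rpc_url);

        // After: `%` captures the field via Display, `?` via Debug; the
        // field keeps its own name, so log processors can filter on it.
        info!(url = %node_rpc_url, ?sector_index, "Connecting to node RPC");
    }

The same field syntax appears in the hunks above, e.g. `debug!(url = %node_rpc_url, %disk_farm_index, ...)` and `info!(%sector_offset, ?sector_index, ...)`.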
44 changes: 30 additions & 14 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs
@@ -39,6 +39,7 @@ pub(super) fn configure_dsn(
listen_on,
bootstrap_nodes,
piece_cache_size,
provided_keys_limit,
disable_private_ips,
reserved_peers,
}: DsnArgs,
@@ -63,33 +64,48 @@
fs::rename(&records_cache_db_path, &piece_cache_db_path)?;
}
}
let provider_cache_db_path = base_path.join("provider_cache_db");
let provider_cache_size =
piece_cache_size.saturating_mul(NonZeroUsize::new(10).expect("10 > 0")); // TODO: add proper value

info!(
?piece_cache_db_path,
?piece_cache_size,
?provider_cache_db_path,
?provider_cache_size,
"Record cache DB configured."
);
let provider_db_path = base_path.join("providers_db");
// TODO: Remove this migration code in the future
{
let provider_cache_db_path = base_path.join("provider_cache_db");
if provider_cache_db_path.exists() {
fs::rename(&provider_cache_db_path, &provider_db_path)?;
}
}

let default_config = Config::default();
let peer_id = peer_id(&keypair);

let db_provider_storage =
ParityDbProviderStorage::new(&provider_cache_db_path, provider_cache_size, peer_id)
info!(
db_path = ?provider_db_path,
keys_limit = ?provided_keys_limit,
"Initializing provider storage..."
);
let persistent_provider_storage =
ParityDbProviderStorage::new(&provider_db_path, provided_keys_limit, peer_id)
.map_err(|err| anyhow::anyhow!(err.to_string()))?;
info!(
current_size = ?persistent_provider_storage.size(),
"Provider storage initialized successfully"
);

info!(
db_path = ?piece_cache_db_path,
size = ?piece_cache_size,
"Initializing piece cache..."
);
let piece_store =
ParityDbStore::new(&piece_cache_db_path).map_err(|err| anyhow::anyhow!(err.to_string()))?;
let piece_cache = FarmerPieceCache::new(piece_store.clone(), piece_cache_size, peer_id);
info!(
current_size = ?piece_cache.size(),
"Piece cache initialized successfully"
);

let farmer_provider_storage = FarmerProviderStorage::new(
peer_id,
readers_and_pieces.clone(),
db_provider_storage,
persistent_provider_storage,
piece_cache.clone(),
);

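Both migration blocks in this hunk follow the same one-shot pattern: if a database directory still exists under its legacy name, rename it before opening. A minimal sketch of the idiom (std-only; the paths are illustrative, not from this commit):

    use std::fs;
    use std::io;
    use std::path::Path;

    // Rename a legacy DB directory to its new name, if present.
    // On the same filesystem this is a single rename call, so an
    // interrupted run leaves either the old or the new name intact.
    fn migrate_db_dir(base_path: &Path, old: &str, new: &str) -> io::Result<()> {
        let old_path = base_path.join(old);
        if old_path.exists() {
            fs::rename(&old_path, base_path.join(new))?;
        }
        Ok(())
    }

As the TODO in the diff notes, this code is transitional and can be removed once existing installations have migrated.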
26 changes: 2 additions & 24 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/info.rs
@@ -1,5 +1,5 @@
use crate::commands::shared::print_disk_farm_info;
use crate::DiskFarm;
use subspace_farmer::single_disk_plot::{SingleDiskPlot, SingleDiskPlotSummary};

pub(crate) fn info(disk_farms: Vec<DiskFarm>) {
for (disk_farm_index, disk_farm) in disk_farms.into_iter().enumerate() {
@@ -9,28 +9,6 @@ pub(crate) fn info(disk_farms: Vec<DiskFarm>) {

let DiskFarm { directory, .. } = disk_farm;

println!("Single disk farm {disk_farm_index}:");
match SingleDiskPlot::collect_summary(directory) {
SingleDiskPlotSummary::Found { info, directory } => {
println!(" ID: {}", info.id());
println!(" Genesis hash: 0x{}", hex::encode(info.genesis_hash()));
println!(" Public key: 0x{}", hex::encode(info.public_key()));
println!(" First sector index: {}", info.first_sector_index());
println!(
" Allocated space: {} ({})",
bytesize::to_string(info.allocated_space(), true),
bytesize::to_string(info.allocated_space(), false)
);
println!(" Directory: {}", directory.display());
}
SingleDiskPlotSummary::NotFound { directory } => {
println!(" Plot directory: {}", directory.display());
println!(" No farm found here yet");
}
SingleDiskPlotSummary::Error { directory, error } => {
println!(" Ddirectory: {}", directory.display());
println!(" Failed to open farm info: {error}");
}
}
print_disk_farm_info(directory, disk_farm_index);
}
}
28 changes: 28 additions & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs
@@ -0,0 +1,28 @@
use std::path::PathBuf;
use subspace_farmer::single_disk_plot::{SingleDiskPlot, SingleDiskPlotSummary};

pub(crate) fn print_disk_farm_info(directory: PathBuf, disk_farm_index: usize) {
println!("Single disk farm {disk_farm_index}:");
match SingleDiskPlot::collect_summary(directory) {
SingleDiskPlotSummary::Found { info, directory } => {
println!(" ID: {}", info.id());
println!(" Genesis hash: 0x{}", hex::encode(info.genesis_hash()));
println!(" Public key: 0x{}", hex::encode(info.public_key()));
println!(" First sector index: {}", info.first_sector_index());
println!(
" Allocated space: {} ({})",
bytesize::to_string(info.allocated_space(), true),
bytesize::to_string(info.allocated_space(), false)
);
println!(" Directory: {}", directory.display());
}
SingleDiskPlotSummary::NotFound { directory } => {
println!(" Plot directory: {}", directory.display());
println!(" No farm found here yet");
}
SingleDiskPlotSummary::Error { directory, error } => {
println!(" Directory: {}", directory.display());
println!(" Failed to open farm info: {error}");
}
}
}
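Note the interaction with the farm.rs hunk above: `SingleDiskPlotOptions` takes the directory by value, which is why that hunk clones it (`disk_farm.directory.clone()`) to keep a copy for this helper. A minimal usage sketch (the path is a placeholder):

    use std::path::PathBuf;

    fn main() {
        // One call per configured farm; the index reflects the order in
        // which farms were passed on the command line.
        let directory = PathBuf::from("/mnt/subspace/plot0");
        print_disk_farm_info(directory, 0);
    }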
9 changes: 7 additions & 2 deletions crates/subspace-farmer/src/bin/subspace-farmer/main.rs
@@ -19,7 +19,6 @@ use subspace_networking::libp2p::Multiaddr;
use tempfile::TempDir;
use tracing::info;
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, EnvFilter};

@@ -56,6 +55,9 @@ struct FarmingArgs {
/// Number of plots that can be plotted concurrently, impacts RAM usage.
#[arg(long, default_value = "10")]
max_concurrent_plots: NonZeroUsize,
/// Do not print info about configured farms on startup.
#[arg(long)]
no_info: bool,
}

/// Arguments for DSN
@@ -71,6 +73,9 @@ struct DsnArgs {
/// Piece cache size in pieces.
#[arg(long, default_value = "65536")]
piece_cache_size: NonZeroUsize,
/// Number of provided keys (by other peers) that will be stored.
#[arg(long, default_value = "655360")]
provided_keys_limit: NonZeroUsize,
/// Determines whether we allow keeping non-global (private, shared, loopback..) addresses in Kademlia DHT.
#[arg(long, default_value_t = false)]
disable_private_ips: bool,
@@ -202,7 +207,7 @@ struct Command {
async fn main() -> Result<()> {
tracing_subscriber::registry()
.with(
fmt::layer().with_span_events(FmtSpan::CLOSE).with_filter(
fmt::layer().with_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
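Dropping `FmtSpan::CLOSE` stops the subscriber from emitting an extra event every time a span closes, in line with the commit's goal of cleaner log output; level filtering still flows through `RUST_LOG`. A standalone sketch of the resulting setup (assumes `tracing-subscriber` with the `env-filter` feature enabled):

    use tracing_subscriber::filter::LevelFilter;
    use tracing_subscriber::prelude::*;
    use tracing_subscriber::{fmt, EnvFilter};

    fn init_logging() {
        tracing_subscriber::registry()
            .with(
                // INFO by default; directives such as RUST_LOG=debug or
                // RUST_LOG=subspace_farmer=trace override it at run time.
                fmt::layer().with_filter(
                    EnvFilter::builder()
                        .with_default_directive(LevelFilter::INFO.into())
                        .from_env_lossy(),
                ),
            )
            .init();
    }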
[3 additional changed files not shown]
