Skip to content

Commit

Permalink
fix(manager): store exclusive reference to service data instead of cl…
Browse files Browse the repository at this point in the history
…oning
  • Loading branch information
RolandSherwin committed Mar 21, 2024
1 parent 882c0a2 commit 2fae77f
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 155 deletions.
10 changes: 4 additions & 6 deletions sn_node_manager/src/cmd/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ pub async fn start(verbosity: VerbosityLevel) -> Result<()> {
}

let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
if let Some(daemon) = node_registry.daemon.clone() {
if let Some(daemon) = &mut node_registry.daemon {
if verbosity != VerbosityLevel::Minimal {
println!("=================================================");
println!(" Start Daemon Service ");
println!("=================================================");
}

let service = DaemonService::new(daemon.clone(), Box::new(ServiceController {}));
let service = DaemonService::new(daemon, Box::new(ServiceController {}));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
service_manager.start().await?;
Expand All @@ -100,7 +100,6 @@ pub async fn start(verbosity: VerbosityLevel) -> Result<()> {
.map_or("-".to_string(), |e| e.to_string())
);

node_registry.daemon = Some(service_manager.service.service_data);
node_registry.save()?;
return Ok(());
}
Expand All @@ -114,19 +113,18 @@ pub async fn stop(verbosity: VerbosityLevel) -> Result<()> {
}

let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
if let Some(daemon) = node_registry.daemon.clone() {
if let Some(daemon) = &mut node_registry.daemon {
if verbosity != VerbosityLevel::Minimal {
println!("=================================================");
println!(" Stop Daemon Service ");
println!("=================================================");
}

let service = DaemonService::new(daemon.clone(), Box::new(ServiceController {}));
let service = DaemonService::new(daemon, Box::new(ServiceController {}));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
service_manager.stop().await?;

node_registry.daemon = Some(service_manager.service.service_data);
node_registry.save()?;

return Ok(());
Expand Down
15 changes: 6 additions & 9 deletions sn_node_manager/src/cmd/faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,21 @@ pub async fn start(verbosity: VerbosityLevel) -> Result<()> {
}

let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
if let Some(faucet) = node_registry.faucet.clone() {
if let Some(faucet) = &mut node_registry.faucet {
if verbosity != VerbosityLevel::Minimal {
println!("=================================================");
println!(" Start Faucet Service ");
println!("=================================================");
}

let service = FaucetService::new(faucet.clone(), Box::new(ServiceController {}));
let service = FaucetService::new(faucet, Box::new(ServiceController {}));
let mut service_manager = ServiceManager::new(
service,
Box::new(ServiceController {}),
VerbosityLevel::Normal,
);
service_manager.start().await?;

node_registry.faucet = Some(service_manager.service.service_data);
node_registry.save()?;
return Ok(());
}
Expand All @@ -117,19 +116,18 @@ pub async fn stop(verbosity: VerbosityLevel) -> Result<()> {
}

let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
if let Some(faucet) = node_registry.faucet.clone() {
if let Some(faucet) = &mut node_registry.faucet {
if verbosity != VerbosityLevel::Minimal {
println!("=================================================");
println!(" Stop Faucet Service ");
println!("=================================================");
}

let service = FaucetService::new(faucet.clone(), Box::new(ServiceController {}));
let service = FaucetService::new(faucet, Box::new(ServiceController {}));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
service_manager.stop().await?;

node_registry.faucet = Some(service_manager.service.service_data);
node_registry.save()?;

return Ok(());
Expand Down Expand Up @@ -164,7 +162,7 @@ pub async fn upgrade(

let (upgrade_bin_path, target_version) =
download_and_get_upgrade_bin_path(ReleaseType::Faucet, url, version).await?;
let faucet = node_registry.faucet.clone().unwrap();
let faucet = node_registry.faucet.as_mut().unwrap();

if !force {
let current_version = Version::parse(&faucet.version)?;
Expand All @@ -190,13 +188,12 @@ pub async fn upgrade(
target_bin_path: upgrade_bin_path.clone(),
target_version: target_version.clone(),
};
let service = FaucetService::new(faucet.clone(), Box::new(ServiceController {}));
let service = FaucetService::new(faucet, Box::new(ServiceController {}));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);

match service_manager.upgrade(options).await {
Ok(upgrade_result) => {
node_registry.faucet = Some(service_manager.service.service_data);
print_upgrade_summary(vec![("faucet".to_string(), upgrade_result)]);
node_registry.save()?;
Ok(())
Expand Down
72 changes: 36 additions & 36 deletions sn_node_manager/src/cmd/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,10 @@ pub async fn remove(
.ok_or_else(|| eyre!("No service named '{name}'"))?;

let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node.clone(), Box::new(rpc_client));
let service = NodeService::new(node, Box::new(rpc_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
service_manager.remove(keep_directories).await?;
node_registry.update_node(service_manager.service.service_data)?;
} else if let Some(ref peer_id) = peer_id {
let peer_id = PeerId::from_str(peer_id)?;
let node = node_registry
Expand All @@ -144,11 +143,10 @@ pub async fn remove(
))
})?;
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node.clone(), Box::new(rpc_client));
let service = NodeService::new(node, Box::new(rpc_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
service_manager.remove(keep_directories).await?;
node_registry.update_node(service_manager.service.service_data)?;
}

node_registry.save()?;
Expand Down Expand Up @@ -180,12 +178,11 @@ pub async fn start(
.ok_or_else(|| eyre!("No service named '{name}'"))?;

let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node.clone(), Box::new(rpc_client));
let service = NodeService::new(node, Box::new(rpc_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
service_manager.start().await?;

node_registry.update_node(service_manager.service.service_data)?;
node_registry.save()?;
} else if let Some(ref peer_id) = peer_id {
let peer_id = PeerId::from_str(peer_id)?;
Expand All @@ -201,24 +198,22 @@ pub async fn start(
})?;

let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node.clone(), Box::new(rpc_client));
let service = NodeService::new(node, Box::new(rpc_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
service_manager.start().await?;

node_registry.update_node(service_manager.service.service_data)?;
node_registry.save()?;
} else {
let mut failed_services = Vec::new();
let node_count = node_registry.nodes.len();
for i in 0..node_count {
let rpc_client = RpcClient::from_socket_addr(node_registry.nodes[i].rpc_socket_addr);
let service = NodeService::new(node_registry.nodes[i].clone(), Box::new(rpc_client));
let service = NodeService::new(&mut node_registry.nodes[i], Box::new(rpc_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity.clone());
match service_manager.start().await {
Ok(()) => {
node_registry.update_node(service_manager.service.service_data)?;
node_registry.save()?;
}
Err(e) => {
Expand Down Expand Up @@ -305,12 +300,11 @@ pub async fn stop(
.ok_or_else(|| eyre!("No service named '{name}'"))?;

let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node.clone(), Box::new(rpc_client));
let service = NodeService::new(node, Box::new(rpc_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
service_manager.stop().await?;

node_registry.update_node(service_manager.service.service_data)?;
node_registry.save()?;
} else if let Some(ref peer_id) = peer_id {
let peer_id = PeerId::from_str(peer_id)?;
Expand All @@ -326,23 +320,21 @@ pub async fn stop(
})?;

let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node.clone(), Box::new(rpc_client));
let service = NodeService::new(node, Box::new(rpc_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
service_manager.stop().await?;

node_registry.update_node(service_manager.service.service_data)?;
node_registry.save()?;
} else {
let node_count = node_registry.nodes.len();
for i in 0..node_count {
let rpc_client = RpcClient::from_socket_addr(node_registry.nodes[i].rpc_socket_addr);
let service = NodeService::new(node_registry.nodes[i].clone(), Box::new(rpc_client));
let service = NodeService::new(&mut node_registry.nodes[i], Box::new(rpc_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity.clone());
service_manager.stop().await?;

node_registry.update_node(service_manager.service.service_data)?;
node_registry.save()?;
}
}
Expand Down Expand Up @@ -412,16 +404,22 @@ pub async fn upgrade(
};

let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node.clone(), Box::new(rpc_client));
let service = NodeService::new(node, Box::new(rpc_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);

match service_manager.upgrade(options).await {
Ok(upgrade_result) => {
upgrade_summary.push((service_manager.service.service_data, upgrade_result));
upgrade_summary.push((
service_manager.service.service_data.service_name.clone(),
upgrade_result,
));
}
Err(e) => {
upgrade_summary.push((node.clone(), UpgradeResult::Error(format!("Error: {}", e))));
upgrade_summary.push((
node.service_name.clone(),
UpgradeResult::Error(format!("Error: {}", e)),
));
}
}
} else if let Some(ref peer_id) = peer_id {
Expand Down Expand Up @@ -452,16 +450,22 @@ pub async fn upgrade(
};

let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node.clone(), Box::new(rpc_client));
let service = NodeService::new(node, Box::new(rpc_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);

match service_manager.upgrade(options).await {
Ok(upgrade_result) => {
upgrade_summary.push((service_manager.service.service_data, upgrade_result));
upgrade_summary.push((
service_manager.service.service_data.service_name.clone(),
upgrade_result,
));
}
Err(e) => {
upgrade_summary.push((node.clone(), UpgradeResult::Error(format!("Error: {}", e))));
upgrade_summary.push((
node.service_name.clone(),
UpgradeResult::Error(format!("Error: {}", e)),
));
}
}
} else {
Expand All @@ -481,32 +485,28 @@ pub async fn upgrade(
};

let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node.clone(), Box::new(rpc_client));
let service = NodeService::new(node, Box::new(rpc_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity.clone());

match service_manager.upgrade(options).await {
Ok(upgrade_result) => {
upgrade_summary.push((service_manager.service.service_data, upgrade_result));
upgrade_summary.push((
service_manager.service.service_data.service_name.clone(),
upgrade_result,
));
}
Err(e) => {
upgrade_summary
.push((node.clone(), UpgradeResult::Error(format!("Error: {}", e))));
upgrade_summary.push((
node.service_name.clone(),
UpgradeResult::Error(format!("Error: {}", e)),
));
}
}
}
}

print_upgrade_summary(
upgrade_summary
.iter()
.map(|x| (x.0.service_name.clone(), x.1.clone()))
.collect::<Vec<(String, UpgradeResult)>>(),
);

for (node, _) in upgrade_summary {
node_registry.update_node(node.clone())?;
}
print_upgrade_summary(upgrade_summary);

node_registry.save()?;
Ok(())
Expand Down
Loading

0 comments on commit 2fae77f

Please sign in to comment.