Skip to content

Commit

Permalink
Fix subscription leakage on the farmer
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed May 18, 2022
1 parent b027b5f commit f2ec4ed
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 84 deletions.
3 changes: 2 additions & 1 deletion crates/subspace-farmer/src/archiving.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::object_mappings::ObjectMappings;
use crate::rpc_client::RpcClient;
use futures::StreamExt;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::objects::{GlobalObject, PieceObject, PieceObjectMapping};
use subspace_core_primitives::{FlatPieces, Sha256Hash};
Expand Down Expand Up @@ -129,7 +130,7 @@ impl Archiving {
info!("Plotting stopped!");
break;
}
result = archived_segments.recv() => {
result = archived_segments.next() => {
match result {
Some(archived_segment) => {
let segment_index = archived_segment.root_block.segment_index();
Expand Down
29 changes: 19 additions & 10 deletions crates/subspace-farmer/src/bin/subspace-farmer/bench_rpc_client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::{SinkExt, Stream, StreamExt};
use std::pin::Pin;
use std::sync::Arc;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_farmer::{RpcClient, RpcClientError as MockError};
use subspace_rpc_primitives::{
BlockSignature, BlockSigningInfo, FarmerMetadata, SlotInfo, SolutionResponse,
};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

/// Client mock for benching purpose
Expand All @@ -28,17 +31,18 @@ impl BenchRpcClient {
metadata: FarmerMetadata,
mut archived_segments_receiver: mpsc::Receiver<ArchivedSegment>,
) -> Self {
let (inner_archived_segments_sender, inner_archived_segments_receiver) = mpsc::channel(10);
let (mut inner_archived_segments_sender, inner_archived_segments_receiver) =
mpsc::channel(10);
let (acknowledge_archived_segment_sender, mut acknowledge_archived_segment_receiver) =
mpsc::channel(1);

let segment_producer_handle = tokio::spawn({
async move {
while let Some(segment) = archived_segments_receiver.recv().await {
while let Some(segment) = archived_segments_receiver.next().await {
if inner_archived_segments_sender.send(segment).await.is_err() {
break;
}
if acknowledge_archived_segment_receiver.recv().await.is_none() {
if acknowledge_archived_segment_receiver.next().await.is_none() {
break;
}
}
Expand Down Expand Up @@ -66,7 +70,9 @@ impl RpcClient for BenchRpcClient {
Ok(self.inner.metadata.clone())
}

async fn subscribe_slot_info(&self) -> Result<mpsc::Receiver<SlotInfo>, MockError> {
async fn subscribe_slot_info(
&self,
) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, MockError> {
unreachable!("Unreachable, as we don't start farming for benchmarking")
}

Expand All @@ -77,7 +83,9 @@ impl RpcClient for BenchRpcClient {
unreachable!("Unreachable, as we don't start farming for benchmarking")
}

async fn subscribe_block_signing(&self) -> Result<mpsc::Receiver<BlockSigningInfo>, MockError> {
async fn subscribe_block_signing(
&self,
) -> Result<Pin<Box<dyn Stream<Item = BlockSigningInfo> + Send + 'static>>, MockError> {
unreachable!("Unreachable, as we don't start farming for benchmarking")
}

Expand All @@ -90,24 +98,25 @@ impl RpcClient for BenchRpcClient {

async fn subscribe_archived_segments(
&self,
) -> Result<mpsc::Receiver<ArchivedSegment>, MockError> {
let (sender, receiver) = mpsc::channel(10);
) -> Result<Pin<Box<dyn Stream<Item = ArchivedSegment> + Send + 'static>>, MockError> {
let (mut sender, receiver) = mpsc::channel(10);
let archived_segments_receiver = self.inner.archived_segments_receiver.clone();
tokio::spawn(async move {
while let Some(archived_segment) = archived_segments_receiver.lock().await.recv().await
while let Some(archived_segment) = archived_segments_receiver.lock().await.next().await
{
if sender.send(archived_segment).await.is_err() {
break;
}
}
});

Ok(receiver)
Ok(Box::pin(receiver))
}

async fn acknowledge_archived_segment(&self, segment_index: u64) -> Result<(), MockError> {
self.inner
.acknowledge_archived_segment_sender
.clone()
.send(segment_index)
.await?;
Ok(())
Expand Down
19 changes: 9 additions & 10 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/bench.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::path::{Path, PathBuf};
use std::{fmt, io};

use crate::bench_rpc_client::BenchRpcClient;
use crate::{utils, WriteToDisk};
use anyhow::anyhow;
use futures::channel::mpsc;
use futures::SinkExt;
use rand::prelude::*;
use tempfile::TempDir;
use tracing::info;

use std::path::{Path, PathBuf};
use std::{fmt, io};
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::objects::{PieceObject, PieceObjectMapping};
use subspace_core_primitives::{
Expand All @@ -15,10 +15,9 @@ use subspace_core_primitives::{
use subspace_farmer::multi_farming::{MultiFarming, Options as MultiFarmingOptions};
use subspace_farmer::{ObjectMappings, PieceOffset, Plot, PlotFile, RpcClient};
use subspace_rpc_primitives::FarmerMetadata;
use tempfile::TempDir;
use tokio::time::Instant;

use crate::bench_rpc_client::BenchRpcClient;
use crate::{utils, WriteToDisk};
use tracing::info;

pub struct BenchPlotMock {
piece_count: u64,
Expand Down Expand Up @@ -90,7 +89,7 @@ pub(crate) async fn bench(
) -> anyhow::Result<()> {
utils::raise_fd_limit();

let (archived_segments_sender, archived_segments_receiver) = tokio::sync::mpsc::channel(10);
let (mut archived_segments_sender, archived_segments_receiver) = mpsc::channel(10);
let client = BenchRpcClient::new(BENCH_FARMER_METADATA, archived_segments_receiver);

let base_directory = crate::utils::get_path(custom_path);
Expand Down
6 changes: 3 additions & 3 deletions crates/subspace-farmer/src/farming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::commitments::Commitments;
use crate::identity::Identity;
use crate::plot::Plot;
use crate::rpc_client::RpcClient;
use futures::{future, future::Either};
use futures::{future, future::Either, StreamExt};
use std::sync::mpsc;
use std::time::Instant;
use subspace_core_primitives::{LocalChallenge, PublicKey, Salt, Solution};
Expand Down Expand Up @@ -123,7 +123,7 @@ async fn subscribe_to_slot_info<T: RpcClient>(

let mut salts = Salts::default();

while let Some(slot_info) = slot_info_notifications.recv().await {
while let Some(slot_info) = slot_info_notifications.next().await {
debug!(?slot_info, "New slot");

update_commitments(plot, commitments, &mut salts, &slot_info);
Expand Down Expand Up @@ -184,7 +184,7 @@ async fn subscribe_to_slot_info<T: RpcClient>(
if let Some(BlockSigningInfo {
header_hash,
public_key,
}) = block_signing_info_notifications.recv().await
}) = block_signing_info_notifications.next().await
{
// Multiple plots might have solved, only sign with correct one
if identity.public_key().to_bytes() != public_key {
Expand Down
71 changes: 49 additions & 22 deletions crates/subspace-farmer/src/mock_rpc_client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use crate::rpc_client::{Error as MockError, RpcClient};
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::{SinkExt, Stream, StreamExt};
use std::pin::Pin;
use std::sync::Arc;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_rpc_primitives::{
BlockSignature, BlockSigningInfo, FarmerMetadata, SlotInfo, SolutionResponse,
};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::Mutex;

/// `MockRpc` wrapper.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -71,23 +74,28 @@ impl MockRpcClient {
}

pub(crate) async fn send_metadata(&self, metadata: FarmerMetadata) {
self.inner.metadata_sender.send(metadata).await.unwrap();
self.inner
.metadata_sender
.clone()
.send(metadata)
.await
.unwrap();
}

pub(crate) async fn send_slot_info(&self, slot_info: SlotInfo) {
self.inner
.slot_into_sender
.lock()
.await
.as_ref()
.as_mut()
.unwrap()
.send(slot_info)
.await
.unwrap();
}

pub(crate) async fn receive_solution(&self) -> Option<SolutionResponse> {
self.inner.solution_receiver.lock().await.recv().await
self.inner.solution_receiver.lock().await.next().await
}

pub(crate) async fn drop_slot_sender(&self) {
Expand All @@ -99,7 +107,7 @@ impl MockRpcClient {
.archived_segments_sender
.lock()
.await
.as_ref()
.as_mut()
.unwrap()
.send(archived_segment)
.await
Expand All @@ -112,7 +120,7 @@ impl MockRpcClient {
acknowledge_archived_segment_receiver
.lock()
.await
.recv()
.next()
.await;
});
}
Expand All @@ -130,19 +138,29 @@ impl MockRpcClient {
#[async_trait]
impl RpcClient for MockRpcClient {
async fn farmer_metadata(&self) -> Result<FarmerMetadata, MockError> {
Ok(self.inner.metadata_receiver.lock().await.try_recv()?)
Ok(self
.inner
.metadata_receiver
.lock()
.await
.try_next()?
.unwrap())
}

async fn subscribe_slot_info(&self) -> Result<mpsc::Receiver<SlotInfo>, MockError> {
let (sender, receiver) = mpsc::channel(10);
async fn subscribe_slot_info(
&self,
) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, MockError> {
let (mut sender, receiver) = mpsc::channel(10);
let slot_receiver = self.inner.slot_info_receiver.clone();
tokio::spawn(async move {
while let Some(slot_info) = slot_receiver.lock().await.recv().await {
sender.send(slot_info).await.unwrap();
while let Some(slot_info) = slot_receiver.lock().await.next().await {
if sender.send(slot_info).await.is_err() {
break;
}
}
});

Ok(receiver)
Ok(Box::pin(receiver))
}

async fn submit_solution_response(
Expand All @@ -151,22 +169,27 @@ impl RpcClient for MockRpcClient {
) -> Result<(), MockError> {
self.inner
.solution_sender
.clone()
.send(solution_response)
.await
.unwrap();
Ok(())
}

async fn subscribe_block_signing(&self) -> Result<mpsc::Receiver<BlockSigningInfo>, MockError> {
let (sender, receiver) = mpsc::channel(10);
async fn subscribe_block_signing(
&self,
) -> Result<Pin<Box<dyn Stream<Item = BlockSigningInfo> + Send + 'static>>, MockError> {
let (mut sender, receiver) = mpsc::channel(10);
let block_signing_receiver = self.inner.block_signing_info_receiver.clone();
tokio::spawn(async move {
while let Some(block_signing_info) = block_signing_receiver.lock().await.recv().await {
sender.send(block_signing_info).await.unwrap();
while let Some(block_signing_info) = block_signing_receiver.lock().await.next().await {
if sender.send(block_signing_info).await.is_err() {
break;
}
}
});

Ok(receiver)
Ok(Box::pin(receiver))
}

async fn submit_block_signature(
Expand All @@ -175,6 +198,7 @@ impl RpcClient for MockRpcClient {
) -> Result<(), MockError> {
self.inner
.block_signature_sender
.clone()
.send(block_signature)
.await
.unwrap();
Expand All @@ -183,22 +207,25 @@ impl RpcClient for MockRpcClient {

async fn subscribe_archived_segments(
&self,
) -> Result<mpsc::Receiver<ArchivedSegment>, MockError> {
let (sender, receiver) = mpsc::channel(10);
) -> Result<Pin<Box<dyn Stream<Item = ArchivedSegment> + Send + 'static>>, MockError> {
let (mut sender, receiver) = mpsc::channel(10);
let archived_segments_receiver = self.inner.archived_segments_receiver.clone();
tokio::spawn(async move {
while let Some(archived_segment) = archived_segments_receiver.lock().await.recv().await
while let Some(archived_segment) = archived_segments_receiver.lock().await.next().await
{
sender.send(archived_segment).await.unwrap();
if sender.send(archived_segment).await.is_err() {
break;
}
}
});

Ok(receiver)
Ok(Box::pin(receiver))
}

async fn acknowledge_archived_segment(&self, segment_index: u64) -> Result<(), MockError> {
self.inner
.acknowledge_archived_segment_sender
.clone()
.send(segment_index)
.await
.unwrap();
Expand Down
Loading

0 comments on commit f2ec4ed

Please sign in to comment.