Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sniffer messages cleaner #1341

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions roles/tests-integration/lib/sniffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,53 @@ impl Sniffer {
sleep(Duration::from_secs(1)).await;
}
}

pub async fn wait_for_message_type_with_remove(
&self,
message_direction: MessageDirection,
message_type: u8,
) -> bool {
let now = std::time::Instant::now();
loop {
let has_message_type = match message_direction {
MessageDirection::ToDownstream => self
.messages_from_upstream
.has_message_type_with_remove(message_type),
MessageDirection::ToUpstream => self
.messages_from_downstream
.has_message_type_with_remove(message_type),
};

// ready to unblock test runtime
if has_message_type {
return true;
}

// 10 min timeout
// only for worst case, ideally should never be triggered
if now.elapsed().as_secs() > 10 * 60 {
panic!("Timeout waiting for message type");
}

// sleep to reduce async lock contention
sleep(Duration::from_secs(1)).await;
}
}

pub async fn includes_message_type(
&self,
message_direction: MessageDirection,
message_type: u8,
) -> bool {
match message_direction {
MessageDirection::ToDownstream => {
self.messages_from_upstream.has_message_type(message_type)
}
MessageDirection::ToUpstream => {
self.messages_from_downstream.has_message_type(message_type)
}
}
}
}

// Utility macro to assert that the downstream and upstream roles have sent specific messages.
Expand Down Expand Up @@ -656,6 +703,22 @@ impl MessagesAggregator {
has_message
}

fn has_message_type_with_remove(&self, message_type: u8) -> bool {
self.messages
.safe_lock(|messages| {
let mut cloned_messages = messages.clone();
for (pos, (t, _)) in cloned_messages.iter().enumerate() {
if *t == message_type {
let drained = cloned_messages.drain(pos + 1..).collect();
*messages = drained;
return true;
}
}
false
})
.unwrap()
}

// The aggregator queues messages in FIFO order, so this function returns the oldest message in
// the queue.
//
Expand Down
29 changes: 10 additions & 19 deletions roles/tests-integration/tests/pool_integration.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use integration_tests_sv2::*;

use crate::sniffer::MessageDirection;
use const_sv2::{MESSAGE_TYPE_NEW_EXTENDED_MINING_JOB, MESSAGE_TYPE_NEW_TEMPLATE};
use const_sv2::{
MESSAGE_TYPE_MINING_SET_NEW_PREV_HASH, MESSAGE_TYPE_NEW_EXTENDED_MINING_JOB,
MESSAGE_TYPE_NEW_TEMPLATE,
};
use roles_logic_sv2::{
common_messages_sv2::{Protocol, SetupConnection},
parsers::{AnyMessage, CommonMessages, Mining, PoolMessages, TemplateDistribution},
Expand Down Expand Up @@ -92,24 +95,12 @@ async fn header_timestamp_value_assertion_in_new_extended_mining_job() {
}
_ => panic!("SetNewPrevHash not found!"),
};
// Assertions of messages between Pool and Translator Proxy (these are not necessary for the
// test itself, but they are used to pop from the sniffer's message queue)
assert_common_message!(
&pool_translator_sniffer.next_message_from_upstream(),
SetupConnectionSuccess
);
assert_mining_message!(
&pool_translator_sniffer.next_message_from_upstream(),
OpenExtendedMiningChannelSuccess
);
assert_mining_message!(
&pool_translator_sniffer.next_message_from_upstream(),
NewExtendedMiningJob
);
assert_mining_message!(
&pool_translator_sniffer.next_message_from_upstream(),
SetNewPrevHash
);
pool_translator_sniffer
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

@plebhash plebhash Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we take the opportunity on this PR and also apply this new function to the test introduced on #1325 ?

since that was where this feature request was born, seems like a good way to validate the new feature

Copy link
Collaborator

@plebhash plebhash Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh duh nevermind, upon closer inspection I just realized that's exactly what's already being done here

.wait_for_message_type_with_remove(
MessageDirection::ToDownstream,
MESSAGE_TYPE_MINING_SET_NEW_PREV_HASH,
)
.await;
// Wait for a second NewExtendedMiningJob message
pool_translator_sniffer
.wait_for_message_type(
Expand Down
39 changes: 37 additions & 2 deletions roles/tests-integration/tests/sniffer_integration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use const_sv2::MESSAGE_TYPE_SETUP_CONNECTION_ERROR;
use const_sv2::{
MESSAGE_TYPE_SETUP_CONNECTION_ERROR, MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS,
MESSAGE_TYPE_SET_NEW_PREV_HASH,
};
use integration_tests_sv2::*;
use roles_logic_sv2::{
common_messages_sv2::SetupConnectionError,
Expand All @@ -10,7 +13,6 @@ use std::convert::TryInto;
#[tokio::test]
async fn test_sniffer_interrupter() {
let (_tp, tp_addr) = start_template_provider(None).await;
use const_sv2::MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS;
let message =
PoolMessages::Common(CommonMessages::SetupConnectionError(SetupConnectionError {
flags: 0,
Expand All @@ -33,3 +35,36 @@ async fn test_sniffer_interrupter() {
assert_common_message!(&sniffer.next_message_from_downstream(), SetupConnection);
assert_common_message!(&sniffer.next_message_from_upstream(), SetupConnectionError);
}

#[tokio::test]
async fn test_sniffer_wait_for_message_type_with_remove() {
let (_tp, tp_addr) = start_template_provider(None).await;
let (sniffer, sniffer_addr) = start_sniffer("".to_string(), tp_addr, false, None).await;
let _ = start_pool(Some(sniffer_addr)).await;
assert!(
sniffer
.wait_for_message_type_with_remove(
MessageDirection::ToDownstream,
MESSAGE_TYPE_SET_NEW_PREV_HASH,
)
.await
);
assert_eq!(
sniffer
.includes_message_type(
MessageDirection::ToDownstream,
MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS
)
.await,
false
);
assert_eq!(
sniffer
.includes_message_type(
MessageDirection::ToDownstream,
MESSAGE_TYPE_SET_NEW_PREV_HASH
)
.await,
false
);
}
Loading